Spring Boot 多线程数据同步
因为最近项目上线,需要同步期初数据-工序,大概有120万数据,采用之前Mybatis批量插入,一次5000的方式,单线程,大概需要近半个小时,后面为了提高效率,采用多线程编程,速度提升了大概2倍,耗时15分钟,同步120万条数据数
采用的是SpringBoot的多线程和@Async和Future
先了解下概念:
此处引用其他网站的解释:
什么是SpringBoot多线程
Spring是通过任务执行器(TaskExecutor)来实现多线程和并发编程,使用ThreadPoolTaskExecutor来创建一个基于线城池的TaskExecutor。在使用线程池的大多数情况下都是异步非阻塞的。我们配置注解@EnableAsync
可以开启异步任务。然后在实际执行的方法上配置注解@Async
上声明是异步任务
以下写法是单线程一次新增5000数据
public void syncWholeTools() {// 获取最大的AutoIDInteger maxAutoId = readToolsService.getMaxAutoId();final Integer batchNumber = 5000;Integer count = maxAutoId / batchNumber;Integer currentAotoId = 0;//分批次新增for (int i = 0; i < count; i++) {List<ReadToolModel> readToolModelList = readToolsService.getToolListByAutoId(currentAotoId,(i + 1) * batchNumber);currentAotoId = (i + 1) * batchNumber + 1;writeToolService.createBomDetail(readToolModelList);}List<ReadToolModel> readToolModelList = readToolsService.getToolListByAutoId(currentAotoId, maxAutoId);writeToolService.createBomDetail(readToolModelList);}
使用Spring Boot多线程
启动类需要加上@EnableAsync注解
yml配置
tools:core:poolsize: 100max:poolsize: 200queue:capacity: 200keepAlive:seconds: 30thread:name:prefix: tool
配置类:
@Configuration
@EnableAsync
public class AsyncConfig {//接收报文核心线程数@Value("${tools.core.poolsize}")private int toolsCorePoolSize;//接收报文最大线程数@Value("${tools.max.poolsize}")private int toolsMaxPoolSize;//接收报文队列容量@Value("${tools.queue.capacity}")private int toolsQueueCapacity;//接收报文线程活跃时间(秒)@Value("${tools.keepAlive.seconds}")private int toolsKeepAliveSeconds;//接收报文默认线程名称@Value("${tools.thread.name.prefix}")private String toolsThreadNamePrefix;/*** toolsTaskExecutor:(接口的线程池). <br/>** @return TaskExecutor taskExecutor接口* @since JDK 1.8*/@Bean(name = "ToolsTask")public ThreadPoolTaskExecutor toolsTaskExecutor() {//newFixedThreadPoolThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 设置核心线程数executor.setCorePoolSize(toolsCorePoolSize);// 设置最大线程数executor.setMaxPoolSize(toolsMaxPoolSize);// 设置队列容量executor.setQueueCapacity(toolsQueueCapacity);// 设置线程活跃时间(秒)executor.setKeepAliveSeconds(toolsKeepAliveSeconds);// 设置默认线程名称executor.setThreadNamePrefix(toolsThreadNamePrefix);// 设置拒绝策略// rejection-policy:当pool已经达到max size的时候,如何处理新任务// CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 等待所有任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown(true);executor.initialize();return executor;}
}
数据新增的操作:
@Component
public class SyncToolsHandler {private static final Logger LOG = LoggerFactory.getLogger(SyncToolsHandler.class);@AutowiredWriteToolService writeToolService;@Async(value = "ToolTask")public Future <String> syncTools(List <ReadToolModel> readToolList, int pageIndex) {System.out.println("thread name " + Thread.currentThread().getName());LOG.info(String.format("此批数据的段数为:%s 此段数据的数据条数为:%s", pageIndex, readToolList.size()));//声明future对象Future <String> result = new AsyncResult <String>("");//循环遍历if (null != readToolList && readToolList.size() > 0) {try {int listSize = readToolList.size();int listStart = 0, listEnd = 0;int ropeNum = listSize/2000;/*** 假设101条数据每次40,总共循环101/4=2* 0<=X<40 i=0 40*i 40*i+40* 40<=X<80 i=1 40*i 40*i+40* 80<=X<101 40*i+40 101*/for(int i = 0 ; i < ropeNum; i++) {//数据入库操作listStart = i *2000;listEnd = i * 2000 +2000;writeToolService.createBomDetail(readToolList.subList(listStart, listEnd));}writeToolService.createBomDetail(readToolList.subList(listEnd, listSize));} catch (Exception e) {//记录出现异常的时间,线程nameresult = new AsyncResult <String>("fail,time=" + System.currentTimeMillis() + ",thread id=" + Thread.currentThread().getName() + ",pageIndex=" + pageIndex);} }return result;}}
创建线程分批传入Future:
@Service
public class ToolsThread {private static final Logger LOG = LoggerFactory.getLogger(ToolsThread.class);@Autowiredprivate SyncToolsHandler syncToolsHandler;@AutowiredReadToolsService readToolsService;// 核心线程数@Value("${book.core.poolsize}")private int threadSum;public void receiveBookJobRun() {List<ReadToolModel> readToolModels = new ArrayList<ReadToolModel>();readToolModels = readToolsService.getToolListByAutoId(0, 5000000);// 入库开始时间Long inserOrUpdateBegin = System.currentTimeMillis();LOG.info("数据更新开始时间:" + inserOrUpdateBegin);// 接收集合各段的 执行的返回结果List<Future<String>> futureList = new ArrayList<Future<String>>();// 集合总条数if (readToolModels != null) {// 将集合切分的段数(2*CPU的核心数)int threadSum = 2 * Runtime.getRuntime().availableProcessors();int listSize = readToolModels.size();int listStart, listEnd;// 当总条数不足threadSum条时 用总条数 当做线程切分值if (threadSum > listSize) {threadSum = listSize;}// 将list 切分多份 多线程执行for (int i = 0; i < threadSum; i++) {// 计算切割 开始和结束listStart = listSize / threadSum * i;listEnd = listSize / threadSum * (i + 1);// 最后一段线程会 出现与其他线程不等的情况if (i == threadSum - 1) {listEnd = listSize;}// 数据切断List<ReadToolModel> readToolList= readToolModels.subList(listStart, listEnd);// 每段数据集合并行入库futureList.add(syncToolsHandler.syncTools(readToolList, i));}// 对各个线程段结果进行解析for (Future<String> future : futureList) {String str;if (null != future) {try {str = future.get().toString();LOG.info("current thread id =" + Thread.currentThread().getName() + ",result=" + str);} catch (ExecutionException | InterruptedException e) {LOG.info("线程运行异常!");}} else {LOG.info("线程运行异常!");}}}Long inserOrUpdateEnd = System.currentTimeMillis();LOG.info("数据更新结束时间:" + inserOrUpdateEnd + "。此次更新数据花费时间为:" + (inserOrUpdateEnd - inserOrUpdateBegin));}
}
同步时间大概15分钟
2020-08-08 16:58:29 [main] INFO com.commons.service.sync.ToolsThread -数据更新结束时间:1596877109637。此次更新数据花费时间为:990284
Spring Boot 多线程数据同步相关推荐
- Spring Boot与数据访问
Spring Boot与数据访问 JBDC.MyBatis.Spring Data JPA 一.简介 对于数据访问层,无论是SQL还是NOSQL,Spring Boot默认采用整合 Spring Da ...
- Spring Boot - 构建数据访问层
文章目录 基础规范: JDBC 关系型数据库访问规范 JDBC 规范中的核心编程对象 DriverManager DataSource Connection Statement/PreparedSta ...
- java 多线程跑数据_java——多线程的实现方式、三种办法解决线程赛跑、多线程数据同步(synchronized)、死锁...
多线程的实现方式:demo1.demo2 demo1:继承Thread类,重写run()方法 packagethread_test;public class ThreadDemo1 extendsTh ...
- Spring boot(6) 数据访问
Spring boot(6) 数据访问 学习视频:https://www.bilibili.com/video/BV19K4y1L7MT?p=62&spm_id_from=pageDriver ...
- 护网必备技能:Spring Boot 接口数据加解密 功能实现
护网必备技能:Spring Boot 接口数据加解密 功能实现 文章目录 护网必备技能:Spring Boot 接口数据加解密 功能实现 1. 尽量少改动,不影响之前的业务逻辑: 2. 考虑到时间紧迫 ...
- java数据同步解决方案_Java实现多线程数据同步的几种方法
1. 应用背景 程序在设计当中如果采取多线程操作的时候,如果操作的对象是一个的话,由于多个线程共享同一块内存空间,因此经常会遇到数据安全访问的问题,下面看一个经典的问题,银行取钱的问题:1).你有一张 ...
- 使用Spring Data MongoDB和Spring Boot进行数据聚合
MongoDB聚合框架旨在对文档进行分组并将其转换为聚合结果. 聚合查询包括定义将在管道中执行的几个阶段. 如果您对有关该框架的更深入的细节感兴趣,那么 mongodb docs是一个很好的起点. 这 ...
- 具有Spring Boot和数据功能的Java头优先弹性搜索
在本文中,我将为您提供有关如何在Java项目中使用Elastic Search的简单介绍. 由于Spring Boot是开始我们项目的最简单,最快的方法,因此我选择使用它. 此外,我们将大量使用心爱的 ...
- Spring Boot 接口数据加解密,so easy!
今天这篇文章聊一聊接口安全问题,涉及到接口的加密.解密 和产品.前端同学对外需求后,梳理了相关技术方案, 主要的需求点如下: 尽量少改动,不影响之前的业务逻辑: 考虑到时间紧迫性,可采用对称性加密方式 ...
最新文章
- TestNG 使 Java 单元测试轻而易举
- StringBuilder与StringBuffer比较
- Asp.net Ajax AutoComplete 控件的用法
- .net aspose.words 域加载图片_使用Python批量替换csdn文章的图片链接
- 密码学加密算法分类_密码学中的国际数据加密算法(IDEA)
- 工作275:表单验证重置
- php form表单属性,HTML5 表单属性
- oracle怎么装系统,【Oracle安装与操作系统用户组】
- 全国计算机一级ms office考试题型,全国计算机等级考试一级MS Office题型剖析
- Thinkphp新增字段无法插入到数据库问题
- sap 中migo收货自动打印smartform_EWM MES/ERP集成 生产收货的几种方式
- Summernote个性化定制使用帮助(三)
- 第二次项目冲刺(Beta阶段)--第五天
- greensock 框架
- 华为管理学案例分析_华为案例分析——管理学作业.ppt
- 日期计算器输入天数计算日期_如何在Windows计算器中执行日期计算
- SHOI 2008 仙人掌图 BZOJ 1023
- APP开发中这十个细节能直接影响到用户体验,那么如何提升用户体验?
- 关于Obj-c代码静态扫描 iPhone代码静态扫描的问题(clang-analyzer)
- 设计模式讲解与代码实践(二十三)——模板方法
热门文章
- 生命本来就是一个奇迹
- 如何使用Microsoft的Support站点
- 荒野行动计算机中丢失api,荒野行动PC版提示没有Normaliz.dll怎么办 没有Normaliz.dll解决方法...
- 解决命令行运行python文件,出现No module named *** 报错问题
- WACOM数位板没有压感问题的解决步骤
- Axes Studio工作室章程
- 未来最赚钱的17大行业:云计算、大数据、虚拟现实、人工智能……
- dboxShare 开源企业网盘系统v2.0.0.2011
- 制造业数字化转型的五大法则!
- 计算机投影仪的作用是什么,投影仪怎么连接电脑呢?操作方法是什么呢?