因为最近项目上线,需要同步期初数据-工序,大概有120万数据,采用之前Mybatis批量插入,一次5000的方式,单线程,大概需要近半个小时,后面为了提高效率,采用多线程编程,速度提升了大概2倍,耗时15分钟,同步120万条数据数

采用的是SpringBoot的多线程和@Async和Future

先了解下概念:

此处引用其他网站的解释:

什么是SpringBoot多线程

Spring是通过任务执行器(TaskExecutor)来实现多线程和并发编程,使用ThreadPoolTaskExecutor来创建一个基于线城池的TaskExecutor。在使用线程池的大多数情况下都是异步非阻塞的。我们配置注解@EnableAsync可以开启异步任务。然后在实际执行的方法上配置注解@Async上声明是异步任务

以下写法是单线程一次新增5000数据

public void syncWholeTools() {// 获取最大的AutoIDInteger maxAutoId = readToolsService.getMaxAutoId();final Integer batchNumber = 5000;Integer count = maxAutoId / batchNumber;Integer currentAotoId = 0;//分批次新增for (int i = 0; i < count; i++) {List<ReadToolModel> readToolModelList = readToolsService.getToolListByAutoId(currentAotoId,(i + 1) * batchNumber);currentAotoId = (i + 1) * batchNumber + 1;writeToolService.createBomDetail(readToolModelList);}List<ReadToolModel> readToolModelList = readToolsService.getToolListByAutoId(currentAotoId, maxAutoId);writeToolService.createBomDetail(readToolModelList);}

使用Spring Boot多线程

启动类需要加上@EnableAsync注解

yml配置

tools:core:poolsize: 100max:poolsize: 200queue:capacity: 200keepAlive:seconds: 30thread:name:prefix: tool

配置类:


@Configuration
@EnableAsync
public class AsyncConfig {//接收报文核心线程数@Value("${tools.core.poolsize}")private int toolsCorePoolSize;//接收报文最大线程数@Value("${tools.max.poolsize}")private int toolsMaxPoolSize;//接收报文队列容量@Value("${tools.queue.capacity}")private int toolsQueueCapacity;//接收报文线程活跃时间(秒)@Value("${tools.keepAlive.seconds}")private int toolsKeepAliveSeconds;//接收报文默认线程名称@Value("${tools.thread.name.prefix}")private String toolsThreadNamePrefix;/*** toolsTaskExecutor:(接口的线程池). <br/>** @return TaskExecutor taskExecutor接口* @since JDK 1.8*/@Bean(name = "ToolsTask")public ThreadPoolTaskExecutor toolsTaskExecutor() {//newFixedThreadPoolThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 设置核心线程数executor.setCorePoolSize(toolsCorePoolSize);// 设置最大线程数executor.setMaxPoolSize(toolsMaxPoolSize);// 设置队列容量executor.setQueueCapacity(toolsQueueCapacity);// 设置线程活跃时间(秒)executor.setKeepAliveSeconds(toolsKeepAliveSeconds);// 设置默认线程名称executor.setThreadNamePrefix(toolsThreadNamePrefix);// 设置拒绝策略// rejection-policy:当pool已经达到max size的时候,如何处理新任务// CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 等待所有任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown(true);executor.initialize();return executor;}
}

数据新增的操作

@Component
public class SyncToolsHandler {private static final Logger LOG = LoggerFactory.getLogger(SyncToolsHandler.class);@AutowiredWriteToolService writeToolService;@Async(value = "ToolTask")public Future <String> syncTools(List <ReadToolModel> readToolList, int pageIndex) {System.out.println("thread name " + Thread.currentThread().getName());LOG.info(String.format("此批数据的段数为:%s 此段数据的数据条数为:%s", pageIndex, readToolList.size()));//声明future对象Future <String> result = new AsyncResult <String>("");//循环遍历if (null != readToolList && readToolList.size() > 0) {try {int listSize = readToolList.size();int listStart = 0, listEnd = 0;int ropeNum = listSize/2000;/*** 假设101条数据每次40,总共循环101/4=2* 0<=X<40     i=0  40*i   40*i+40* 40<=X<80   i=1  40*i   40*i+40* 80<=X<101  40*i+40   101*/for(int i = 0 ; i < ropeNum; i++) {//数据入库操作listStart = i *2000;listEnd = i * 2000 +2000;writeToolService.createBomDetail(readToolList.subList(listStart, listEnd));}writeToolService.createBomDetail(readToolList.subList(listEnd, listSize));} catch (Exception e) {//记录出现异常的时间,线程nameresult = new AsyncResult <String>("fail,time=" + System.currentTimeMillis() + ",thread id=" + Thread.currentThread().getName() + ",pageIndex=" + pageIndex);}  }return result;}}

创建线程分批传入Future:

@Service
public class ToolsThread {private static final Logger LOG = LoggerFactory.getLogger(ToolsThread.class);@Autowiredprivate SyncToolsHandler syncToolsHandler;@AutowiredReadToolsService readToolsService;// 核心线程数@Value("${book.core.poolsize}")private int threadSum;public void receiveBookJobRun() {List<ReadToolModel> readToolModels = new ArrayList<ReadToolModel>();readToolModels = readToolsService.getToolListByAutoId(0, 5000000);// 入库开始时间Long inserOrUpdateBegin = System.currentTimeMillis();LOG.info("数据更新开始时间:" + inserOrUpdateBegin);// 接收集合各段的 执行的返回结果List<Future<String>> futureList = new ArrayList<Future<String>>();// 集合总条数if (readToolModels != null) {// 将集合切分的段数(2*CPU的核心数)int threadSum = 2 * Runtime.getRuntime().availableProcessors();int listSize = readToolModels.size();int listStart, listEnd;// 当总条数不足threadSum条时 用总条数 当做线程切分值if (threadSum > listSize) {threadSum = listSize;}// 将list 切分多份 多线程执行for (int i = 0; i < threadSum; i++) {// 计算切割 开始和结束listStart = listSize / threadSum * i;listEnd = listSize / threadSum * (i + 1);// 最后一段线程会 出现与其他线程不等的情况if (i == threadSum - 1) {listEnd = listSize;}// 数据切断List<ReadToolModel> readToolList= readToolModels.subList(listStart, listEnd);// 每段数据集合并行入库futureList.add(syncToolsHandler.syncTools(readToolList, i));}// 对各个线程段结果进行解析for (Future<String> future : futureList) {String str;if (null != future) {try {str = future.get().toString();LOG.info("current thread id =" + Thread.currentThread().getName() + ",result=" + str);} catch (ExecutionException | InterruptedException e) {LOG.info("线程运行异常!");}} else {LOG.info("线程运行异常!");}}}Long inserOrUpdateEnd = System.currentTimeMillis();LOG.info("数据更新结束时间:" + inserOrUpdateEnd + "。此次更新数据花费时间为:" + (inserOrUpdateEnd - inserOrUpdateBegin));}
}

同步时间大概15分钟

2020-08-08 16:58:29 [main] INFO  com.commons.service.sync.ToolsThread -数据更新结束时间:1596877109637。此次更新数据花费时间为:990284

Spring Boot 多线程数据同步相关推荐

  1. Spring Boot与数据访问

    Spring Boot与数据访问 JBDC.MyBatis.Spring Data JPA 一.简介 对于数据访问层,无论是SQL还是NOSQL,Spring Boot默认采用整合 Spring Da ...

  2. Spring Boot - 构建数据访问层

    文章目录 基础规范: JDBC 关系型数据库访问规范 JDBC 规范中的核心编程对象 DriverManager DataSource Connection Statement/PreparedSta ...

  3. java 多线程跑数据_java——多线程的实现方式、三种办法解决线程赛跑、多线程数据同步(synchronized)、死锁...

    多线程的实现方式:demo1.demo2 demo1:继承Thread类,重写run()方法 packagethread_test;public class ThreadDemo1 extendsTh ...

  4. Spring boot(6) 数据访问

    Spring boot(6) 数据访问 学习视频:https://www.bilibili.com/video/BV19K4y1L7MT?p=62&spm_id_from=pageDriver ...

  5. 护网必备技能:Spring Boot 接口数据加解密 功能实现

    护网必备技能:Spring Boot 接口数据加解密 功能实现 文章目录 护网必备技能:Spring Boot 接口数据加解密 功能实现 1. 尽量少改动,不影响之前的业务逻辑: 2. 考虑到时间紧迫 ...

  6. java数据同步解决方案_Java实现多线程数据同步的几种方法

    1. 应用背景 程序在设计当中如果采取多线程操作的时候,如果操作的对象是一个的话,由于多个线程共享同一块内存空间,因此经常会遇到数据安全访问的问题,下面看一个经典的问题,银行取钱的问题:1).你有一张 ...

  7. 使用Spring Data MongoDB和Spring Boot进行数据聚合

    MongoDB聚合框架旨在对文档进行分组并将其转换为聚合结果. 聚合查询包括定义将在管道中执行的几个阶段. 如果您对有关该框架的更深入的细节感兴趣,那么 mongodb docs是一个很好的起点. 这 ...

  8. 具有Spring Boot和数据功能的Java头优先弹性搜索

    在本文中,我将为您提供有关如何在Java项目中使用Elastic Search的简单介绍. 由于Spring Boot是开始我们项目的最简单,最快的方法,因此我选择使用它. 此外,我们将大量使用心爱的 ...

  9. Spring Boot 接口数据加解密,so easy!

    今天这篇文章聊一聊接口安全问题,涉及到接口的加密.解密 和产品.前端同学对外需求后,梳理了相关技术方案, 主要的需求点如下: 尽量少改动,不影响之前的业务逻辑: 考虑到时间紧迫性,可采用对称性加密方式 ...

最新文章

  1. TestNG 使 Java 单元测试轻而易举
  2. StringBuilder与StringBuffer比较
  3. Asp.net Ajax AutoComplete 控件的用法
  4. .net aspose.words 域加载图片_使用Python批量替换csdn文章的图片链接
  5. 密码学加密算法分类_密码学中的国际数据加密算法(IDEA)
  6. 工作275:表单验证重置
  7. php form表单属性,HTML5 表单属性
  8. oracle怎么装系统,【Oracle安装与操作系统用户组】
  9. 全国计算机一级ms office考试题型,全国计算机等级考试一级MS Office题型剖析
  10. Thinkphp新增字段无法插入到数据库问题
  11. sap 中migo收货自动打印smartform_EWM MES/ERP集成 生产收货的几种方式
  12. Summernote个性化定制使用帮助(三)
  13. 第二次项目冲刺(Beta阶段)--第五天
  14. greensock 框架
  15. 华为管理学案例分析_华为案例分析——管理学作业.ppt
  16. 日期计算器输入天数计算日期_如何在Windows计算器中执行日期计算
  17. SHOI 2008 仙人掌图 BZOJ 1023
  18. APP开发中这十个细节能直接影响到用户体验,那么如何提升用户体验?
  19. 关于Obj-c代码静态扫描 iPhone代码静态扫描的问题(clang-analyzer)
  20. 设计模式讲解与代码实践(二十三)——模板方法

热门文章

  1. 生命本来就是一个奇迹
  2. 如何使用Microsoft的Support站点
  3. 荒野行动计算机中丢失api,荒野行动PC版提示没有Normaliz.dll怎么办 没有Normaliz.dll解决方法...
  4. 解决命令行运行python文件,出现No module named *** 报错问题
  5. WACOM数位板没有压感问题的解决步骤
  6. Axes Studio工作室章程
  7. 未来最赚钱的17大行业:云计算、大数据、虚拟现实、人工智能……
  8. dboxShare 开源企业网盘系统v2.0.0.2011
  9. 制造业数字化转型的五大法则!
  10. 计算机投影仪的作用是什么,投影仪怎么连接电脑呢?操作方法是什么呢?