这次写的功能是 实现springbatch的job每次调用、每次都会执行job里查询mysql列表的reader。因为碰到一个问题,批处理服务启动之后,调用job时,springbatch始终只处理一次,下次调用 显示已完成,原因就是初始化列表是服务启动时填充,只会填充一次,使用完之后不会再次填充列表,所以下次进入job数据为空。我们现在就解决这个问题,让job的每次调用都会从数据库获取最新的列表,并且批处理掉这份数据。

继上一次springbatch读取rabbitmq文章 继续编写。

因为采用的是通用设计,所以使用mybatis读取mysql列表同样也是。

废话不多说我们这次使用到的核心是 MyBatisCursorItemReader 。MyBatisCursorItemReader 是mybatis专门为了springbatch游标读取编写的。

首先我们修改原来的BatchConfig类 加入sqlSessionFactory方法,此处实例化使用的单例,因为使用到的次数比较多,为了节约内存资源,所以这么设计。

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;

import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.data.mongodb.core.MongoTemplate;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration

public class BatchConfig {

@Autowired

protected JobBuilderFactory jobBuilderFactory;

@Autowired

protected StepBuilderFactory stepBuilderFactory;

@Autowired

protected RabbitTemplate amqpTemplate;

@Autowired

protected MongoTemplate mongoTemplate;

@Autowired

private MybatisProperties properties;

private static SqlSessionFactoryBean sessionFactory;

//构造线程

@Bean

protected ThreadPoolTaskExecutor taskExecutor(){

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(10);

executor.setMaxPoolSize(15);

executor.setKeepAliveSeconds(300);

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

executor.setQueueCapacity(10000);

executor.setThreadGroupName("spring_batch");

return executor;

}

public SqlSessionFactory sqlSessionFactory() {

try {

if(sessionFactory == null){

sessionFactory = new SqlSessionFactoryBean();

sessionFactory.setDataSource(SpringContextHolder.getBean("dataSource"));

sessionFactory.setMapperLocations(properties.resolveMapperLocations());

return sessionFactory.getObject();

}

return sessionFactory.getObject();

} catch (Exception e) {

e.printStackTrace();

return null;

}

}

}

另外编写了专门使用mybatis循环读取mysql列表

import org.apache.ibatis.session.SqlSessionFactory;

import org.mybatis.spring.batch.MyBatisCursorItemReader;

import org.springframework.batch.item.ItemReader;

import java.util.Map;

/**

* Description: 用mybatis返回list 循环读取值

* @date: 2019/4/28 9:31

*/

public class ReadToListByMybatis {

/**

* Description: 读取mybatis list

* @param: 查询方法需要的参数 ,queryId 可反射的mybatis接口的ID

* example:

* parameterValues.put("createDate", "2019-04-28");

* parameterValues.put("queryId", "com.xx.xx.batch.dao.XXDao.select");

*

* @date: 2019/4/28 9:28

*/

public static ItemReader myBatisCursorItemReader(MapparameterValues, SqlSessionFactory sqlSessionFactory) {

MyBatisCursorItemReader reader = new MyBatisCursorItemReader();

reader.setSqlSessionFactory(sqlSessionFactory);

reader.setParameterValues(parameterValues);

reader.setQueryId(parameterValues.get("queryId").toString());

return reader;

}

}

现在我们修改自定义config 使用mybatis从mysql中读取

import com.xx.xx.config.BatchConfig;

import com.xx.xx.batch.dao.UserDao;

import com.xx.xx.batch.entity.User;

import com.xx.xx.batch.listener.UserJobCompletionListener;

import com.xx.xx.batch.processor.UserProcessor;

import com.xx.xx.batch.read.RabbitRead;

import com.xx.xx.batch.writer.MongoWriter;

import com.xx.xx.batch.writer.MysqlWriter;

import org.springframework.batch.core.Job;

import org.springframework.batch.core.JobExecutionListener;

import org.springframework.batch.core.Step;

import org.springframework.batch.core.launch.support.RunIdIncrementer;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

@Configuration

public class UserBatchConfig extends BatchConfig {

@Resource

private UserDao userDao;

private Mapparam;

@Bean

public Step userStep(){

param.put("queryId", "com.xx.xx.batch.dao.UserDao.select");

return stepBuilderFactory.get("userStep").chunk(10000)

.reader(ReadToListByMybatis.myBatisCursorItemReader(param,sqlSessionFactory())).processor(new UserProcessor()).writer(new MysqlWriter(userDao)).taskExecutor(taskExecutor())

.build();

}

@Bean

public Step userStep2(){

//业务逻辑可自写

/*param.put("queryId", "com.xx.xx.batch.dao.UserDao.select");

return stepBuilderFactory.get("userStep").chunk(10000)

.reader(ReadToListByMybatis.myBatisCursorItemReader(param,sqlSessionFactory())).processor(new UserProcessor()).writer(new MysqlWriter(userDao)).taskExecutor(taskExecutor())

.build();*/

}

@Bean

public Job processJob1(){

param = new HashMap();

param.put("createDate", "2019-04-28");

// 此处userStep2 代表可以使用多个step 执行完第一个step之后就开始执行step2

return jobBuilderFactory.get("processJob1").

incrementer(new RunIdIncrementer()).listener(listener()).

flow(userStep()).next(userStep2()).end().build();

}

// 监听事件

@Bean

public JobExecutionListener listener() {

return new UserJobCompletionListener();

}

}

接下来我们修改UserDao增加id方法

@Mapper

public interface UserDaoextends MysqlCommonDao{

@Override

Integer add(T user);

@Override

List select(Mapmap);

}

在修改mapper文件

insert into user_batch_test values(#{id},#{age},#{name})

SELECT * FROM user

1=1 and create_date IS NOT NULL

and create_date = #{date}

最后 ,在启动项目的时候 springbatch会初始化任务以及任务里的reader,在调用任务job时,批处理掉从mysql查出来的列表。

下次调用任务job时,又会重新的读取mysql查询最新列表并处理。

mysql mybatis list循环_Springbatch集成mybatis循环批量读取mysql相关推荐

  1. Java 捕获 mybatis异常_3 springboot集成mybatis和全局异常捕获

    mybatis有两种方式,一种是基于XML,一种是基于注解 springboot集成mybatis 首先先创建表,这里都简化了 DROP TABLE IF EXISTS `user`; CREATE ...

  2. python读取mysql中的数据_Python笔记:用Python读取MySQL中的数据

    Python处理数据分析的优势,很多人都知道(可以实现更复杂.更灵活的操作,包括数据预处理.数据可视化输出等),但是加载到Python中的数据,只是临时存储在内存中的一张虚拟表(退出之后就会被释放掉了 ...

  3. mybatis类型转换配置(springboot集成mybatis的配置)

    使用mybatis将string转为blob存入数据库时出现ora-01465异常,无效的十六进制转换!求解,求解 oracle中用于保存位串的数据类型是RAW,LONGRAW(推荐使用BLOB). ...

  4. 二级菜单从mysql中取_MyEclipes中如何如何让通过读取Mysql中的数据来实现二级菜单分类...

    展开全部 示例:#mysql 表`32313133353236313431303231363533e78988e69d8331333332633663test1`结构: mysql> desc  ...

  5. mac xampp连接mysql数据库_请问在mac下xampp无法读取mysql的数据

    报错信息如下: Fatal error: Uncaught Error: Call to undefined function mysql_connect() in /Applications/XAM ...

  6. 第 5 课 SpringBoot集成Mybatis(2)-配置文件版

    第五课 SpringBoot集成Mybatis(2)-配置文件版 文章目录 第五课 SpringBoot集成Mybatis(2)-配置文件版 1. 引入依赖:pom.xml 2. 配置applicat ...

  7. python循环批量读取tushare财务数据

    前段时间接到有个需求,需要获取A股中的1570个公司的财务数据,在tushare官方的代码当中,一次只能获取一个公司的数据,那就需要利用循环批量地去读取.(个人的tushareID:498867) 公 ...

  8. 华南农业大学Linux课程综合实验-超详细版(实现用Go、nodejs、python、php读取mysql数据)

    文章目录 一.准备工作 1. 领取阿里云服务器 2. 服务器初始设置 2.1 设置实例密码 2.1.1 找不到控制台页面 2.2 远程登录云服务器 2.3 修改云服务器密码 2.4 实现自动远程连接 ...

  9. mybatis获取mysql源数据类型_spring集成mybatis实现mysql数据库读写分离

    前言 在网站的用户达到一定规模后,数据库因为负载压力过高而成为网站的瓶颈.幸运的是目前大部分的主流数据库都提供主从热备功能,通过配置两台数据库主从关系,可以将一台数据库的数据更新同步到另一台服务器上. ...

最新文章

  1. [tire+最短路]Bless You Autocorrect!
  2. java对密码进行加密的方法_如何在JAVA中使用MD5加密对密码进行加密
  3. 国庆期间,我造了台计算机
  4. cocos 新工程遇到的问题
  5. H264 视频文件 帧格式 传输封装等 杂碎
  6. canvas笔记-lineCap的使用
  7. CVPR2021 Oral《Seeing Out of the Box》北科大中山大学微软提出端到端视觉语言表征预训练方法...
  8. 【TensorFlow】TensorFlow从浅入深系列之六 -- 教你深入理解经典损失函数(交叉熵、均方误差)
  9. Service,测试
  10. 时区转换问题java编码,Java UTC ZonedDateTime转换成指定时区时间方法及使用示例代码...
  11. cad卸载不干净_卸载软件不干净?这样做,完全没有残留
  12. 求最大值 最小值 下标 及格率 c语言,输入某班的C语言成绩,计算输出其及格率...
  13. 界面音效以及3D音效通过参数控制声音加载声音资源包
  14. 不用Maven打jar包
  15. android生成md5,使用Android studio生成签名文件以及获取MD5
  16. 日订单量达到100万单后,我们做了订单中心重构
  17. dbavear 连接hive
  18. java怎么计算时间差_请问java怎么计算时间差
  19. html存储数据的方法,数据存储方式有哪些
  20. 如何在一个月内快速瘦20斤?

热门文章

  1. postgresql|数据库|基于本地备份的远程备份策略
  2. Java 字符串 压缩 解压
  3. 出海的中国企业,为什么有80%都选择了这家云服务商?
  4. 《SpringBoot篇》12.@Valid与@Validated的区别
  5. linux查看jvm内存
  6. Dockerfile 详解
  7. 首页搬迁 - 一步一步指导搬家
  8. 电脑微信2.9.0测试版更新内容
  9. Dubbo原理解析(非常透彻)
  10. 产品设计在生活中——设计与生产的关系