文章目录

  • 1、SpringBatch概述
    • 1.1 概述
    • 1.2 框架主要有以下功能:
    • 1.3 框架一共有4个角色:
  • 2.搭建SpringBatch项目
    • 2.1 [用Spring initializr搭建](https://start.spring.io/)
    • 2.2 解压缩导入IDEA
    • 2.3 启动项目前添加数据库驱动依赖
  • 3.SpringBatch入门程序
    • 3.1 创建confing包写一个类JobConfiguration
    • 3.2 添加Job、Step注入
    • 3.3 创建任务对象(代码如下)
  • 4. h2数据库替换为Mysql
    • 4.1 配置yml
  • 5. 核心API
  • 6. Job的创建和使用(代码如下)
  • 7.Flow的创建和使用
    • 7.1创建FlowDemo测试一下
  • 8.split实现并发执行
    • 8.1借助split创建SplitDemo 测试
  • 9. 决策器的使用
    • 9.1创建决策器
    • 9.2借助决策器创建DeciderDemo测试
  • 10、Job嵌套
    • 10.1 案例:创建两个Job,作为子Job
    • 10.2 再创建一个Job作为父Job
    • 10.3 启动前配置一下yml指定父类
  • 11.监听器的使用
    • 11.1接口实现监听器
    • 11.2 注解实现监听器
    • 11.3 借助监听器类创建ListenerDemo测试
  • 12.Job的参数
    • 12.1、案例:
  • 13、ItemReader概述
  • 13.1、案例
  • 14、从数据中读取数据
    • 14.1案列
  • 15.从普通文件中读取数据
    • 15.1 案例
  • 16.从xml文件中读取数据
    • 16.1案例
  • 17.从多个文件中读取数据
    • 17.1案例
  • 18. 数据输出 ItemWriter概述
    • 18.1案例

1、SpringBatch概述

1.1 概述

Spring Batch是一个轻量级的、完善的批处理应用框架,旨在支持企业系统建立健壮、高效的批处理应用。Spring Batch是Spring的一个子项目,使用java语言并基于Spring框架为基础开发,使得已经使用Spring框架开发者或企业更容易访问和利用企业服务。

Spring Batch提供了可重用的功能,这些功能对于处理大量的数据至关重要,包括记录/跟踪,事务管理,作业处理统计,作业重启,跳过和资源管理。 它还提供更高级的技术服务和功能,通过优化和分区技术实现极高容量和高性能的批处理作业。 Spring Batch可用于两种简单的用例(例如将文件读入数据库或运行存储过程)以及复杂的大量用例(例如在数据库之间移动大量数据,转换它等等) 上)。 大批量批处理作业可以高度可扩展的方式利用该框架来处理大量信息。

然而Spring Batch不是一个调度框架,它只关注于任务的处理,如日志监控、事务、并发问题等,但是它可以与其它调度框架一起联合使用,完成相应的调度任务,如Quartz、Tivoli、Control-M等。

1.2 框架主要有以下功能:

  • Transaction management(事务管理)

  • Chunk based processing(基于块的处理)

  • Declarative I/O(声明式的输入输出)

  • Start/Stop/Restart(启动/停止/再启动)

  • Retry/Skip(重试/跳过)

1.3 框架一共有4个角色:

  • JobLauncher是任务启动器 ,通过它来启动任务,可以看作是程序的入口。
  • Job表示一个具体的任务,Step代表着一个具体的步骤,一个任务可以包含一个Step(想象把大象放将冰箱这个任务需要多少个步骤你就明白了),也可以包含多个Step,由任务启动器进行启动。任务的具体执行内容,一个Step的执行过程包括读数据(ItemReader)、处理数据(ItemProcessor)、写数据(ItemWriter)。
  • JobRepository是存储数据的地方,可以看做是一个数据库的接口,在任务执行的时候需要通过它记录任务状态等等信息。

2.搭建SpringBatch项目

2.1 用Spring initializr搭建

2.2 解压缩导入IDEA


2.3 启动项目前添加数据库驱动依赖

     <dependency><groupId>com.h2database</groupId><artifactId>h2</artifactId><scope>runtime</scope></dependency>

3.SpringBatch入门程序

3.1 创建confing包写一个类JobConfiguration

  • 添加注解

3.2 添加Job、Step注入

3.3 创建任务对象(代码如下)

    @Beanpublic Job helloWorldJob(){return jobBuilderFactory.get("helloWorldJob").start(step1()).build();}@Bean//核心思想基于任务,而任务执行的是steppublic Step step1() {return stepBuilderFactory.get("step1").tasklet(new Tasklet() {//执行的功能@Override//execute需要RepeatStatus的一个返回值public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("Hello World");//指定RepeatStatus的状态值return RepeatStatus.FINISHED;}}).build();}

4. h2数据库替换为Mysql

  • 添加依赖
     <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency>

4.1 配置yml

spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/batch?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&autoReconnect=true&allowMultiQueries=trueusername: rootpassword: rootschema: classpath:/org/springframework/batch/core/schema-mysql.sqlbatch:initialize-schema: always

-再次启动程序去数据库刷新会

  • 持久化相关的信息

5. 核心API


上图介绍了任务Job的一些相关概念:

  • Job:封装处理实体,定义过程逻辑。
  • JobInstance:Job的运行实例,不同的实例,参数不同,所以定义好一个Job后可以通过不同参数运行多次。
  • JobParameters:与JobInstance相关联的参数。
  • JobExecution:代表Job的一次实际执行,可能成功、可能失败。
    所以,开发人员要做的事情,就是定义Job。

下图介绍了step步骤的一些相关概念:

Step是对Job某个过程的封装,一个Job可以包含一个或多个Step,一步步的Step按特定逻辑执行,才代表Job执行完成。

通过定义Step来组装Job可以更灵活地实现复杂的业务逻辑。

** 输入——处理——输出**

所以,定义一个Job关键是定义好一个或多个Step,然后把它们组装好即可。而定义Step有多种方法,但有一种常用的模型就是输入——处理——输出,即Item Reader、Item Processor和Item Writer。比如通过Item Reader从文件输入数据,然后通过Item Processor进行业务处理和数据转换,最后通过Item Writer写到数据库中去。

Spring Batch为我们提供了许多开箱即用的Reader和Writer,非常方便。

6. Job的创建和使用(代码如下)

package com.bosc.springbatch.config;@Configuration
@EnableBatchProcessing
public class JobDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;//创建任务对象@Beanpublic Job jobDemoJob(){return jobBuilderFactory.get("jobDemoJob")//.start(step1())//next()指定下一个step,默认先执行step1再执行step2依次//.next(step2())//.next(step3()).start(step1())//on("COMPLETED"<结束step1>)用来指定一个条件.on("COMPLETED")//to(到达step2()).to(step2())//成功执行step2满足才会结束.from(step2()).on("COMPLETED").to(step3())//fail()表示step2执行失败step3是不能执行的/*.from(step2()).on("COMPLETED").fail()*///stopAndRestart停止并重新启动 一般用于测试/*.from(step2()).on("COMPLETED").stopAndRestart(step2())*///步骤执行完end().from(step3()).end().build();}@Beanpublic Step step1() {return stepBuilderFactory.get("step1")//step具体实现功能.tasklet(new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {//执行step1功能System.out.println("step1");return RepeatStatus.FINISHED;}//正常结束才会执行下一个}).build();}@Beanpublic Step step2() {return stepBuilderFactory.get("step2")//step具体实现功能.tasklet(new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {//执行step1功能System.out.println("step2");return RepeatStatus.FINISHED;}//正常结束才会执行下一个}).build();}@Beanpublic Step step3() {return stepBuilderFactory.get("step3")//step具体实现功能.tasklet(new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {//执行step1功能System.out.println("step3");return RepeatStatus.FINISHED;}//正常结束才会执行}).build();}
}

7.Flow的创建和使用

  • Flow是多个Step的集合
  • 可以被多个Job复用
  • 使用FlowBulider来创建

7.1创建FlowDemo测试一下

public class FlowDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;//创建若干个step@Beanpublic Step flowDemoStep1(){return stepBuilderFactory.get("flowDemoStep1").tasklet(new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("flowDemoStep1");return RepeatStatus.FINISHED;}}).build();}@Beanpublic Step flowDemoStep2(){return stepBuilderFactory.get("flowDemoStep1").tasklet(new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("flowDemoStep2");return RepeatStatus.FINISHED;}}).build();}@Beanpublic Step flowDemoStep3(){return stepBuilderFactory.get("flowDemoStep1").tasklet(new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("flowDemoStep3");return RepeatStatus.FINISHED;}}).build();}//创建Flow对象,指明Flow对象包含哪些Step@Beanpublic Flow flowDemoFlow(){//FlowBuilder创建Flow对象return new FlowBuilder<Flow>("FlowDemoFlow")//由多个Step构成.start(flowDemoStep1()).next(flowDemoStep2()).build();}//创建Job对象@Beanpublic Job flowDemoJob(){return jobBuilderFactory.get("flowDemoJob")//可以接受Step、Flow.start(flowDemoFlow()).next(flowDemoStep3()).end().build();}
}

8.split实现并发执行

实现任务中的多个step或多个flow并发执行

1、创建若干个step
2、创建两个flow
3、创建一个任务包含以上两个flow,并让这两个flow并发执行

8.1借助split创建SplitDemo 测试

public class SplitDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;//创建3个step,然后放到2个flow当中,创建作业,作用中包含2个flow,在作业中实现这2个flow的并发执行@Beanpublic Step splitDemoStep1(){return stepBuilderFactory.get("splitDemoStep1").tasklet(new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("splitDemoStep1");return RepeatStatus.FINISHED;}}).build();}@Beanpublic Step splitDemoStep2(){return stepBuilderFactory.get("splitDemoStep2").tasklet(new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("splitDemoStep2");return RepeatStatus.FINISHED;}}).build();}@Beanpublic Step splitDemoStep3(){return stepBuilderFactory.get("splitDemoStep3").tasklet(new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("splitDemoStep3");return RepeatStatus.FINISHED;}}).build();}//创建Flow@Beanpublic Flow splitDemoFlow1(){return new FlowBuilder<Flow>("splitDemoFlow1").start(splitDemoStep1()).build();}@Beanpublic Flow splitDemoFlow2(){return new FlowBuilder<Flow>("splitDemoFlow2").start(splitDemoStep2()).next(splitDemoStep3()).build();}//创建任务@Beanpublic Job splitDemoJob(){return jobBuilderFactory.get("splitDemoJob").start(splitDemoFlow1())//split()实现并发.split(new SimpleAsyncTaskExecutor())//指明splitDemoFlow2.add(splitDemoFlow2()).end().build();}
}

9. 决策器的使用

  • 接口: JobExecutionDecider

9.1创建决策器

//决策器 实现接口
public class MyDecider implements JobExecutionDecider {//定义一个count成员private int count;@Overridepublic FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {//每次count余数为0返回一个FlowExecutionStatus设结果count++;if (count%2==0)//返回偶数return new FlowExecutionStatus("even");else//返回偶数return new FlowExecutionStatus("odd");}
}

9.2借助决策器创建DeciderDemo测试

public class DeciderDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;//创建Step@Beanpublic Step deciderDemoStep1(){return stepBuilderFactory.get("deciderDemoStep1").tasklet((stepContribution, chunkContext) -> {System.out.println("deciderDemoStep1");return RepeatStatus.FINISHED;}).build();}@Beanpublic Step deciderDemoStep2(){return stepBuilderFactory.get("deciderDemoStep2").tasklet((stepContribution, chunkContext) -> {System.out.println("even");return RepeatStatus.FINISHED;}).build();}@Beanpublic Step deciderDemoStep3(){return stepBuilderFactory.get("deciderDemoStep3").tasklet((stepContribution, chunkContext) -> {System.out.println("odd");return RepeatStatus.FINISHED;}).build();}//创建决策器@Bean//调用JobExecutionDecider接口public JobExecutionDecider myDecider(){//对应的类return new MyDecider();}//创建任务@Beanpublic Job deciderDemoJob(){return jobBuilderFactory.get("deciderDemoJob").start(deciderDemoStep1())//获取决策器对象.next(myDecider())//如果返回even执行step2.from(myDecider()).on("even").to(deciderDemoStep2()).from(myDecider()).on("odd").to(deciderDemoStep3())//无论返回什么再返回到决策器继续执行.from(deciderDemoStep3()).on("*").to(myDecider()).end().build();}
}

10、Job嵌套

一个Job可以嵌套在另一个Job中,被嵌套的Job为子Job,外部Job称为父Job。子Job不能单独执行,需要由父Job来启动

10.1 案例:创建两个Job,作为子Job

public class ChildJob1 {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Step childJobStep1(){return stepBuilderFactory.get("childJobStep1").tasklet(new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("childJobStep1");return RepeatStatus.FINISHED;}}).build();}@Beanpublic Job childJobOne(){return jobBuilderFactory.get("childJobOne").start(childJobStep1()).build();}
}
@Configuration
public class ChildJob2 {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Step childJob2Step1(){return stepBuilderFactory.get("childJob2Step1").tasklet(new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("childJob2Step1");return RepeatStatus.FINISHED;}}).build();}@Beanpublic Step childJob2Step2(){return stepBuilderFactory.get("childJob2Step2").tasklet(new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("childJob2Step2");return RepeatStatus.FINISHED;}}).build();}@Beanpublic Job childJobTwo(){return jobBuilderFactory.get("childJobOne").start(childJob2Step1()).next(childJob2Step2()).build();}
}

10.2 再创建一个Job作为父Job

//父Job
@Configuration
public class NestedDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate Job childJobOne;@Autowiredprivate Job childJobTwo;//注入启动对象@Autowiredprivate JobLauncher launcher;@Beanpublic Job parentJob(JobRepository jobRepository, PlatformTransactionManager transactionManager){return jobBuilderFactory.get("parentJob").start(childJob1(jobRepository,transactionManager)).next(childJob2(jobRepository,transactionManager)).build();}//返回的是Job类型的Step,特殊的stepprivate Step childJob1(JobRepository jobRepository, PlatformTransactionManager transactionManager ) {return new JobStepBuilder(new StepBuilder("childJob1"))//指明对应的Job对象.job(childJobOne)//启动对象.launcher(launcher)//使用启动父Job的启动对象//指明持久化存储对象.repository(jobRepository)//事务管理器.transactionManager(transactionManager).build();}private Step childJob2(JobRepository jobRepository, PlatformTransactionManager transactionManager) {return new JobStepBuilder(new StepBuilder("childJob1"))//指明对应的Job对象.job(childJobTwo)//启动对象.launcher(launcher)//使用启动父Job的启动对象//指明持久化存储对象.repository(jobRepository)//事务管理器.transactionManager(transactionManager).build();}
}

10.3 启动前配置一下yml指定父类

spring:batch:job: #指定父Jobnames: parentJob

11.监听器的使用

监听器用来监听批处理作业的执行情况,创建监听可以通过实现接口或使用注解

  • JobExecutionListener(before,after)
  • StepExecutionListener(before,after)
  • ChunkExecutionListener(before,after,error)
  • ItemReadListener,ItemProcessListener,ItemWriteListener(before,after,error)

11.1接口实现监听器

public class MyJobListener implements JobExecutionListener {@Overridepublic void beforeJob(JobExecution jobExecution) {System.out.println(jobExecution.getJobInstance().getJobName()+"before..");}@Overridepublic void afterJob(JobExecution jobExecution) {System.out.println(jobExecution.getJobInstance().getJobName()+"after..");}
}

11.2 注解实现监听器

public class MYChunkListener {//执行之前执行的方法@BeforeChunkpublic void beforeChunk(ChunkContext context){System.out.println(context.getStepContext().getJobName()+"before...");}@AfterChunkpublic void afterChunk(ChunkContext context){System.out.println(context.getStepContext().getStepName()+"after...");}
}

11.3 借助监听器类创建ListenerDemo测试

@Configuration
public class ListenerDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job listenerJob(){return jobBuilderFactory.get("listenerJob").start(step1())//创建一个监听对象.listener(new MyJobListener()).build();}@Beanpublic Step step1() {return stepBuilderFactory.get("step1")//实现数据的读取,完整的读完chunk指定的值再进行数据的输出处理 可read,process,write.<String,String>chunk(2)//调用容错.faultTolerant().listener(new MYChunkListener())//读数据.reader(read())//写数据.writer(write()).build();}//读取字符串类型@Beanpublic ItemReader<String> read() {return new ListItemReader<>(Arrays.asList("1234","js","vue"));}//写入字符串类型@Beanpublic ItemWriter<String> write() {return new ItemWriter<String>() {//每次读的数据以集合传递给writer@Overridepublic void write(List<? extends String> list) throws Exception {for (String item:list){System.out.println(item);}}};}
}

12.Job的参数

在Job运行时可以key=value形式参数

12.1、案例:

public class ParametersDemo implements StepExecutionListener {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;private Map<String, JobParameter> parameters;@Beanpublic Job ParameterJob() {return jobBuilderFactory.get("ParameterJob").start(ParameterStep()).build();}//Job执行的是step,Job使用的数据肯定是在step中使用//那我们只需要给step传递数据,如何给step传递参数?//使用监听,使用Step级别的监听来传递参数@Beanprivate Step ParameterStep() {return stepBuilderFactory.get("ParameterStep").listener(this).tasklet(new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {//输出接收到的参数的值System.out.println(parameters.get("info"));return RepeatStatus.FINISHED;}}).build();}@Overridepublic void beforeStep(StepExecution stepExecution) {//获取到任务的参数,传递的参数赋值给parameters;parameters = stepExecution.getJobParameters().getParameters();}@Overridepublic ExitStatus afterStep(StepExecution stepExecution) {return null;}

13、ItemReader概述

  • ItemReader:提供数据的接口

  • 在这个接口中只有一个方法read(),读取数据时是以一条数据为单位,每条数据是一个item的循环读取,它读取一个数据并且移动到下一个数据上去,在读取结束时必须返回一个null,否则表明数据没有读取完毕

13.1、案例

@Configuration
public class ItemReaderDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job itemReaderDemoJob(){return jobBuilderFactory.get("itemReaderDemoJob").start(itemReaderDemoStep()).build();}@Beanpublic Step itemReaderDemoStep() {return stepBuilderFactory.get("itemReaderDemoStep")//读完两个进行输出处理.<String,String>chunk(2).reader(itemReaderDemoRead()).writer(list -> {for (String item:list){System.out.println(item+"...");}}).build();}//ItemReader用于数据读取@Beanpublic MyReader itemReaderDemoRead() {List<String> data= Arrays.asList("cat","dog","pig","duck");return new MyReader(data);}}
public class MyReader implements ItemReader<String>{private Iterator<String> iterator;//集合迭代器public MyReader(List<String> list){this.iterator = list.iterator();}@Overridepublic String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {//一个数据一个数据的读,如果还有下一个数据返回,否则返回空if (iterator.hasNext()){return this.iterator.next();}elsereturn null;}
}

14、从数据中读取数据

  • JdbcPagingItemReader是ItemReader的一个子类,从数据库中分页读取

14.1案列

  • 创建一个User
public class User {private Integer id;private String username;private String password;private Integer age;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}@Overridepublic String toString() {return "User{" +"id=" + id +", username='" + username + '\'' +", password='" + password + '\'' +", age='" + age + '\'' +'}';}
}
  • 创建Job、Step
@Configuration
public class ItemReaderDemodb {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate DataSource dataSource;@Autowired@Qualifier("dbJdbcWriter")private ItemWriter<? super User> dbJdbcWriter;@Beanpublic Job itemReaderDbJOb(){return jobBuilderFactory.get("itemReaderDbJOb").start(itemReaderDbStep()).build();}@Beanpublic Step itemReaderDbStep() {return stepBuilderFactory.get("itemReaderDbStep").<User,User>chunk(2).reader(dbJdbcReader()).writer(dbJdbcWriter).build();}@Bean@StepScope//只限于step范围之内public JdbcPagingItemReader<User> dbJdbcReader() {JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();//指明数据源reader.setDataSource(dataSource);//设置每次取多少数据reader.setFetchSize(2);//把读取的记录转换成User对象reader.setRowMapper(new RowMapper<User>() {@Overridepublic User mapRow(ResultSet rs, int rowNum) throws SQLException {User user = new User();user.setId(rs.getInt(1));user.setUsername(rs.getString(2));user.setPassword(rs.getString(3));user.setAge(rs.getInt(4));return user;}});//指定sql语句MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();//指明查询字段provider.setSelectClause("id,username,password,age");provider.setFromClause("from user");//指定根据那个字段进行排序Map<String, Order> sort = new HashMap<>(1);sort.put("id",Order.ASCENDING);provider.setSortKeys(sort);reader.setQueryProvider(provider);return reader;}
}
  • 实现ItemWriter接口
@Component("dbJdbcWriter")
public class DbJdbcWriter implements ItemWriter<User> {@Overridepublic void write(List<? extends User> list) throws Exception {for (User user:list){System.out.println(user);}}
}

15.从普通文件中读取数据

  • 使用ItemReader的子类FlatFileItemReader来实现数据的读取

15.1 案例

  • 在resources下新建conmter.txt格式如下
id,firstName,lastNmae,birthday
1,Stone,Brrett,1964-10-19 14:11:03
2,Raymond,Pace,1977-12-11 21:44:20
  • 创建Customer实体类
package cn.bosc.itemreaderfile;public class Customer {private Long id;private String firstName;private String lastName;private String birthday;public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getFirstName() {return firstName;}public void setFirstName(String firstName) {this.firstName = firstName;}public String getLastName() {return lastName;}public void setLastName(String lastName) {this.lastName = lastName;}public String getBirthday() {return birthday;}public void setBirthday(String birthday) {this.birthday = birthday;}@Overridepublic String toString() {return "Customer{" +"id=" + id +", firstName='" + firstName + '\'' +", lastName='" + lastName + '\'' +", birthday='" + birthday + '\'' +'}';}
}
  • 创建FileItemReaderDemo
@Configuration
public class FileItemReaderDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowired@Qualifier("fileFileWriter")private ItemWriter<? super Customer> fileFileWriter;@Beanpublic Job FileItemReaderDemoJob(){return jobBuilderFactory.get("FileItemReaderDemoJob").start(FileItemReaderDemoStep()).build();}@Beanpublic Step FileItemReaderDemoStep() {return stepBuilderFactory.get("FileItemReaderDemoStep").<Customer,Customer>chunk(100).reader(flatFileReader()).writer(fileFileWriter).build();
}@Bean@StepScopepublic FlatFileItemReader<Customer> flatFileReader() {FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();reader.setResource(new ClassPathResource("conmter.txt.txt"));reader.setLinesToSkip(1);//跳过第一行;//解析数据DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();tokenizer.setNames(new String[]{"id","firstName","lastNmae","birthday"});//把解析出的一行数据映射为conmter对象DefaultLineMapper<Customer> mapper = new DefaultLineMapper<>();mapper.setLineTokenizer(tokenizer);mapper.setFieldSetMapper(fieldSet -> {Customer customer = new Customer();customer.setId(fieldSet.readLong("id"));customer.setFirstName(fieldSet.readString("firstName"));customer.setBirthday(fieldSet.readString("birthday"));customer.setLastName(fieldSet.readString("lastNmae"));return customer;});mapper.afterPropertiesSet();reader.setLineMapper(mapper);return reader;}
}
  • 创建FlatFileWriter调用ItemWriter
package cn.bosc.itemreaderfile;import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;import java.util.List;@Component("fileFileWriter")
public class FlatFileWriter implements ItemWriter<Customer> {@Overridepublic void write(List<? extends Customer> item) throws Exception {for (Customer customer : item) {System.out.println(customer);}}
}

16.从xml文件中读取数据

  • 使用ItemReader的子类StaxEventItemReader来实现xml的读取

16.1案例

  • 添加依赖
<!-- xstream xml start --><dependency><groupId>com.thoughtworks.xstream</groupId><artifactId>xstream</artifactId><version>1.4.10</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-oxm</artifactId><version>4.3.7.RELEASE</version></dependency><!-- xstream xml end -->
  • 在resources下创建xml文件
<?xml version="1.0" encoding="utf-8" ?>
<customers><customer><id>1</id><firstName>mufu</firstName><lastName>Mad</lastName><birthday>2017-06-09 19:30:40PM</birthday></customer><customer><id>2</id><firstName>mufu</firstName><lastName>Mad</lastName><birthday>2017-06-09 19:30:40PM</birthday></customer><customer><id>3</id><firstName>mufu</firstName><lastName>Mad</lastName><birthday>2017-06-09 19:30:40PM</birthday></customer><customer><id>4</id><firstName>mufu</firstName><lastName>Mad</lastName><birthday>2017-06-09 19:30:40PM</birthday></customer>
</customers>
  • 创建ItemReaderXmlDemo
@Configuration
public class ItemReaderXmlDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowired@Qualifier("xmlFileWriter")ItemWriter<? super Customer> xmlFileWriter;@Beanpublic Job ItemReaderXmlDemoJob(){return jobBuilderFactory.get("ItemReaderXmlDemoJob").start(ItemReaderXmlDemoStep()).build();}@Beanpublic Step ItemReaderXmlDemoStep() {return stepBuilderFactory.get("ItemReaderXmlDemoStep").<Customer,Customer>chunk(10).reader(xmlFileReader()).writer(xmlFileWriter).build();}@Bean@StepScope//范围public StaxEventItemReader<Customer> xmlFileReader() {//调用Customer实体类StaxEventItemReader<Customer> reader = new StaxEventItemReader<>();reader.setResource(new ClassPathResource("customer.xml"));//指定需要处理的根标签reader.setFragmentRootElementName("customer");//把xml转成对象XStreamMarshaller xMarshaller = new XStreamMarshaller();Map<String,Class> map = new HashMap<>();map.put("customer",Customer.class);xMarshaller.setAliases(map);reader.setUnmarshaller(xMarshaller);return reader;}}
@Component("xmlFileWriter")
class XmlFileWriter implements ItemWriter<Customer> {@Overridepublic void write(List<? extends Customer> item) throws Exception {for (Customer customer : item) {System.out.println(customer);}}
}

17.从多个文件中读取数据

  • 使用ItemReader的子类MultiResourceItemReader来实现

17.1案例

  • 创建多个文件方便实现
  • 创建MultiResourceItemReaderDemo
@Configuration
public class MultiResourceItemReaderDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Value("classpath:/conmter*.txt")private Resource[] fileResources;@Autowired@Qualifier("multiFileWriter")private ItemWriter<? super Customer> multiFileWriter;@Beanpublic Job MultiResourceItemReaderDemoJob(){return jobBuilderFactory.get("MultiResourceItemReaderDemoJob").start(MultiResourceItemReaderDemoStep()).build();}@Beanpublic Step MultiResourceItemReaderDemoStep() {return stepBuilderFactory.get("MultiResourceItemReaderDemoStep").<Customer,Customer>chunk(10).reader(multiFileReader()).writer(multiFileWriter).build();}@Bean@StepScopepublic MultiResourceItemReader<Customer> multiFileReader() {MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();reader.setDelegate(flatFileReader());reader.setResources(fileResources);return reader;}//调用单个文件读取数据@Bean@StepScopepublic FlatFileItemReader<Customer> flatFileReader() {FlatFileItemReader<Customer> reader = new FlatFileItemReader<Customer>();reader.setResource(new ClassPathResource("conmter.txt"));reader.setLinesToSkip(1);//跳过第一行;//解析数据DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();tokenizer.setNames(new String[]{"id","firstName","lastNmae","birthday"});//把解析出的一行数据映射为conmter对象DefaultLineMapper<Customer> mapper = new DefaultLineMapper<>();mapper.setLineTokenizer(tokenizer);mapper.setFieldSetMapper(new FieldSetMapper<Customer>() {@Overridepublic Customer mapFieldSet(FieldSet fieldSet) throws BindException {Customer customer = new Customer();customer.setId(fieldSet.readLong("id"));customer.setFirstName(fieldSet.readString("firstName"));customer.setBirthday(fieldSet.readString("birthday"));customer.setLastName(fieldSet.readString("lastNmae"));return customer;}});mapper.afterPropertiesSet();reader.setLineMapper(mapper);return reader;}}
@Component("multiFileWriter")
class MultiFileWriter implements ItemWriter<Customer>{@Overridepublic void write(List<? extends Customer> item) throws Exception {for (Customer customer : item) {System.out.println(customer);}}
}

18. 数据输出 ItemWriter概述

  • ItemReader是一个数据一个数据的读,而ItemWiter是一批一批的输出【chunk()限制】提高批处理性能

18.1案例

  • 创建实现类
@Component("MyWriter")
public class MyWriter implements ItemWriter<String> {@Overridepublic void write(List<? extends String> list) throws Exception {System.out.println(list.size());for (String s : list) {System.out.println(s);}}
}
  • 调用MyWriter
@Configuration
public class ItemWriterDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowired@Qualifier("MyWriter")private ItemWriter<String> MyWriter;@Beanpublic Job ItemWriterDemoJob(){return jobBuilderFactory.get("ItemWriterDemoJob").start(ItemWriterDemoStep()).build();}@Beanpublic Step ItemWriterDemoStep() {return stepBuilderFactory.get("ItemWriterDemoStep").<String,String> chunk(2).reader(myRead()).writer(MyWriter).build();}@Beanpublic ItemReader<String> myRead() {List<String> items  = new ArrayList<>();for (int i = 0; i < 50; i++) {items.add("java"+i);}return new ListItemReader<String>(items);};}

未更完。。。。。

SpringBatch批处理实战教程相关推荐

  1. RabbitMQ实战教程

    RabbitMQ实战教程 1.什么是RabbitMQ 1.1 MQ(Message Queue)消息队列 1.1.1 异步处理 1.1.2 应用解耦 1.1.3 流量削峰 1.2 背景知识介绍 1.2 ...

  2. 2021 PyTorch官方实战教程(一)Tensor 详解

    点击上方"AI算法与图像处理",选择加"星标"或"置顶"重磅干货,第一时间送达 这个系列时pytorch官方实战教程,后续会继续更新.. 一 ...

  3. [转载]批处理入门教程

    [转载]批处理入门教程 写这篇教程的目的,是让每一个看过这些文字的朋友记住一句话:如果爱可以让事情变的更简单,那么就让它简单吧!看这篇教程的方法 脚本之家补充说明:批处理相对来说是比较简单的语言,大家 ...

  4. Vbs与批处理高级教程

    Vbs 与批处理高级教程 Vbs 脚本编程简明教程之一 -为什么要使用 Vbs ? 在 Windows 中,学习计算机操作也许很简单,但是很多计算机工作是重复性劳动,例如你每周也许需要对一些计算机文件 ...

  5. PyTorch 高级实战教程:基于 BI-LSTM CRF 实现命名实体识别和中文分词

    20210607 https://blog.csdn.net/u011828281/article/details/81171066 前言:译者实测 PyTorch 代码非常简洁易懂,只需要将中文分词 ...

  6. ArcGIS水文分析实战教程(9)雨量计算与流量统计

    ArcGIS水文分析实战教程(9)雨量计算与流量统计 本章导读:降水是水文循环中重要的一环,降水包括雨.雪.雾.露.雹等,本章介绍的是降雨的环节.通过雨量站与插值的方式,实现雨量的空间分布就算,为水文 ...

  7. 宏基因组分析实战教程1. 背景知识

    上次我写的学习经验和推荐的教程--<微生物组入门必读+宏基因组实操课程=新老司机赶快上车>,小伙伴们当天阅读破2700+人次,3.5天破3000+,达到了宏基因组快车满三千人发车的要求.我 ...

  8. js模板字符串自定义类名_【Vue.js 入门到实战教程】07Vue 组件注册 | 基本使用和组件嵌套...

    来源 | https://xueyuanjun.com/post/21929除了前面介绍的基本语法之外,Vue.js 还支持通过组件构建复杂的功能模块,组件可以称得上是 Vue.js 的灵魂,是 Vu ...

  9. DOS批处理高级教程:第三章 FOR命令中的变量(转)

    DOS批处理高级教程:第一章 批处理基础 DOS批处理高级教程:第二章 DOS循环for命令详解 DOS批处理高级教程:第三章 for命令中的变量 DOS批处理高级教程:第四章 批处理中的变量 DOS ...

最新文章

  1. 硬件厂商纷纷“变软”:FPGA行业巨头Xilinx推出Vitis AI平台,并在GitHub上开源
  2. [web安全]深入理解反射式dll注入技术
  3. C++通过hiredis连接到redis
  4. adroid 如何测试端口号_多进程启动设备和appium实现自动化测试
  5. 下列哪个适合做链栈_朋友圈人格图鉴:三天可见 vs 全部可见,哪个更适合做恋人?...
  6. 下个十年的 C 位:物联网趋势大剧透
  7. View绘制--onMeasure() 、onLayout()
  8. 【LeetCode】贪心算法--分发糖果(135)
  9. 算法眼中的世界是什么样子?他们用一些彩色方块画了出来
  10. .net winform 的 OnKeyDown 与 方向键
  11. 计算机代码坑人小程序bat,运用bat写的整人小程序有哪些?
  12. Linux驱动之设备树(设备树下的LED驱动实验)
  13. SpringBoot-SSMP超详细整合案例
  14. 时序分析基本概念介绍ILM
  15. 【数据结构】字符串 模式匹配算法的理解与实现 Brute Force算法(BF算法)与KMP算法 (C与C++分别实现)
  16. PTA 单链表分段逆转 (12 分)
  17. 洛谷P3647 [APIO2014] 连珠线 题解
  18. Java多线程--深入浅出Java多线程
  19. python办公自动化:让PyAutoGUI来帮你干活---实践版
  20. PyCharm+Anaconda配置OpenCV4.4和PyQt5

热门文章

  1. matlab学习之Bode图
  2. 【沁恒WCH CH32V307V-R1的单线半双工模式串口通讯】
  3. 1542. 找出最长的超赞子字符串 哈希+状态压缩
  4. 计算机动画制作第一节教案,冀教版八年级信息技术1《电脑动画制作初探》教案教学设计...
  5. 人物动画计算机课教学反思,《走进动画》的教学反思
  6. Ubuntu cuda torch安装
  7. 如何生成手机HLS Player(M3U8Player)观看电视频道直播
  8. 瘦客户端与胖客户端的理解
  9. 常见问题解决 V8.3.5升级出现未知错误
  10. xshell连接阿里云服务器