前言

最近项目要做聚合支付,聚合支付顾名思义就是将市面上常用的三方支付进行聚合,这样开发者只需要对接我们一方就可以同时对接支付宝、微信支付等其他第三方支付平台,省去了挨个平台对接调试的时间,既然支付给人家聚合了,那么必然要有个配套的功能那就是聚合账单,如果要做聚合账单就意味着需要分别从不同的三方渠道获取到商户的账单数据,再所有渠道该商户的对账单数据聚合成一份账单并进行加工成统一的格式方便接入方进行对账,这个功能还是能很好的戳中接入方财务人员的痛点的,如果没有聚合对账,那么意味着财务需要对所有平台的账单进行分别下载再分别对账;

好了,说了那么多,我们这里要解决的问题就是批量将下载下来的商户账单进行加载,加载后统一处理后再输出聚合账单,这就涉及到一个问题,需要有个批量系统进行对数据进行处理,下面就开始我们今天的主角Spring Batch。

1. 为什么要用Spring Batch

为什么要用Spring Batch?回答这个问题那就要先想明白,我们遇到了什么问题,在前面说了,我们需要做一个账单批量加工的功能,这里就需要一个框架有以下功能:

1)能够定义不同job之间的依赖关系,这样可以进行不同job的顺序调整了和依赖关系设定;

2)能够定义job中的步骤,并且能够对每个步骤的输入、输出数据进行操作;

3)能够支持对多种数据源进行操作,不但能批量操作文件,也能够对数据库进行批量操作;

4)对于批量异常数据能够有接口进行自定义处理;

5)支持对job进行重跑;

6)支持对job失败后自动重试等容错处理机制;

7)支持对每个job的输入数据源进行校验;

上面罗列了下实现一个批量账单加工功能需要有上面这些基础功能,做过批量的同学一看这些基础功能,就会发出感叹,这不就是一个批量框架最基础的功能吗?但是目前opensource社区,确实这类批量框架很少,之前在国有银行里面,我们用的框架是自研框架,因为市面上不管是收费的还是开源的批量产品都很难满足我们的功能和性能方面的要求,而且据我了解,大部分有大型批量系统的功能用的批量框架也都是自研框架,很少用现成的产品或者开源的批量产品,因为笔者目前所在的银行规模很小,根本没有自研的能力,核心系统、数仓这些系统也都是买来的,用的批量产品也没法直接拿来用,既然这样就干脆找个开源的批量框架来解决问题,对比后最后选择的是Spring Batch,一方面是他拥有上面提到的所有功能,另一方面,新版本的SpringBatch可以很好的和SpringBoot结合起来,方便后面迁移到Cloud上。

2. Spring Batch架构

下面先介绍下SpringBatch框架里用到的名词,然后再介绍下他们之间的关系。

Job: 是 Spring Batch 的核心概念,它包含了批处理的所有操作;

Step:每个 Job 由一个或多个 Step 组成,每个Step中涉及到ItemReader、ItemProcessor、ItemWriter,这三个接口顾名思义,一个负责数据源读取、一个负责业务逻辑处理、一个负责处理后的数据输出;

jobRepository:定义 Job 时,需要指定一个 JobRepository,用来存储 Job 在运行过程中的状态信息,为什么要存储状态信息呢?因为如果 Job 失败了,Spring 支持从失败的地方重新运行,而不是从头开始。

JobLauncher:很好理解launcher是用来执行Job的,如果不设置,系统也会默认给Job配置一个默认Launcher;

Screenshot 2017-11-28 17.08.20.png

Spring Batch 架构主要分为三类高级组件 : 应 层(Application), 核心层(Core) ,基础架构层(Infrastructure)

应 层层(Application)包括开发人员用SpringBatch编写的所有批处理作业和自定义代码;

Batch (Batch Core) 包含加载和控制批处理作业所必须的核心类,包括JobLaunch,Job,Step的实现;

应 层(Application) 与核心 (Core) 都是构建在基础架构层之上,基础架构层包含readers(ItemReader) writers(ItemWriter), services ( 如重试模块RetryTemplate), 可以被应用层和核心层所使用。

Screenshot 2017-11-28 16.56.05.png

介绍了SpringBatch的整体架构和组件后,下面介绍下开发者要做的内容,借助于SpringBatch已经实现了大部分的基础功能,作为开发者要做的除了配置每个Step之间的依赖关系和Job的一些参数就是开发每一步的Step,Step是一个独立封装域对象,包含了所有定义和控制实际处理信息批量任务的序列。每一个Step都是开发者自己编写的,一个Step的简单或复杂完全取决于开发者,可以把一个大的Step拆成很多个,也可以在一个Step中实现,完全看开发者的意愿。

所有的批处理都可以描述为最简单的过程:读取大量数据,执行自定义的计算或者转换,写出处理结果,SpringBatch提供了三个主要接口来执行大量数据的读取、处理与写出:ItemReader、ItemProcessor、ItemWriter。

Screenshot 2017-11-28 17.27.33.png

ItemReader

ItemReader就是一种从各个数据源读取数据,然后提供给后续步骤 使用的接口,目前SpringBatch已经给我们实现了3种常用格式的处理:

1)Flat平面纯文本处理,FlatFileItemReader类实现了从纯文本文件中读取一行数据,目前支持三种格式处理:定长字符串处理、分隔符字符串处理、正则表达式字符串处理,这三种处理基本能够满足我们常见需求了,而且常见的批量数据也都是格式化的纯文本;

2)XML,XMLItemReader类提供了解析、验证、映射数据的功能,能够对XML进行处理,同时可以根据XSD schema验证格式信息是否正确;

3)数据库,SQLItemReader类实现了通过JDBC查询出数据集,然后进行数据处理;

如果上面提供的三种都不能满足要求,还可以自己去实现IteamReader接口,来完成从字符串到实体对象的转换:

package org.springframework.batch.item;

public interface ItemReader {

T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;

}

ItemWriter

ItemWriter功能上类似ItemReader的反向操作,资源任需要定位、打开和关闭,区别ItemWriter执行的是写入操作,而不是读取;

框架同样也实现了类似Reader的常用Writer类,FlatFileItemWriter、XMLItemWriter、SQLItemWriter,具体使用方法可以参考JavaDoc这里就不一一详解了。如果这几个常用Writer类满足不了你的需求那么你也可以继承ItemWriter自己去实现Writer类:

package org.springframework.batch.item;

import java.util.List;

public interface ItemWriter {

void write(List extends T> var1) throws Exception;

}

自定义Writer类的话需要注意下,这里需要实现的write方法,入参是一个范型list,而ItemReader的read方法是处理一条字符串,返回一个范型对象。

ItemProcessor

ItemProcessor顾名思义就是数据的处理类,这个类系统没有实现类,因为是否需要对数据进行处理,对数据如何处理都是开发者自己来决定的,所以这里框架只是提供了接口,让大家去实现ItemProcessor接口中的process方法。

package org.springframework.batch.item;

public interface ItemProcessor {

O process(I var1) throws Exception;

}

从接口我们可以看到ItemProcessor是一个双范型接口,需要设置输入和输出类型,第一个类型为我们ItemReader的输出类型,第二个类型为ItemWriter的输入类型也就是process方法按照开发者的意愿处理后输出的类型。

3. Spring Batch批量加载支付宝账单

上面我们大概说了下SpringBatch用到的架构和主要的类,因为框架的api还是比较多的,全部介绍一遍估计大家也不愿意看,下面我们直接来实战,实现一个功能,大家就会很快上手SpringBatch了,这里我们实现到指定目录读取支付宝和微信的账单并处理后输出到指定的文件,这里我们没有使用数据库,如果要使用数据库只需要设置writer里的datasource并使用SQL相关的writer类就可以了,这里不能贴出生产代码所以只是个简单的demo,整个例子并不能直接用到生产,这里仅仅是举个例子让大家能够熟悉SpringBatch,实现的功能还远远达不到生产的要求。

实体类AlipayTranDO

定义的阿里账单实体类,省略了setter、getter方法。

public class AlipayTranDO {

private String tranId;

private String channel;

private String tranType;

private String counterparty;

private String goods;

private String amount;

private String isDebitCredit;

private String state;

...

}

实体类HopPayTranDO

需要转化的目标账单记录类。

public class HopPayTranDO {

private String tranId;

private String channel;

private String tranType;

private String counterparty;

private String goods;

private String amount;

private String isDebitCredit;

private String state;

private String tranDate;

private String merId;

...

}

支付宝账单Reader类

Reader类中核心实现是通过FlatFileItemReader来处理支付宝账单单条文本记录,csv的文本记录包含了8个字段,分别通过逗号分隔,然后与AlipayTranDO类属性字段映射,必须保证同名;通过setLineTokenizer()设置行分割方式和分割字段;通过setFieldSetMapper()方法设置读取字段映射的类为AlipayTranDO.class,整体过程还是比较简单的;

在Reader类中还有一个getMultiAliReader()方法,该方法是获取多个文件作为Resource,让上面定义的FlatFileItemReader去对每个文件的每条记录单独处理,网上绝大多数的例子都是只处理一个文件,实际使用过程中不可能只处理一个批量文件,所以例子中我引入了MultiResourceItemReader类,该类是SpringBatch中用于处理多文件的情况,通过给MultiResourceItemReader设置Resource数组,并通过setDelegate设置处理单文件单Item的类实例,最后将该多文件读取的ItemReader配置在Job中即可实现多文件的读取功能。

public class AlipayFileItemReader {

private FlatFileItemReader reader;

public FlatFileItemReader getAlipayFileItemReader() {

reader = new FlatFileItemReader();

reader.setLineMapper(new DefaultLineMapper() {{

setLineTokenizer(new DelimitedLineTokenizer() {{

setNames(new String[] { "tranId", "channel", "tranType", "counterparty", "goods", "amount", "isDebitCredit", "state" });

}});

setFieldSetMapper(new BeanWrapperFieldSetMapper() {{

setTargetType(AlipayTranDO.class);

}});

}});

reader.setLinesToSkip(5);

return reader;

}

public MultiResourceItemReader getMultiAliReader() {

//TODO: 获取所有当天待加载的支付宝账单,

// 这里只是简单的放了两个csv账单文件,

// 实际处理过程中,肯定是从数据库或者接口获取需要加载的账单文件路径

MultiResourceItemReader reader = new MultiResourceItemReader();

Resource[] files = new Resource[]{new FileSystemResource("data/alipay/208012345_20141030.csv"),

new FileSystemResource("data/alipay/208054321_20141030.csv")};

reader.setResources(files);

reader.setDelegate(this.getAlipayFileItemReader());

return reader;

}

AlipayItemProcessor支付宝账单处理转换类

因为是例子,所以processor的处理过程比较简单,只是将从支付宝账单csv中读取的字段赋值给自己定义的内部聚合账单实体类,并多加工了两个字段,在实际的使用过程中大家可以按照自己系统批量加工的需求去处理加工字段;

public class AlipayItemProcessor implements ItemProcessor {

private static final Logger log = LoggerFactory.getLogger(AlipayItemProcessor.class);

@Override

public HopPayTranDO process(AlipayTranDO alipayTranDO) throws Exception {

HopPayTranDO hopPayTranDO = new HopPayTranDO();

hopPayTranDO.setTranId(alipayTranDO.getTranId());

hopPayTranDO.setChannel(alipayTranDO.getChannel());

hopPayTranDO.setTranType(alipayTranDO.getTranType());

hopPayTranDO.setCounterparty(alipayTranDO.getCounterparty());

hopPayTranDO.setGoods(alipayTranDO.getGoods());

hopPayTranDO.setAmount(alipayTranDO.getAmount());

hopPayTranDO.setIsDebitCredit(alipayTranDO.getIsDebitCredit());

hopPayTranDO.setState(alipayTranDO.getState());

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

String dateNowStr = sdf.format(new Date());

hopPayTranDO.setTranDate(dateNowStr);

hopPayTranDO.setMerId("00000001");

log.info(alipayTranDO.toString());

return hopPayTranDO;

}

}

AlipayFileItemWriter账单写入类

该文件写入类就是将刚才Processor输出的加工后的HopPayTranDO对象列表写入到文件中,代码一看就明白,不用太多解释,不明白的可以留言。

public class AlipayFileItemWriter {

public FlatFileItemWriter getAlipayItemWriter() {

FlatFileItemWriter txtItemWriter = new FlatFileItemWriter();

txtItemWriter.setAppendAllowed(true);

txtItemWriter.setShouldDeleteIfExists(true);

txtItemWriter.setEncoding("UTF-8");

txtItemWriter.setResource(new FileSystemResource("data/sample-data.txt"));

txtItemWriter.setLineAggregator(new DelimitedLineAggregator() {{

setDelimiter(",");

setFieldExtractor(new BeanWrapperFieldExtractor() {{

setNames(new String[]{"tranId", "channel", "tranType", "counterparty", "goods", "amount", "isDebitCredit", "state", "tranDate", "merId" });

}});

}});

return txtItemWriter;

}

}

BillBatchConfig JOB配置类

Job配置比较简单,先将下面要用到的JobBuilderFactory、StepBuilderFactory、AlipayFileItemReader、AlipayItemProcessor、AlipayFileItemWriter先分别注入实例,再配置step步骤,如果有多个步骤可以在flow(step1)后面.next(step2),对于简单的批量这样的线性设置步骤就可以满足要求了,如果涉及到复杂情况,我会再写一篇来介绍SpringBatch的高级应用来讲解,本篇就不赘述了。

@Configuration

@EnableBatchProcessing

public class BillBatchConfig {

@Autowired

public JobBuilderFactory jobBuilderFactory;

@Autowired

public StepBuilderFactory stepBuilderFactory;

@Autowired

private AlipayFileItemReader alipayFileItemReader;

@Autowired

private AlipayItemProcessor alipayItemProcessor;

@Autowired

private AlipayFileItemWriter alipayFileItemWriter;

@Bean

public Job importAliJob() {

return jobBuilderFactory.get("importAliJob")

.incrementer(new RunIdIncrementer())

.flow(step1())

.end()

.build();

}

@Bean

public Step step1() {

return stepBuilderFactory.get("step1")

. chunk(10)

.reader(alipayFileItemReader.getMultiAliReader())

.processor(alipayItemProcessor)

.writer(alipayFileItemWriter.getAlipayItemWriter())

.build();

}

}

小结

本篇文章只是SpringBatch的一个引子,还有很多功能在这里没有详细阐述,例如在批量中涉及到的为了提高性能的并发任务执行、数据校验、失败重试、记录处理异常处理等。接下来会再写一篇关于Springbatch的高级应用,里面会把上面提到的一些高级用法进行详细介绍,下面奉上前面功能的代码。

springboot批量更新实体_Spring Batch批量处理支付宝账单实践-基础篇相关推荐

  1. springboot批量更新实体_mybatis+mysql+springboot批量插入,批量更新数据

    在开发过程中都会遇到批量的插入或者是更新数据,本人只写了一下自己在做的过程中遇到问题,写此篇文章的作用:第一是记录自己在工作过程中的问题.第二是方便同行的伙伴们避免出现这样的问题消耗太多的时间.废话不 ...

  2. plsql 存储过程 批量提交_Spring Batch 批量处理策略

    为了帮助设计和实现批量处理系统,基本的批量应用是通过块和模式来构建的,同时也应该能够为程序开发人员和设计人员提供结构的样例和基础的批量处理程序. 当你开始设计一个批量作业任务的时候,商业逻辑应该被拆分 ...

  3. mysql批量更新报错_Mysql批量更新的三种方式

    前言 批量插入由于mysql的VALUES原生支持,使用较为便利. 批量更新的写法一般有三种,在更新数量较少的情况下,前两种性能不相上下.但是在更新字段增加,更新条数较多(500以上)建议使用第三种写 ...

  4. java jdbc 批量更新_java – JDBC PreparedStatement,批量更新和生成的密钥

    我在批处理中使用jdbc preparedStatement并尝试获取由此创建的生成密钥时出现问题. 代码 : PreparedStatement stmt = null; ... connectio ...

  5. mysql update多条件批量更新_Mybatis中进行批量更新(updateBatch)

    更新多条数据,每条数据都不一样 背景描述:通常如果需要一次更新多条数据有两个方式,(1)在业务代码中循环遍历逐条更新.(2)一次性更新所有数据(更准确的说是一条sql语句来更新所有数据,逐条更新的操作 ...

  6. mysql的批量更新的语法,MySql 批量更新语法

    mysql数据库在批量更新某表的字段时,语法如下: UPDATE A  AS a INNER JOIN ( SELECT * FROM A WHERE .... ) AS b ON a.order_i ...

  7. mysql 批量更新数据 备份_mysql 批量更新与批量更新多条记录的不同值实现方法...

    批量更新 mysql更新语句很简单,更新一条数据的某个字段,一般这样写: UPDATE mytable SET myfield = 'value' WHERE other_field = 'other ...

  8. java mongodb批量更新数据_MongoDB的批量查询条件进行批量更新数据

    今天遇到这样一个场景:在Java中批量更新MongoDB数据,不过每次更新的条件有不一样,那如何有效地进行更新操作呢? 刚开始的时候,我是想到循环批量更新操作,即每一种查询条件进行一次批量更新过程,这 ...

  9. SpringBoot图文教程「概念+案例 思维导图」「基础篇上」

    有天上飞的概念,就要有落地的实现 概念+代码实现是本文的特点,教程将涵盖完整的图文教程,代码案例 每个知识点配套自测面试题,学完技术自我测试 本文初学向,所以希望文中所有的代码案例都能敲一遍 大哥大姐 ...

最新文章

  1. Spark 1.6.2 + Beam 2.0.0读取Mongodb数据进行相应逻辑处理
  2. python基于什么语言-一种基于Python语言的EDA开发平台及其使用方法与流程
  3. 某小公司 RESTful、共用接口、前后端分离、接口约定的实践
  4. ifconfig 命令找不到解决
  5. 如何实现一次编码,到处运行?新一代云端一体化探索
  6. 大家都在用这个神器分析数据,而你还只会Excel表头过滤?
  7. SpringBoot文件上传异常之提示The temporary upload location xxx is not valid
  8. 【Python实例第27讲】增量PCA
  9. 天线的布局、基本术语、种类、隔离度设计要求介绍
  10. Android项目源码(八个)
  11. MySQL查询数据库日志的查询
  12. Linux下安装vmWare tools工具(详细讲解)
  13. 那些想上天的亿万富翁,开启了新的“太空竞赛”
  14. Failed to compile../public/UEditor/dialogs/template/template.html 1:0Module parse failed: Unexpec
  15. 网站如何做域名转移?闲置域名要及时处理
  16. echarts之toolbox-x,y
  17. 我的工作随手记(一)
  18. Hbase的scan原理
  19. gtj2018如何生成工程量报表_问答系列之广联达GTJ2018常见问题汇总
  20. 3GPP TS 23501-g51 中英文对照 | 4.3.1 Non-roaming architecture

热门文章

  1. Unity3D–Texture图片空间和内存占用分析
  2. Unity3D横版过关游戏(一)
  3. 基于Vue的架构设计
  4. bash有意思的记录
  5. c语言模拟器怎么打程序,C语言初学者练手小项目——万花模拟器
  6. Adroid游戏开发实例讲解(三)-小蝌蚪找妈妈附源码
  7. 博士怎样顺利开题,读博规划【对比国内外读博现状】
  8. 2020年第二届网鼎杯,青龙队伍题目,共十八道
  9. 火车票软件哪个好用_订车票的软件哪个好?2018十大购买车票APP排行榜推荐
  10. 巴比特 | 元宇宙每日必读:超八成跑输大盘,元宇宙概念股陷入集体滑坡的困境,网友:“凭概念炒上去,凭本事跌下来”...