LTS简介以及与SpringBoot的简单集成

一 什么是LTS

关于定时任务,虽然Spring提供了基于注解@EnableScheduling @Scheduled的实现方式。其实现是通过线程池ScheduledThreadPoolExecutor的方式,具体这里就不多做介绍啦,有兴趣的小伙伴可以自行了解下~
但是以上只适用于单机的情况下,如果是分布式项目的话,就会显得有些力不从心了。所以这里给大家介绍一个分布式任务调度框架 -> LTS,它是阿里巴巴的一个开源项目。
借用官方文档的一句话就是:

LTS 着力于解决分布式任务调度问题,将任务的提交者和执行者解耦,解决任务执行的单点故障,支持动态扩容,出错重试等机制。代码程序设计上,参考了优秀开源项目Dubbo,Hadoop的部分思想。

二 LTS架构总览

以下图片来自官方文档

乍一看这都啥跟啥啊,有点麻烦的样子,别急,且听我慢慢道来,咱们看图说话。

从上图可以看出,LTS包含以下几种节点类型:

  1. JobClient -> 负责提交任务,并接收任务执行反馈结果。
  2. JobTracker -> 负责任务调度,接收并分配任务。
  3. TaskTracker -> 负责执行任务,执行完之后将任务执行结果反馈给 JobTracker
  4. Monitor -> 负责收集各个节点的监控信息,包括任务监控信息,节点JVM监控信息。
  5. Admin -> 则是后台管理,负责节点管理,任务队列管理,监控管理等。

LTS的这五种节点都是无状态的,都可以部署多个,动态扩容,来实现负载均衡,实现更大的负载量, 并且框架采用FailStore策略使LTS具有很好的容错能力。

既然节点可以以集群的方式部署,那么肯定少不了注册中心啦!如上图第一行所示:
注册中心可以使Zookeeper或者Redis官方推荐使用Zookeeper作为注册中心(划重点,要考的)。

继续看图,FailStore -> 顾名思义就是失败存储,主要用于在部分场景远程RPC调用失败的情况,采取现存储本地KV文件系统,待远程通信恢复的时候再进行数据补偿。
主要用于节点容错,当远程数据交互失败之后,存储在本地,等待远程通信恢复的时候,再将数据提交。

接着是FailStore的右边,有个QueueManager -> 任务队列 ,主要用于存储任务数据和任务执行日志等。支持mysqlmongodb实现,官方推荐使用mysql,当然也可以自己扩展实现,例如Oracle。

然后是节点组NodeGroup,每个节点组的节点都是平等的,对外提供相同的服务。每个节点组中都有一个master节点,动态选举得到的。

最后是ClusterName,也就是LTS集群,就如上图所示,整个图就是一个集群,包含LTS的五种节点。

OK,大概了解了LTS的架构之后,让我们继续了解一下LTS的执行流程吧!

三 LTS执行流程

以下图片来自官方文档

说了这么多,那么LTS都支持什么类型的任务呢?目前支持以下几种任务:

实时任务,提交之后就会马上执行的任务;
定时任务,在指定时间点执行的任务,譬如 今天3点执行(单次)。
Cron任务:CronExpression,和quartz类似(但是不是使用quartz实现的)譬如 0 0/1 * ?
Repeat任务(重复任务):譬如每隔5分钟执行一次,重复50次就停止。

由于篇幅有限,关于LTS的介绍就先到这里,更多介绍小伙伴可以参阅 官方文档 ~
LTS gihub地址 -> LTS项目源码
LTS 例子地址 -> LTS使用实例

你可以直接下载LTS使用实例,按照官方指示,在本地运行起来。 里面集成了spring以及springboot等的使用~

好了,接下来,咱们就正式进入SprigBoot和LTS的集成。

四 SpringBoot集成LTS

特别说明:本示例的主要目的仅仅是告诉大家如何使用LTS,所以偷了个懒,将所有节点都揉合到了一个工程,实际项目是分开部署的,因需而定。
整个工程其实很简单,不信你看:

1. 准备工作
  • 新建SpringBoot工程(这不废话吗);
  • 导入相应的依赖,如果你是导入的官方example,则不需要做这些工作。如果是新建SpringBoot工程,我的版本是 2.2.1.RELEASE,则不要直接把官方的example pom依赖复制过来,因为版本兼容问题,可能会导致错误~ 以下是我的pom文件依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--lts--><dependency><groupId>com.github.ltsopensource</groupId><artifactId>lts</artifactId><version>1.7.0</version></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.25.Final</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.58</version></dependency><dependency><groupId>org.mapdb</groupId><artifactId>mapdb</artifactId><version>2.0-beta10</version></dependency><dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.3</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.0.14</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.26</version></dependency><dependency><groupId>org.javassist</groupId><artifactId>javassist</artifactId><version>3.20.0-GA</version></dependency></dependencies>

注意如果你用的是Redis作为注册中心,mongodb作为任务队列,那么请引入相应的依赖,我这里用的是Zookeeper和mysql。

  • 然后是配置文件信息,如下:
##########################################
# jobclient->负责提交任务以及接收任务执行结果 #
##########################################
#集群名称
lts.jobclient.cluster-name=test_cluster
#注册中心
lts.jobclient.registry-address=zookeeper://127.0.0.1:2181
#JobClient节点组名称
lts.jobclient.node-group=test_jobClient
#是否使用RetryClient
lts.jobclient.use-retry-client=true
#失败存储,用于服务正常后再次执行(容错处理)
lts.jobclient.configs.job.fail.store=mapdb#######################################
# jobtracker->负责调度任务 接收并分配任务 #
#######################################
lts.jobtracker.cluster-name=test_cluster
lts.jobtracker.listen-port=35001
lts.jobtracker.registry-address=zookeeper://127.0.0.1:2181
lts.jobtracker.configs.job.logger=mysql
lts.jobtracker.configs.job.queue=mysql
lts.jobtracker.configs.jdbc.url=jdbc:mysql://127.0.0.1:3306/lts
lts.jobtracker.configs.jdbc.username=root
lts.jobtracker.configs.jdbc.password=root###########################################################
# tasktracker->负责执行任务 执行完任务将执行结果反馈给JobTracker #
###########################################################
lts.tasktracker.cluster-name=test_cluster
lts.tasktracker.registry-address=zookeeper://127.0.0.1:2181
#TaskTracker节点组默认是64个线程用于执行任务
#lts.tasktracker.work-threads=64
lts.tasktracker.node-group=test_trade_TaskTracker
#lts.tasktracker.dispatch-runner.enable=true
#lts.tasktracker.dispatch-runner.shard-value=taskId
lts.tasktracker.configs.job.fail.store=mapdb################################################################
# jmonitor->负责收集各个节点的监控信息,包括任务监控信息,节点JVM监控信息 #
################################################################
lts.monitor.cluster-name=test_cluster
lts.monitor.registry-address=zookeeper://127.0.0.1:2181
lts.monitor.configs.job.logger=mysql
lts.monitor.configs.job.queue=mysql
lts.monitor.configs.jdbc.url=jdbc:mysql://127.0.0.1:3306/lts
lts.monitor.configs.jdbc.username=root
lts.monitor.configs.jdbc.password=root

肯定有人要问了,哪来的lts数据库,所以,还需要新建一个数据库,名为lts ,其他不用管,因为表信息在项目启动之后会自动创建的。

除了application.properties配置文件,还需要新建log4j.properties日志配置文件,如下:

log4j.rootLogger=INFO,stdout
log4j.appender.stdout.Threshold=INFO
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%t] (%F:%L) %-5p %c %x - %m%n

说明:以上配置信息,均可以在官方示例lts-example找到。

完成以上准备工作之后,接着便是实现任务提交以及任务执行了。

2. 启动类设置

其实很简单,加上相应的注解即可:

@SpringBootApplication
@EnableJobClient        //JobClient
@EnableTaskTracker      //TaskTracker
@EnableJobTracker       //JobTracker
@EnableMonitor          //Monitor
public class LtstestApplication {public static void main(String[] args) {SpringApplication.run(LtstestApplication.class, args);}}

哈哈,是不是和集成微服务很像啊,也是需要添加相应的注解,具体作用一目了然,就不多做赘述啦~

3. JobClient提交任务

关于Jobclient使用的官方建议

一般在一个JVM中只需要一个JobClient实例即可,不要为每种任务都新建一个JobClient实例,这样会大大的浪费资源,因为一个JobClient可以提交多种任务。

本示例中,我直接写在了TestController中,模拟了提交两个不同的任务:

     @Autowiredprivate JobClient jobClient; @GetMapping("test01")public Map<String, Object> test01() {//模拟提交一个任务Job job = new Job();job.setTaskId("task-AAAAAAAAAAAAAAA");job.setCronExpression("0/3 * * * * ?");//设置任务类型 区分不同的任务 执行不同的业务逻辑job.setParam("type", "aType");job.setNeedFeedback(true);//任务触发时间 如果设置了 cron 则该设置无效
//        job.setTriggerTime(DateUtils.addDay(new Date(), 1).getTime());//任务执行节点组job.setTaskTrackerNodeGroup("test_trade_TaskTracker");//当任务队列中存在这个任务的时候,是否替换更新job.setReplaceOnExist(false);Map<String, Object> submitResult = new HashMap<String, Object>(4);try {//任务提交返回值 responseResponse response = jobClient.submitJob(job);submitResult.put("success", response.isSuccess());submitResult.put("msg", response.getMsg());submitResult.put("code", response.getCode());} catch (Exception e) {log.error("提交任务失败", e);throw new RuntimeException("提交任务失败");}return submitResult;}@GetMapping("test02")public Map<String, Object> test02() {//模拟提交一个任务Job job = new Job();job.setTaskId("task-BBBBBBBBBBBBBBB");job.setCronExpression("0/6 * * * * ?");//设置任务类型 区分不同的任务 执行不同的业务逻辑job.setParam("type", "bType");job.setNeedFeedback(true);//任务触发时间 如果设置了 cron 则该设置无效
//        job.setTriggerTime(DateUtils.addDay(new Date(), 1).getTime());//任务执行节点组job.setTaskTrackerNodeGroup("test_trade_TaskTracker");//当任务队列中存在这个任务的时候,是否替换更新job.setReplaceOnExist(false);Map<String, Object> submitResult = new HashMap<String, Object>(4);try {Response response = jobClient.submitJob(job);submitResult.put("success", response.isSuccess());submitResult.put("msg", response.getMsg());submitResult.put("code", response.getCode());} catch (Exception e) {log.error("提交任务失败", e);throw new RuntimeException("提交任务失败");}return submitResult;}

注意看:JobClient我们可以直接引入,然后构建一个Job,通过JobClient进行提交。任务提交之后,JobTracker会对任务进行分发,分发方式有如下两种:

TaskTracker会定时发送pull请求给JobTracker, 默认1s一次, 在发送pull请求之前,会检查当前TaskTracker是否有可用的空闲线程,如果没有则不会发送pull请求,同时也会检查本节点机器资源是否足够,主要是检查cpu和内存使用率,默认超过90%就不会发送pull请求,当JobTracker收到TaskTracker节点的pull请求之后,再从任务队列中取出相应的已经到了执行时间点的任务 push给TaskTracker,这里push的个数等于TaskTracker的空余线程数。

还有一种途径是,每个TaskTracker线程处理完当前任务之后,在反馈给JobTracker的时候,同时也会询问JobTracker是否有新的任务需要执行,如果有JobTracker会同时返回给TaskTracker一个新的任务执行。所以在任务量足够大的情况下,每个TaskTracker基本上是满负荷的执行的。

4. TaskTracker执行任务

关于TaskTracker使用的官方建议

一个JVM一般也尽量保持只有一个TaskTracker实例即可,多了就可能造成资源浪费。
当遇到一个TaskTracker要运行多种任务的时候,在一个JVM中,最好使用一个TaskTracker去运行多种任务,因为一个JVM中使用多个TaskTracker实例比较浪费资源(当然当你某种任务量比较多的时候,可以将这个任务单独使用一个TaskTracker节点来执行)。

上面提交了两个任务,分别是任务A任务B,所以这里演示的是一个TaskTracker执行多种不同的任务
任务的执行必须实现JobRunner接口,如下任务A

public class JobRunnerA implements JobRunner {@Overridepublic Result run(JobContext jobContext) throws Throwable {//  TODO A类型Job的逻辑System.out.println("我是Runner A");return null;}}

任务B同理,就不重复贴出代码了。

需要指出的是,在SpringBoot中,任务的执行需要添加@JobRunner4TaskTracker注解,但是有且只能有一个@JobRunner4TaskTracker注解。所以,对于同一个TaskTracker执行不同的任务,需要进行调度执行,如下:

/*** 总入口,在 taskTracker.setJobRunnerClass(JobRunnerDispatcher.class)* JobClient 提交 任务时指定 Job 类型  job.setParam("type", "aType")*/
@JobRunner4TaskTracker
public class JobRunnerDispatcher implements JobRunner {private static final Logger log = LoggerFactory.getLogger(JobRunnerDispatcher.class);private static final ConcurrentHashMap<String/*type*/, JobRunner>JOB_RUNNER_MAP = new ConcurrentHashMap<String, JobRunner>();static {JOB_RUNNER_MAP.put("aType", new JobRunnerA()); // 也可以从Spring中拿JOB_RUNNER_MAP.put("bType", new JobRunnerB());}@Overridepublic Result run(JobContext jobContext) throws Throwable {Job job = jobContext.getJob();String type = job.getParam("type");return JOB_RUNNER_MAP.get(type).run(jobContext);}}

说明:该JobRunnerDispatcher 类同样实现了JobRunner接口,并且添加了 @JobRunner4TaskTracker注解,表示该类才是真正会执行任务的地方。通过该类,实现不同的任务执行。

实际上,到这里基本整个LTS任务从提交到执行就已经完成了,也就是简单的集成完成了。
可以直接启动项目了~

5. master节点监听以及任务完成处理类

这个不必多说,直接看代码好了(来自lts-example):

/*** 主节点监听*/
@MasterNodeListener
public class MasterNodeChangeListener implements MasterChangeListener {private static final Logger log = LoggerFactory.getLogger(MasterNodeChangeListener.class);/*** @param master   master节点* @param isMaster 表示当前节点是不是master节点*/@Overridepublic void change(Node master, boolean isMaster) {// 一个节点组master节点变化后的处理 , 譬如我多个JobClient, 但是有些事情只想只有一个节点能做。if (isMaster) {log.info("我变成了节点组中的master节点了, 恭喜, 我要放大招了");} else {log.info(StringUtils.format("master节点变成了{},不是我,我不能放大招,要猥琐", master));}}}
/*** 任务完成处理类*/
@Component
public class JobCompletedHandlerImpl implements JobCompletedHandler {private static final Logger log = LoggerFactory.getLogger(JobCompletedHandlerImpl.class);@Overridepublic void onComplete(List<JobResult> jobResults) {//对任务执行结果进行处理 打印相应的日志信息if (CollectionUtils.isNotEmpty(jobResults)) {for (JobResult jobResult : jobResults) {String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());log.info("任务执行完成taskId={}, 执行完成时间={}, job={}",jobResult.getJob().getTaskId(), time, jobResult.getJob().toString());}}}}
6. Admin后台管理

这个直接去官方源码复制下来admin模块,然后丢到Tomcat本地启动就OK了~

7.项目启动

启动项目,分别调用接口 /test01() /test02()

可以看到,master节点的变化是在监听中的,以及不同的任务分别在执行。
OK,那么这些信息,在后台管理能看到吗,答案是肯定的如图:

可以看到,通过后台管理,我们可以暂停或者删除任务,以及添加新的任务。当然还有很多其他功能…

OK,关于LTS的简单介绍,以及与SpringBoot的集成,就到这里啦~
希望对看过的小伙伴能有帮助~

LTS简介以及与SpringBoot的简单集成相关推荐

  1. SpringBoot与SpringCloud集成

    SpringBoot与SpringCloud集成 : 简介 Spring Cloud是一系列框架的有序集合.它利用Spring Boot的开发便利性巧妙地简化了分布式系统基础设施的开发,如服务发现注册 ...

  2. SpringBoot 中JPA集成PostgreSql(详细步骤)避坑!

    SpringBoot 中JPA集成PostgreSql(详细步骤) 什么是JPA?(Java Persistence API) Spring Data JPA, part of the larger ...

  3. springboot security 权限校验_十二、SpringBoot 优雅的集成Spring Security

    前言 至于什么是Spring security ,主要两个作用,用户认证和授权.即我们常说的,用户只有登录了才能进行其他操作,没有登录的话就重定向到登录界面.有的用户有权限执行某一操作,而有的用户不能 ...

  4. SpringBoot与Docker集成

    SpringBoot与Docker集成 许多人正在使用容器包装其Spring Boot应用程序,而构建容器并不是一件容易的事.这是Spring Boot应用程序开发人员的指南,容器对于开发人员而言并非 ...

  5. SpringBoot使用JWT集成Ng-Alain之Token失效处理

    在 SpringBoot使用JWT集成Ng-Alain中,我们简单介绍了SpringBoot与Ng-Alain的集成,在这种前后端分离框架实践中,我们使用了JWT来作为交互的安全标识,考虑一个问题,从 ...

  6. 一个springboot 项目a集成另一个springboot 项目b

    一个springboot 项目a集成另一个springboot 项目b 并且可以运行访问b的controller层 操作1: 项目b打包依赖修改,把上面的springboot默认打包依赖注释,改为下面 ...

  7. 1、OPenGL ES - 简介、iOS中GLKit简单应用

    OPenGL ES -  简介.iOS中GLKit简单应用 一.OPenGL ES 1.简介: OpenGL ES 是以手持和嵌入式为目标的高级的3D图形应用程序编程接口(API),OpenGL ES ...

  8. SpringBoot项目中集成第三方登录功能

    SpringBoot项目中集成第三方登录功能 引言 1 环境准备 2 代码实现 3 第三方平台认证申请 4 打包和部署项目 5 第三方平台登录认证测试 6 参考文章 引言 最近想把自己在公众号上介绍过 ...

  9. SpringBoot 分布式系统简单了解

    SpringBoot 分布式系统简单了解 SpringBoot 框架的使用基本是了解的差不多了,马上就进入微服务的阶段了,在此之前必须要了解什么是分布式系统. 1. 分布式系统简介 分布式系统(dis ...

最新文章

  1. Insertion Sort List
  2. 大神,快来救救我,我搞不定啊
  3. Easyui入门视频教程 第01集---认识Easyui
  4. 学长毕业日记 :本科毕业论文写成博士论文的神操作20170404
  5. java string
  6. 七、朴素贝叶斯中文文本分类
  7. 检测工业级交换机性能好坏的8种方法
  8. Maven中安装本地Jar包到仓库中或将本地jar包上传
  9. C++函数分文件编写
  10. 基于人脸识别的课堂签到管理系统(三)---实时时间显示以及百度AI人脸识别
  11. 矩阵运算_Eigen使用_旋转矩阵/角轴/欧拉角/四元数相互转换
  12. tomcat利用setenv对内存的限制和docker中tomcat内存的优雅配置
  13. touch 创建一个普通文件或更新已有文件的时间
  14. 三人表决器实验报告总结_三人表决器实验报告
  15. 【无标题】2021年烷基化工艺证考试及烷基化工艺操作证考试
  16. 表格的边框如何做到和EXCEL的外边框为粗线,内部为细线的效果
  17. 分布式与集群的区别?
  18. 4、公司融资 - 开公司创业系列文章
  19. CPU GPU设计工作原理
  20. 爱笑的女孩子,运气不会太差

热门文章

  1. java动态规划鸡蛋问题_动态规划——楼层扔鸡蛋问题
  2. 数据告诉你:哪个行业最难逃35岁危机?
  3. 对角化原理和停机问题
  4. 前端错误日志监控-sentry安装
  5. Vue抛 Property or method turn is not defined on the instance but referenced during render. 的解决方法
  6. 使用jquery.fileupload.js上传文件时添加进度条
  7. pyhton爬取:爬取爱豆(李易峰)微博评论,看看爱豆粉丝的关注点在哪(附源码)
  8. 互联网公司图鉴:利用人性的弱点才能赚钱
  9. 8.18 B组 T1 分火腿
  10. Python面试题(6)