Elastic-job实现分布式定时任务

前言

最近接了一个新的需求,需要使用到定时任务,由于我们的系统是分布式的,所以Spring自带的定时任务无法满足我们当前系统对定时任务的需要。我们需要一个能够便捷、高可用的分布式定时任务框架。比较了几个主流的分布式框架,我最终pick了当当开源项目ElasticJob

正文

Elastic-job

ElasticJob是一种分布式调度解决方案,由两个单独的项目ElasticJob-LiteElasticJob-Cloud组成。

通过灵活的调度,资源管理和作业管理功能,它创建了适合Internet场景的分布式调度解决方案,并通过开放式架构设计提供了多元化的作业生态系统。它为每个项目使用统一的作业API。开发人员只需要一次编写代码,就可以随意部署。

ElasticJob-LiteElasticJob-Cloud存在一定的区别

  • ElasticJob-Cloud基于mesos运行,是mesosFramework
  • ElasticJob-Lite自己可独立运行,轻量级去中心化。

ElasticJob提供了三种作业类型:

  • DataFlow类型:用于处理数据流,它又提供2种作业类型,分别是ThroughputDataFlowSequenceDataFlow。需要继承相应的抽象类。
  • Script类型:用于处理脚本,可直接使用,无需编码。
  • Simple类型(常用):用于作业简单的业务处理,未经任何封装的类型。需要继承AbstractSimpleElasticJob,该类只提供了一个方法用于覆盖,此方法将被定时执行。用于执行普通的定时任务,与Quartz原生接口相似,只是增加了弹性扩缩容和分片等功能。

ElasticJob的原理

1.通过分片的方式拆分任务

  • 任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。

例如:

  • 有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。为满足此需求,可将作业分成2片,每台服务器执行1片。作业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。
  • 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为ID%10,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直接的结果就是服务器A遍历ID以0-4结尾的数据;服务器B遍历ID以5-9结尾的数据。
  • 任务总片数:shardingContext.getShardingTotalCount(),当前分片项:shardingContext.getShardingItem()

2.分片项与业务处理解耦

  • Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。

3.个性化参数的适用场景

  • 个性化参数即shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。

例如:

  • 按照地区水平拆分数据库,数据库A是北京的数据;数据库B是上海的数据;数据库C是广州的数据。如果仅按照分片项配置,开发者需要了解0表示北京;1表示上海;2表示广州。
  • 合理使用个性化参数可以让代码更可读,如果配置为0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。

我们利用分片+自定义个性参数可以解决分布式调度时可能会遇到的数据倾斜问题。

4.Elasticjob幂等性机制保证:

  • 同一个分片在当前作业实例上不会被重复执行(通过检测相关的临时节点)
  • 一个作业分片不能同时在多个作业实例上执行(默认相隔10s),即时正在执行分片时发生宕机,重新指派给新机器,这个时候就会导致分片数据的重复执行。 在ElasticJob中,重新分片的时候,需要等待这个作业实例的所有分片作业执行完成才行。所以正在执行的分片任务不会被重复分配给其他作业实例。所以我们定时任务也要考虑设计作业的幂等性。

SpringBoot使用Elastic-job实现分布式定时任务

elastic-job-lite-consoleElasticJob提供的可视化运维平台:elastic-job-lite-console安装与下载

  1. 本控制台和Elastic Job并无直接关系,是通过读取Elastic Job的注册中心数据展现作业状态,或更新注册中心数据修改全局配置。
  2. 控制台只能控制作业本身是否运行,但不能控制作业进程的启停,因为控制台和作业本身服务器是完全分布式的,控制台并不能控制作业服务器。
  3. 访问地址:http://ip:8899 root/root 配置相关的Zookeeper address

maven依赖

<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>2.1.4</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion><exclusion><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclusion><exclusion><groupId>com.google.guava</groupId><artifactId>guava</artifactId></exclusion></exclusions>
</dependency><dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.4</version>
</dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.7</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion></exclusions>
</dependency>

SimpleProducerJob.class:一个用于测试的定时任务

@Slf4j
@Service
public class SimpleProducerJob implements SimpleJob {SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Overridepublic void execute(ShardingContext shardingContext) {log.info("当前时间为"+df.format(System.currentTimeMillis()));}
}

Elastic-job.xml配置文件:用于配置作业注册中心和定时作业

  • sharding-total-countsharding-item-parameters这两个参数用于配置Elastic-job的分片。
  • Elastic-job借助了分片机制和 失效转移机制可以通过异步并行的方式解决大批数据进行推送时的性能问题。
  • Elastic-job.xml更多配置可参考:Elastic-Job开发指南
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"xmlns:job="http://www.dangdang.com/schema/ddframe/job"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.dangdang.com/schema/ddframe/reghttp://www.dangdang.com/schema/ddframe/reg/reg.xsdhttp://www.dangdang.com/schema/ddframe/jobhttp://www.dangdang.com/schema/ddframe/job/job.xsd"><!--配置作业注册中心 --><reg:zookeeper id="regCenter" server-lists="127.0.0.1:2181" namespace="spring-dubbo-producer" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" /><!--样例job--><job:simple id="simpleProducerJob" class="com.luo.producer.task.SimpleProducerJob" registry-center-ref="regCenter" cron="*/5 * * * * ?" sharding-total-count="2" sharding-item-parameters="0=A,1=B" overwrite="true"/></beans>

cron即时间表达式,常用的如下:

每隔5秒执行一次:*/5 * * * * ?
每隔1分钟执行一次:0 */1 * * * ?
每天23点执行一次:0 0 23 * * ?
每天凌晨1点执行一次:0 0 1 * * ?
每月1号凌晨1点执行一次:0 0 1 1 * ?
每月最后一天23点执行一次:0 0 23 L * ?
每周星期天凌晨1点实行一次:0 0 1 ? * L
在26分、29分、33分执行一次:0 26,29,33 * * * ?
每天的0点、13点、18点、21点都执行一次:0 0 0,13,18,21 * * ?

验证

验证一:启动项目

启动项目后可以在elastic-job-lite-console运维平台我们可以看到我们注册的 SimpleProducerJob

在控制台我们可以看定时任务的日志打印信息

验证二:采用主动的方式去触发任务

一般情况下,设置节点jobName/instance/实例ID的值为TRIGGER时,可以触发实例立即执行任务,但任务正在执行中除外。事件通知后,会把该节点的值重置为空("")。

@Slf4j
public class TriggerJobUtils {public static boolean triggerElasticNodesJob(String jobName){try{JobNodePath jobNodePath = new JobNodePath(jobName);Iterator iterator = JobRegistry.getInstance().getRegCenter(jobName).getChildrenKeys(jobNodePath.getInstancesNodePath()).iterator();while(iterator.hasNext()) {String each = (String)iterator.next();log.info("触发的elastic-job 的节点是:{}",each);JobRegistry.getInstance().getRegCenter(jobName).persist(jobNodePath.getInstanceNodePath(each), "TRIGGER");}}catch (Exception e){log.error("触发elastic-job:{}失败",jobName);return false;}return true;}
}

UserTest.class:测试类

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringDubboProducerAPP.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)public class UserTest {@Testpublic void seshi(){System.out.println("start 主动主动触发定时任务");TriggerJobUtils.triggerElasticNodesJob("simpleProducerJob");System.out.println("end 主动主动触发定时任务");}
}

在控制台我们可以看定时任务的日志打印信息

Elastic-job实现分布式定时任务相关推荐

  1. 使用elastic job管理调度定时任务

    使用elastic job管理调度定时任务 elastic-job elastic-job 是由当当网基于quartz 二次开发之后的分布式调度解决方案 , 由两个相对独立的子项目Elastic-Jo ...

  2. 几种主流的分布式定时任务,你知道哪些?

    欢迎关注方志朋的博客,回复"666"获面试宝典 单点定时任务 JDK原生 自从JDK1.5之后,提供了ScheduledExecutorService代替TimerTask来执行定 ...

  3. quartz 分布式_6大分布式定时任务对比

    作者 | sharedCode 来源 | blog.csdn.net/u012394095/article/details/79470904 分布式定时任务简介 把分散的,可靠性差的计划任务纳入统一的 ...

  4. 【解决方案】分布式定时任务解决方案

    [解决方案]分布式定时任务解决方案 参考文章: (1)[解决方案]分布式定时任务解决方案 (2)https://www.cnblogs.com/fonxian/p/10858101.html 备忘一下 ...

  5. 聊聊分布式定时任务中间件架构及其实现--转

    原文来自微信公众号:聊聊架构 在互联网应用中,各式各样的定时任务存于系统各个角落.我们希望由一个平台统一将这些作业管理起来.通过这个系统,作业的宕机.崩溃等状态就可收入运维同学掌控,直接对接报警系统, ...

  6. 3分钟掌握Quartz.net分布式定时任务的姿势

    长话短说,今天聊一聊分布式定时任务,我的流水账笔记: ASP.NET Core+Quartz.Net实现web定时任务 AspNetCore结合Redis实践消息队列 细心朋友稍一分析,就知道还有问题 ...

  7. 6大分布式定时任务对比

    作者 | sharedCode 来源 | blog.csdn.net/u012394095/article/details/79470904 分布式定时任务简介 把分散的,可靠性差的计划任务纳入统一的 ...

  8. 基于spring+quartz的分布式定时任务框架

    http://www.cnblogs.com/aaronfeng/p/5537177.html 问题背景 我公司是一个快速发展的创业公司,目前有200人,主要业务是旅游和酒店相关的,应用迭代更新周期比 ...

  9. 【redis】分布式锁实现,与分布式定时任务

    如果你还不知道redis的基本命令与基本使用方法,请看 [redis]redis基础命令学习集合 写在前面 redis辣么多数据结构,这么多命令,具体一点,都可以应用在什么场景呢?用来解决什么具体的问 ...

最新文章

  1. python3基础知识点总结_python基础知识点总结
  2. AI先驱、A*算法发明者Nils Nilsson去世
  3. aida64 extreme 序列号_基于SN序列号管理 轮胎行业仓储管理解决方案
  4. 切换账户、切换命令行和图形界面
  5. Time profile 使用
  6. ZYNQ UARTLite接收不定长数据
  7. css如何让不确定宽度的div水平居中
  8. 扎心!全国6.5亿网民月收入不足5000元
  9. 《我们不一样团队》项目需求分析改进
  10. CEF与JavaScript交互读取电脑信息
  11. 无法访问_win10纯净版提示无法访问文件或目录损坏的问题
  12. python手机号码替换代码_手机号码中间部分替换成星号
  13. 使用Linux修复Windows PC的10种最聪明的方法
  14. Mac 配置远程服务器 - 免密登陆
  15. Uncaught TypeError: XXX is not a function解决
  16. 快排 找第k大的数字
  17. 端粒效应《The Telemere Effect》程序员的养生指南(二)情绪、思维模式与健康
  18. Mysql基础面试题及查询联系
  19. c4d怎么导入fbx_c4d场景中怎么导入其他模型
  20. Python - Tips

热门文章

  1. 怎么在firefox 里面看维语,哈语等复杂字体的网页
  2. 数学建模part(2):整数,非线性规划
  3. Mac常见问题|Safari下载文件没有反应的解决方法
  4. 管家婆辉煌版7.2版,超级用户密码忘记了
  5. 启动nacos-server报错:java.io.IOException: java.lang.IllegalArgumentException: db.num is null
  6. 欧姆龙PLC项目程序NJ系列模切机程序 12轴EtherCAT总线伺服运动控制,包含回零、点动、定位、速度控制
  7. ztree java 异步_使用 zTree 异步加载
  8. EasyGBS摄像机网页直播之问题解决:海康设备通过TCP接入到EasyGBS, 设备不推流问题解析
  9. 立创封装怎么导入到cadence
  10. 2021全球人才竞争力指数排名:瑞士居首,中国跻身前40;德科集团与北京外企成立合资公司LHH FESCO | 美通社头条...