LTS 分布式任务调度框架- TaskTracker配置自定义分发jobRunner
1.Spring扫描初始化配置所在包
<context:annotation-config />
<context:component-scan base-package="com.ithzk.lts" />
2.配置文件 lts-ittracker.properties
lts.tasktracker.cluster-name=it_lts_cluster
lts.tasktracker.registry-address=zookeeper://127.0.0.1:2181
lts.tasktracker.node-group=it_trade_tasktracker
lts.tasktracker.configs.job.fail.store=mapdb
3.编写自定义JobRunner
package com.ithzk.lts.runner;import com.github.ltsopensource.core.domain.Action;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.tasktracker.Result;
import com.github.ltsopensource.tasktracker.logger.BizLogger;
import com.github.ltsopensource.tasktracker.runner.JobContext;
import com.github.ltsopensource.tasktracker.runner.JobRunner;
import com.github.ltsopensource.tasktracker.runner.LtsLoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;public class JobRunnerDispatcherimplements JobRunner {private static final Logger LOGGER = LoggerFactory.getLogger(JobRunnerDispatcher.class);@Overridepublic Result run(JobContext jobContext) throws Throwable {try {//业务逻辑LOGGER.info("执行:" + jobContext);BizLogger bizLogger = LtsLoggerFactory.getBizLogger();// 会发送到 LTS JobTracker上bizLogger.info("测试");} catch (Exception e) {LOGGER.info("Run job failed!", e);return new Result(Action.EXECUTE_LATER, e.getMessage());}return new Result(Action.EXECUTE_SUCCESS, "Run job success!");}}
4.初始化taskTracker
package com.ithzk.lts.config;import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean;
import com.github.ltsopensource.tasktracker.TaskTracker;
import com.ithzk.lts.runner.JobRunnerDispatcher;@Configuration
public class TaskTrackerConfig implements ApplicationContextAware {private ApplicationContext applicationContext;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}@Bean(name = "taskTracker")public TaskTracker getTaskTracker() throws Exception {TaskTrackerAnnotationFactoryBean factoryBean = new TaskTrackerAnnotationFactoryBean();factoryBean.setApplicationContext(applicationContext);factoryBean.setJobRunnerClass(JobRunnerDispatcher.class);factoryBean.setLocations("lts-ittracker.properties");factoryBean.setWorkThreads(64);factoryBean.setShardField("taskId");factoryBean.setBindIp("INFO");factoryBean.afterPropertiesSet();factoryBean.start();return factoryBean.getObject();}}
启动即可开始调度任务
附:很多小伙伴使用LTS的时候,会存在需要调用各种类型任务的情况,是否需要多个TaskTracker去执行不同的任务。官方给出的建议是当某种任务量非常大或者非常耗时的时候,可以将任务独立出一个或者多个TaskTracker去执行。任务量不大的时候可以使用一个TaskTracker来解决这种问题
主要通过Job中自定义参数实现,可以在Job中指定参数去处理调度各种任务处理不同业务逻辑,下面给出一个参考例子
首先需要不同业务逻辑的JobRunner
package com.ithzk.lts.runner;import java.util.HashMap;
import java.util.Map;import com.github.ltsopensource.core.domain.Action;
import com.github.ltsopensource.core.domain.Job;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.tasktracker.Result;
import com.github.ltsopensource.tasktracker.logger.BizLogger;
import com.github.ltsopensource.tasktracker.runner.JobContext;
import com.github.ltsopensource.tasktracker.runner.JobRunner;
import com.github.ltsopensource.tasktracker.runner.LtsLoggerFactory;public class AJobRunner implements JobRunner{private static final Logger logger = LoggerFactory.getLogger(AJobRunner.class);/*** 启动新的线程去执行具体业务*/class newJobThread implements Runnable{Map<String, String> params = new HashMap<String,String>();newJobThread(Map<String, String> params){this.params = params;}@Overridepublic void run() {try {Thread.sleep(60000);} catch (InterruptedException e) {e.printStackTrace();}//调用具体业务 }}@Overridepublic Result run(JobContext jobContext) throws Throwable {try { Job job = jobContext.getJob();String params = job.getExtParams().get("params");Map<String, String> param = new HashMap<String,String>();new Thread(new newJobThread(param)).start();BizLogger bizLogger = LtsLoggerFactory.getBizLogger();bizLogger.info("AJobRunner.success.......");} catch (Exception e) {logger.info("Run job failed!", e);return new Result(Action.EXECUTE_FAILED, e.getMessage());}return new Result(Action.EXECUTE_SUCCESS, "Run job success!");}}
package com.ithzk.lts.runner;import java.util.concurrent.ConcurrentHashMap;import com.github.ltsopensource.core.domain.Action;
import com.github.ltsopensource.core.domain.Job;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.tasktracker.Result;
import com.github.ltsopensource.tasktracker.runner.JobContext;
import com.github.ltsopensource.tasktracker.runner.JobRunner;/*** 任务执行分发*/
public class JobRunnerDispatcher implements JobRunner {private static final Logger LOGGER = LoggerFactory.getLogger(JobRunnerDispatcher.class);private static final ConcurrentHashMap<String, JobRunner> JOB_RUNNER_MAP = new ConcurrentHashMap<String, JobRunner>();static {JOB_RUNNER_MAP.put("aJobRunner", new AJobRunner());}@Overridepublic Result run(JobContext jobContext) throws Throwable {try {//根据type选择对应的JobRunner运行Job job = jobContext.getJob();String taskType= job.getParam("taskType");LOGGER.warn("IT_Trade_Tasktracker 正在匹配对应任务.........");JobRunner jobRunner = JOB_RUNNER_MAP.get(taskIdentifier);LOGGER.warn("IT_Trade_Tasktracker 进入任务中......... TaskType:"+taskType);return jobRunner.run(jobContext);} catch (Exception e) {LOGGER.info("Run job failed!", e);return new Result(Action.EXECUTE_LATER, e.getMessage());}//return new JobDispatcher().run(jobContext);}}
这样就可以解决一个taskTracker处理不同任务的问题了,希望能帮到大家
LTS 分布式任务调度框架- TaskTracker配置自定义分发jobRunner相关推荐
- 分布式任务调度框架XXL-JOB --配置部署
配置部署"调度中心" 调度中心项目:xxl-job-admin 作用:统一管理任务调度平台上调度任务,负责触发调度执行,并且提供任务管理平台. 步骤一:调度中心配置: 调度中心配置 ...
- LTS 轻量级分布式任务调度框架(Light Task Scheduler)
框架概况: LTS是一个轻量级分布式任务调度框架.有三种角色, JobClient, JobTracker, TaskTracker.各个节点都是无状态的,可以部署多个,来实现负载均衡,实现更大的负载 ...
- LTS 轻量级分布式任务调度框架(Light Task Schedule) - 推酷
LTS 轻量级分布式任务调度框架(Light Task Schedule) - 推酷
- 自己动手实现分布式任务调度框架
前段时间,公司要改造现有的单节点调度为分布式任务调度,然后就研究了目前市面上主流的开源分布式任务调度框架,用起来就一个感觉:麻烦!特别是之前在一个类里写了好多个调度任务,改造起来更加麻烦.我这人又比较 ...
- 分布式任务调度框架Power-Job
分布式任务调度框架的由来及对比 在大型业务业务系统中,不可避免会出现一些需要定时执行需求的场景,例如定时同步数据,定时清洗数据,定时生成报表,大量机器一同执行某个任务,甚至有些需要分布式处理的任务例如 ...
- XXL-Job分布式任务调度框架-- 介绍和调度中心的搭建启动1
一 xxl-job介绍 1.1 xxl-job介绍 xxl-job是轻量级的分布式任务调度框架,目标是开发迅速.简单.清理.易扩展; 老版本是依赖quartz的定时任务触发,在v2.1.0版本开始 移 ...
- 【niubi-job——一个开源的分布式任务调度框架】-----安装教程
niubi-job是什么 niubi-job是LZ耗时三个星期,费尽心血打造的一个具备高可靠性以及水平扩展能力的分布式任务调度框架,采用quartz作为底层的任务调度管理器,zookeeper做集群的 ...
- 分布式任务调度框架和微服务的区别
一.前言 分布式大行其下的时代,让大家彻底的抛弃了传统陈旧的技术框架.几乎每一个技术人都知道和掌握了微服务架构,微服务自然有它的美,但是所以技术框架都必须服务于业务,结合自身业务选取甚至自研适合自身的 ...
- 分布式任务调度框架(Temporal)介绍
分布式任务调度框架基本能力: 任务管理能力(增删改查.执行.定时执行.延时执行.健康监控) 集群管理能力(扩展简单.效率高) 编程能力(运行代码) Web界面管理 目前市面上有很多可用于处理分布式任务 ...
最新文章
- 2016年IoT和新的逃逸技术将会引领威胁态势
- ibase4j nginx配置
- grub配置文件丢失的情况下修复
- 【BZOJ-1458】士兵占领 最大流
- java使用序列化实现深克隆
- 九年级数学解方程50道_【初中数学】北师大版九年级上册数学知识点总结
- 1.3 编程基础之算术表达式与顺序执行 08 温度表达转化
- 2021年六月中旬推荐文章
- 将多个文件绑在一起执行
- 数据可视化详解+代码演练
- CSS3+JS制作的一款图标任意拖动,并且可以放在文件夹中
- MAX232芯片的引脚图和电脑串口的连接电路及RS232引脚定义详细说明
- C++程序设计:字符图形输出(数字三角形)
- html 视频类插件,科技常识:HTML5视频播放插件 video.js介绍
- MGTV提取Ticket-保姆级教程
- grub引导项修复详解_Grub引导项修复详解
- 《抡语》 kong子 (收集)
- Adnroid 使用安卓自带的人脸识别API
- 网站域名在微信端被封禁了怎么办?这样几步就能解封!
- Android大型实战:《亲信,优雅的从入门到进阶》-刘桂林-专题视频课程
热门文章
- 【图片验证码识别】使用深度学习来 识别captcha 验证码
- 两台计算机无路由共享网络连接,路由器如何连接两台计算机: 网络共享方案
- e电帮app,开拓未来
- 怎么才能减肥最快 健康瘦身是首选
- 微服务架构 面向服务型架构_微服务架构听起来像面向服务的架构
- Netty 实现HTTP重定向
- Java(封装、继承、多态、接口)组成一个继承树,表示游戏中的角色练习题
- 再无铁饭碗?一高校启动教师全员「末位淘汰」:取消终身教职,所有人一起卷
- 计算机网络技术专业认识ppt,计算机网络技术实验实训指导 教学课件 作者 於建 第1章 网络硬件设备认识实验.ppt...
- vue的一键换肤功能