2019独角兽企业重金招聘Python工程师标准>>>

Twitter Storm的新利器Pluggable Scheduler

发表于 2012 年 05 月 21 日 由 xumingming
作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明
网址: http://xumingming.sinaapp.com/854/twitter-storm-pluggable-scheduler/

最近把Twitter Storm的新特性:可插拔式的任务分配器(Pluggable Scheduler)给实现了,将在0.8.0版本里面跟大家见面。这篇文章先给大家尝尝鲜,介绍下这个新特性。

在Pluggable Scheduler之前,Twitter Storm里面对于用户提交的每个Topology进行任务分配是由nimbus来做的,nimbus的任务分配算法可是非常牛逼的哦,主要特点如下

  • 在slot充沛的情况下,能够保证所有topology的task被均匀的分配到整个机器的所有机器上
  • 在slot不足的情况下,它会把topology的所有的task分配到仅有的slot上去,这时候其实不是理想状态,所以。。
    • 在nimbus发现有多余slot的时候,它会重新分配topology的task分配到空余的slot上去以达到理想状态。
  • 在没有slot的时候,它什么也不做

一般情况下,用这种默认的task分配机制就已经足够了。但是也会有一些应用场景是默认的task分配机制所搞定不了的,比如

  • 如果你想你的spout分配到固定的机器上去 — 比如你的数据就在那上面
  • 如果你有两个topology都很耗CPU,你不想他们运行在同一台机器上

这些情况下我们默认的task分配机制虽然强大,却是搞不定的,因为它根本就不考虑这些。所以我们设计了新的Pluggable Scheduler机制,使得用户可以编写自己的task分配算法 — Scheduler来实现自己特定的需求。下面我们就来亲自动手来看看怎么才能实现上面提到的默认Scheduler搞不定的第一个场景,为了后面叙述的方便,我们来细化一下这个需求:让我们的名为special-spout的组件分配到名为special-supervisor的supervisor上去

要实现一个Scheduler其实很简单,只要实现IScheduler

 
public interface IScheduler {/*** Set assignments for the topologies which needs scheduling. The new assignments is available* through <code>cluster.getAssignments()</code>**@param topologies all the topologies in the cluster, some of them need schedule. Topologies object here*       only contain static information about topologies. Information like assignments, slots are all in*       the <code>cluster</code>object.*@param cluster the cluster these topologies are running in. <code>cluster</code> contains everything user*       need to develop a new scheduling logic. e.g. supervisors information, available slots, current*       assignments for all the topologies etc. User can set the new assignment for topologies using*       <code>cluster.setAssignmentById</code>*/publicvoid schedule(Topologies topologies, Cluster cluster);
}

这个接口会提供两个参数,其中Topologies包含当前集群里面运行的所有Topology的信息:StormTopology对象,配置信息,以及从task到组件(bolt, spout)id的映射信息。Cluster对象则包含了当前集群的所有状态信息:对于系统所有Topology的task分配信息,所有的supervisor信息等等 — 已经足够我们实现上面的那个需求了,让我们动手吧

找出我们的目标Topology

首先我们要确定我们的topology是否已经提交到集群了,很简单,到topologies对象里面找找看,找到了的话就说明已经提交了。

帮助
 
// Gets the topology which we want to schedule
TopologyDetails topology = topologies.getByName( "special-topology" );

只要这个topology不为null的话就说明这个topology已经提交了。

目标Topology是否需要分配

紧接着我们要看看这个topology需不需要进行task分配 — 有可能之前分配过了。怎么弄呢?很简单,Cluster对象已经提供了api可以使用

帮助
 
boolean needsScheduling = cluster.needsScheduling(topology);

这里要说明的一点是,有关Scheduler编写的几乎所有api都是定义在Cluster类里面,大家只要把这个类搞熟悉,编写起Scheduler起来应该就得心应手了。如果这个topology需要进行task分配我们还要看下有那些task需要进行分配 — 因为可能有部分task已经被分配过了

帮助
 
// find out all the needs-scheduling components of this topology
Map<String, List<Integer>> componentToTasks = cluster.getNeedsSchedulingComponentToTasks(topology);

   

我们的目标spout是否需要分配?

因为我们的目标是让名为special-spout的组件运行在名为special-supervisor的supervisor上,所以我们要看看这些task里面有没有是属于special-spout的task,很简单,上面返回的componentToTasks就是从component-idtask-ids的一个映射。所以要找出special-spout就很简单了

帮助
 
List<Integer> tasks = componentToTasks.get( "special-spout" );

找出目标supervisor

找到我们要分配的task之后,我们还要把我们的special-supervisor找出来,Cluster同样提供了方便的方法:

// find out the our "special-supervisor" from the supervisor metadata
Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
SupervisorDetails specialSupervisor =null;
for (SupervisorDetails supervisor : supervisors) {Map meta = (Map) supervisor.getSchedulerMeta();if(meta.get("name").equals("special-supervisor")) {specialSupervisor = supervisor;break;}
}

   

这里要特别说明一下Map meta = (Map) supervisor.getSchedulerMeta();, 我们前面说名为special-supervisor的supevisor,其实在storm里面supervisor是没有名字的,这里我们所谓的名字是从supervisor.getSchedulerMeta里面找出来的,这个schedulerMeta是supervisor上面配置的给scheduler使用的一些meta信息,你可以配置任意信息!比如在这个例子里面,我在storm.yaml里面配置了:

 
supervisor.scheduler.meta:
   name: "special-supervisor"

这样我们才能用meta.get("name").equals("special-supervisor")找到我们的special-supervisor到这里我们就找到了我们的special-supervisor,但是要记住一点的是,我们的集群里面有很多topology,这个supervisor的slot很可能已经被别的topology占用掉了。所以我们要检查下有没有slot了

List<WorkerSlot> availableSlots = cluster.getAvailableSlots(specialSupervisor);

判断上面的availableSlots是不是空就知道有没有空余的slot了,如果没有slot了怎么办?没别的topology占用掉了怎么办?很简单!把它赶走

帮助
 
// if there is no available slots on this supervisor, free some.
if (availableSlots.isEmpty() && !tasks.isEmpty()) {
     for (Integer task : specialSupervisor.getAllPorts()) {
         cluster.freeSlot( new WorkerSlot(specialSupervisor.getId(), task));
     }
}

最后一步:分配

到这里为止呢,我们要分配的tasks已经有了,要分配到的slot也搞定了,剩下的就分配下就好了(注意,这里因为为了保持例子简单,代码做了简化)

帮助
 
// re-get the aviableSlots
availableSlots = cluster.getAvailableSlots(specialSupervisor);
  
// since it is just a demo, to keep things simple, we assign all the
// tasks into one slot.
cluster.assign(availableSlots.get( 0 ), topology.getId(), tasks);

我们的目标实现了! 随着cluster.assign的调用,我们已经把我们的special-spout分配到special-supervisor上去了。不难吧

别的任务谁来分配?

不过有件事情别忘了,我们只给special-spout分配了task, 别的task谁来分配啊?你可能会说我不关心啊,没关系,把这个交给系统默认的分配器吧:我们已经把系统的默认分配器包装到backtype.storm.scheduler.EvenScheduler里面去了,所以你简单调用下就好了

帮助
 
new backtype.storm.scheduler.EvenScheduler().schedule(topologies, cluster);

让Storm知道我们的Scheduler

哦,有一件事情忘记说了,我们完成了我们的自定义Scheduler,怎么让storm知道并且使用我们的Scheduler呢?两件事情:

  • 把包含这个Scheduler的jar包放到$STORM_HOME/lib下面去
  • 在storm.yaml 里面作如下配置:
    帮助
     
    storm.scheduler: "storm.DemoScheduler"

这样Storm在做任务分配的时候就会用你的storm.DemoScheduler, 而不会使用默认的系统Scheduler

这个例子的完整代码可以在这里看到。

转载于:https://my.oschina.net/u/2326085/blog/391240

Strom 可定制任务调度策略(Pluggable Scheduler)相关推荐

  1. The evolution of cluster scheduler architectures--转

    原文地址:http://www.firmament.io/blog/scheduler-architectures.html cluster schedulers are an important c ...

  2. 快手超大规模集群调度优化实践

    导读:随着公司业务的快速发展,离线计算集群规模和提交的作业量持续增长,如何支撑超大规模集群,如何满足不同场景的调度需求成为必须要解决的问题.基于以上问题,快手大数据团队基于YARN做了大量的定制和优化 ...

  3. 面经总结(大数据开发相关)

    数据仓库 综合 1. OneData方法论的标准: 3. 缓慢变化维(SCD)常见的解决方案 重写维度值 在维度表中, 仅需以当前值重写先前存在的值, 不需要触碰事实表 缺点: 如果业务需要准确的跟踪 ...

  4. 几种任务调度的 Java 实现方法与比较

    综观目前的 Web 应用,多数应用都具备任务调度的功能.本文由浅入深介绍了几种任务调度的 Java 实现方法,包括 Timer,Scheduler, Quartz 以及 JCron Tab,并对其优缺 ...

  5. 几种任务调度的 Java 实现方法与比较--转载

    前言 任务调度是指基于给定时间点,给定时间间隔或者给定执行次数自动执行任务.本文由浅入深介绍四种任务调度的 Java 实现: Timer ScheduledExecutor 开源工具包 Quartz ...

  6. java调度:(四) spring中使用quartz的配置文件.

    quartz主要是三个部分:Scheduler Job Trigger,其中,Job 负责定义需要执行的任务,Trigger 负责设置调度策略,Scheduler 将二者组装在一起,并触发任务开始执行 ...

  7. java调度:(一)几种任务调度的 Java 实现方法与比较

    转载自:http://www.oschina.net/question/129540_28053 综观目前的 Web 应用,多数应用都具备任务调度的功能.本文由浅入深介绍了几种任务调度的 Java 实 ...

  8. 专访腾讯产品总监邬沛君:TStack斩获OSCAR技术创新奖的背后

    导读:作为中国云计算开源领域最专业.最高端.最具规模的行业盛会,2018云计算开源产业大会(全球云计算开源大会)由工业和信息化部指导,中国信息通信研究院主办.云计算开源产业联盟承办.大会于2018年3 ...

  9. Openstack面试题和知识点总结

    文章目录 知识点 云计算 起源 定义 特点 分类 服务类型 平台分类 应用 虚拟化 虚拟化技术 定义 分类 云计算和虚拟化的关系 虚拟化的优点 OpenStack 简介 核心架构 Openstack组 ...

  10. springboot整合elasticjob

    任务调度来源 考虑下面的几种场景: 每天凌晨1点,需要对系统的订单表数据根据SKU的维度进行汇总并生成报表 每隔半个小时,需要将数据库中的那些超时未支付的订单数据进行统计并归档 银行系统需在信用卡到期 ...

最新文章

  1. Several ports (8005, 80, 8009) required by Tomcat v6.0 Server at localhost are already in use
  2. UE 手游在 iOS 平台运行时内存占用太高?试试这样着手优化
  3. 直播预告:与九位专家聊聊Codec和AOM Summit
  4. 使用Bean验证扩展PrimeFaces CSV
  5. 【Android】Listview返回顶部,快速返回顶部的功能实现,详解代码。
  6. Redis的持久化 RDB AOF
  7. 组态软件哪个好_组态软件推荐
  8. 玩转ansys——悬臂梁质量块的实体建模与仿真
  9. 把图片放大清晰度不变的方法
  10. 日志:实现微信公众号自动问答机器人(待整理)
  11. android 投屏 版本号,Android开源投屏软件——Scrcpy
  12. 实例讲解EasyLanguage入门
  13. lol人物模型提取(二)
  14. linux 的tac命令,每天学一个 Linux 命令(102):tac
  15. python 32bit? 64bit?
  16. 驱动程序和触摸屏的下载
  17. mysql如何查看事务日记_Mysql事务和Mysql 日志
  18. 6种上市公司数据的采集方法和工具
  19. C++入门——神奇的引用
  20. 发顶会论文,怎么就那么难?10个带你一起“收割”顶会论文的...

热门文章

  1. 使用POI完成 EXCEL的 导出和导入
  2. iPhone为何优越过 Android呢
  3. Microsoft SQL Server 2000 索引碎片整理最佳实践(上)
  4. End Game----OO最后一次博客作业
  5. Jmeter中主要管理器功用
  6. QuickWebApi2:使用Lambda方式,完成对WebApi的开发和调用-文档的生成
  7. IO编程——转自廖雪峰博客
  8. video上传架构设计与实现
  9. mac下chrome浏览器设置ajax跨域调试
  10. 2018-11-18站立会议内容