TridentTopology创建过程详解

从用户层面来看TridentTopology,有两个重要的概念一是Stream,另一个是作用于Stream上的各种Operation。在实现层面来看,无论是stream,还是后续的operation都会转变成为各个Node,这些Node之间的关系通过重要的数据结构来维护具体到TridentTopology,实现图的各种操作的组件是jgrapht。

说到图,两个基本的概念会闪现出来,一是结点,二是描述结点之间关系的边。要想很好的理解TridentTopology就需要紧盯图中结点和边的变化。

TridentTopology在转换成为普通的StormTopology时,需要将原始的图分成各个group,每个group将运行于一个独立的bolt中。TridentTopology又是如何知道哪些node应该在同一个group,哪些应该处在另一个group中的呢;如何来确定每个group的并发度(parallismHint)的呢。这些问题的解决都与jgrapht分不开。

关于jgrapht的更多信息,请参考其官方网站 http://jgrapht.org

概要

在TridentTopology中向图中添加结点的api有三种:

  1. addNode
  2. addSourcedNode
  3. addSourcedStateNode

其中addNode在创建stream是使用,addSourcedStateNode在partitionPersist时使用到,其它的operation使用到的是addSourcedNode.

addNode与其它两个方法的一个重要区别还在于,addNode是不需要添加边(Edge),而其它两个API需要往图中添加edge,以确定该node的源是哪个。

TridentTopology

1
2
3
4
public TridentTopology() {
        _graph = new DefaultDirectedGraph(new ErrorEdgeFactory());
        _gen = new UniqueIdGen();
    }

在TridentTopology的构造函数中,创建了DAG(有向无环图)。利用这个_graph来作为容器以存储后续过程中创建的各个node及它们之间的关系。

newStream

newStream会为DAG(有向无环图)中创建源结点,其调用关系如下所示。

  • newStream

    • addNode

      • registerNode
 1 protected void registerNode(Node n) {
 2         _graph.addVertex(n);
 3         if(n.stateInfo!=null) {
 4             String id = n.stateInfo.id;
 5             if(!_colocate.containsKey(id)) {
 6                 _colocate.put(id, new ArrayList());
 7             }
 8             _colocate.get(id).add(n);
 9         }
10     }

each

作用于stream上的Operation有很多,以each为例来看新的operation是如何转换成为node添加到_graph中的。

//Stream.javapublic Stream each(Fields inputFields, Function function, Fields functionFields) {projectionValidation(inputFields);return _topology.addSourcedNode(this,new ProcessorNode(_topology.getUniqueStreamId(),_name,TridentUtils.fieldsConcat(getOutputFields(), functionFields),functionFields,new EachProcessor(inputFields, function)));}

调用关系描述如下

  • Stream::each
  • TridentTopology::addSourcedNode
  • TridentTopology::registerSourcedNode

registerSourcedNode的实现如下

protected void registerSourcedNode(List<Stream> sources, Node newNode) {registerNode(newNode);int streamIndex = 0;for(Stream s: sources) {_graph.addEdge(s._node, newNode, new IndexedEdge(s._node, newNode, streamIndex));streamIndex++;}        }

注意此处添加edge是,是有索引的,这样可以区别处理的先后顺序。

在Stream中含有成员变量_node,表示stream最近停泊的node,有了该变量添加edge才成为了可能。

partitionPersist

public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater, Fields functionFields) {projectionValidation(inputFields);String id = _topology.getUniqueStateId();ProcessorNode n = new ProcessorNode(_topology.getUniqueStreamId(),_name,functionFields,functionFields,new PartitionPersistProcessor(id, inputFields, updater));n.committer = true;n.stateInfo = new NodeStateInfo(id, stateSpec);return _topology.addSourcedStateNode(this, n);}

调用关系

  • Stream::partitionPersist
  • TridentTopology::addSourcedStateNode
  • TridentTopology::registerSourcedNode

与addNode及addSourcedNode不同的是,addSourcedStateNode返回的是TridentState而非Stream

既然谈到了TridentState就不得不谈到其另一面Stream::stateQuery,

public Stream stateQuery(TridentState state, Fields inputFields, QueryFunction function, Fields functionFields) {projectionValidation(inputFields);String stateId = state._node.stateInfo.id;Node n = new ProcessorNode(_topology.getUniqueStreamId(),_name,TridentUtils.fieldsConcat(getOutputFields(), functionFields),functionFields,new StateQueryProcessor(stateId, inputFields, function));_topology._colocate.get(stateId).add(n);return _topology.addSourcedNode(this, n);}

从此处可以看出stateQueryNode最起码有两个inputStream,一是从TridentState而来表示状态已经改变,另一个是处于drpcStream这个方面的上一跳结点。

build

TridentTopology::build是将TridentTopology转变为StormTopology的过程,这一过程中最重要的一环就是将_graph中含有的node进行分组。

grouping

算法逻辑概述

  • 将boltNodes中的每个boltNode作为一个group加入全部加入initialGroups
  • 以graph和initialGroups作为入参创建GraphGrouper
  • 分组的过程其实就是进行合并的过程,详见GraphGrouper::mergeFully()
    • 如果从当前group1的输出目的地都是属于group2,则将group1,group2合并
    • 如果当前group1的所有输入源都是来自于group2,则将group1,group2合并
    • 将需要合并的group1,group2作为入参创建新的group,同时将group1,group2从已有的集合出移除
   public void mergeFully() {boolean somethingHappened = true;while(somethingHappened) {somethingHappened = false;for(Group g: currGroups) {Collection<Group> outgoingGroups = outgoingGroups(g);if(outgoingGroups.size()==1) {Group out = outgoingGroups.iterator().next();if(out!=null) {merge(g, out);somethingHappened = true;break;}}Collection<Group> incomingGroups = incomingGroups(g);if(incomingGroups.size()==1) {Group in = incomingGroups.iterator().next();if(in!=null) {merge(g, in);somethingHappened = true;break;}}                }}}

GraphGrouper::merge()

  private void merge(Group g1, Group g2) {Group newGroup = new Group(g1, g2);currGroups.remove(g1);currGroups.remove(g2);currGroups.add(newGroup);for(Node n: newGroup.nodes) {groupIndex.put(n, newGroup);}}

在group之间添加partitionNode

// add identity partitions between groupsfor(IndexedEdge<Node> e: new HashSet<IndexedEdge>(graph.edgeSet())) {if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {                Group g1 = grouper.nodeGroup(e.source);Group g2 = grouper.nodeGroup(e.target);// g1 being null means the source is a spout nodeif(g1==null && !(e.source instanceof SpoutNode))throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");if(g1==null || !g1.equals(g2)) {graph.removeEdge(e);PartitionNode pNode = makeIdentityPartition(e.source);graph.addVertex(pNode);graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index));                    }}}

_graph中所有的node在变换过后,变成两组元素,一是spoutNodes,另一个是合并后的mergedGroup.

spoutNodes中的每个元素作为spout添加到TridentTopologyBuilder的_spouts数组中,mergedGroup中的每个group添加到TridentTopologyBuilder的_bolt数组中。在TridentTopologyBuilder::build()中最主要的事情是为每个_spouts和_bolts数组中的成员添加grouping关系。

小结

到目前为止,通过两篇文章分析了TridentTopology的创建过程及其运行时在每个TridentBoltExecutor中的消息传递情况。接下来会分析TridentTopology提供的API实现及其作用场景。

twitter storm源码走读(五)相关推荐

  1. twitter storm源码走读之2 -- tuple消息发送场景分析

    欢迎转载,转载请注明出处源自徽沪一郎.本文尝试分析tuple发送时的具体细节,本博的另一篇文章<bolt消息传递路径之源码解读>主要从消息接收方面来阐述问题,两篇文章互为补充. worke ...

  2. twitter storm源码走读(二)

    topology提交过程分析 概要 storm cluster可以想像成为一个工厂,nimbus主要负责从外部接收订单和任务分配.除了从外部接单,nimbus还要将这些外部订单转换成为内部工作分配,这 ...

  3. twitter storm源码走读之1 -- nimbus启动场景分析

    欢迎转载,转载时请注明作者徽沪一郎及出处,谢谢. 本文详细介绍了twitter storm中的nimbus节点的启动场景,分析nimbus是如何一步步实现定义于storm.thrift中的servic ...

  4. Apache Storm源码阅读笔记

    欢迎转载,转载请注明出处. 楔子 自从建了Spark交流的QQ群之后,热情加入的同学不少,大家不仅对Spark很热衷对于Storm也是充满好奇.大家都提到一个问题就是有关storm内部实现机理的资料比 ...

  5. Webrtc从理论到实践七: 官方demo源码走读(peerconnection_server)

    系列文章目录 Webrtc从理论到实践一:初识 Webrtc从理论到实践二: 架构 Webrtc从理论到实践三: 角色 Webrtc从理论到实践四: 通信 Webrtc从理论到实践五: 编译webrt ...

  6. 【原】storm源码之一个class解决nimbus单点问题

    [原]storm源码之一个class解决nimbus单点问题 参考文章: (1)[原]storm源码之一个class解决nimbus单点问题 (2)https://www.cnblogs.com/yu ...

  7. JStorm与Storm源码分析(八)--计时器工具-mk-timer

    Storm使用计时器线程来处理一些周期性调度事件. 与计时器相关的操作主要有:创建计时器线程.查看线程是否活跃.向线程中加入新的待调度事件.取消计时器线程 mk-timer方法用于创建一个计时器线程. ...

  8. JStorm与Storm源码分析(七)--BasicBoltExecutor与装饰模式

    在Storm中IBasicBolt的主要作用是为用户提供一种更为简单的Bolt编写方式,更为简单体现在Storm框架本身帮你处理了所发出消息的Ack.Fail和Anchor操作,而这部分操作是由执行器 ...

  9. JStorm与Storm源码分析(六)--收集器 IOutputCollector 、OutputCollector

    在Storm中,多个地方使用了IOutputCollector收集器接口,收集器OutputCollector的接口就是IOutputCollector.所以有必要对接口IOutputCollecto ...

最新文章

  1. 区块链行业报告|从交易流程到Token经济的全方位解析
  2. 关于.h .lib .dll的总结
  3. c++ vector clear()清除容器中所有数据
  4. 【杂谈】2020年如何长期、系统,全面地学习深度学习和计算机视觉,这是有三AI的完整计划...
  5. SpiderData 2019年2月22日 DApp数据排行榜
  6. 发布Disruptor 3.0.0
  7. 什么是机器人的五点校正法_机器人校正方法
  8. SemVLP 单流和双流Transformer哪个好?阿里:我全都要!提出带可插拔模块的Transformer结构...
  9. SimVLM:拒绝各种花里胡哨!CMUGoogle提出弱监督极简VLP模型,在多个多模态任务上性能SOTA...
  10. 从未在一起更让人遗憾_从未在一起和最终没有在一起,哪个更遗憾?
  11. Mybatis: 接口编程的实现
  12. 触摸传感器的电路图符号_如何看懂汽车电路常用图形符号,看完这篇文章就懂了...
  13. USB 协议分析(含基本协议和 USB 请求和设备枚举)
  14. python实现r树存储地理位置_空间索引R树研究_回顾与展望_张明波
  15. Shader入门精要-1-渲染流水线数学基础
  16. Camera 图像处理原理分析- 色彩篇 二
  17. 游戏资讯平台APP项目计划书
  18. 如何快速设计一款智能窗帘开关产品?APP即可控制窗帘动态
  19. Bitlocker加密到一半怎么停止?
  20. 抖音一个好的标题让你轻松上热门,该怎么写好抖音标题。

热门文章

  1. 笔记本移交_创建完美的设计移交
  2. 10 个你可能还不知道 VS Code 使用技巧
  3. java spring cloud版b2b2c社交电商spring cloud分布式微服务:服务注册与发现(Eureka、Consul)...
  4. Kernel Newbies内核开发新手的资源
  5. 转:shell awk
  6. OpenStack 计算节点删除
  7. Socket编程:之双机通信
  8. EIGRP个人学习笔记
  9. 【转】服务器维护工程师悲惨的一个星期
  10. 在VHD文件上安装Windows 7或Windows 2008 R2