storm的消息格式分析

@(STORM)[storm]

  • storm的消息格式分析
  • 一ITuple接口
  • 二core-storm的消息格式
  • 三trident的消息格式
    • 一trident中tuple的基本用法调用全流程

      • 1用户代码
      • 2TridentTupleView的源码
    • 二trident的tuple基本架构
      • 1主要涉及的个类
      • 2数据定位基本流程
      • 3其它说明
    • 三TridentTuple接口
    • 四TridentTupleView
      • 1成员变量
      • 2主要方法
      • 3内部类
      • 4ProjectionFactory
      • 5FreshOutputFactory
      • 6OperationOutputFactory
      • 7RootFactory
    • 五ValuePointer
      • 1成员变量构造函数
      • 22个方法

Tuple是storm中的消息,本文从用户调用到最终源码介绍了一个tuple的获取与创建过程。

一、ITuple接口

storm的消息称为一个Tuple,所有的消息格式都必须实现Ituple接口。它主要定义了如何获取消息中的内容的各种方法,它的完整定义如下:

public interface ITuple {public int size();public boolean contains(String field);public Fields getFields();public int fieldIndex(String field);public List<Object> select(Fields selector);public Object getValue(int i);public String getString(int i);public Integer getInteger(int i);public Long getLong(int i);public Boolean getBoolean(int i);public Short getShort(int i);public Byte getByte(int i);public Double getDouble(int i);public Float getFloat(int i);public byte[] getBinary(int i);public Object getValueByField(String field);public String getStringByField(String field);public Integer getIntegerByField(String field);public Long getLongByField(String field);public Boolean getBooleanByField(String field);public Short getShortByField(String field);public Byte getByteByField(String field);public Double getDoubleByField(String field);public Float getFloatByField(String field);public byte[] getBinaryByField(String field);public List<Object> getValues();
}

ITuple接口的实现类主要有2个:
* Tuple:用于core-storm
* TridentTuple:用于trident

下面2部分分别介绍这2种实现。

二、core-storm的消息格式

三、trident的消息格式

在Trident中,一个Bolt节点中可能含有多个操作,各个操作之间需要进行消息传递。通常,操作或者产生新的域或者对原来的域进行过滤,若每次对输入的消息进行复制,则效率不高。

Trident利用TridentTupleView对象对消息进行封装。例如,新产生的消息由2部分组成,一部分来自输入,另一部分则由计算得到。TridentTupleView并不会创建一个新的消息,而是将这2部分合并,通过更新内部索引使得从外部看到如同一个消息一样。这样便节省了消息的拷贝和新对象创建等方面的负担,从而提高了效率。

(一)trident中tuple的基本用法&调用全流程

1、用户代码

String sentence = tuple.getString(0);
String sentence = tuple.getStringByField("sentence");

用户可以通过上面2种方式中的一种来取得TridentTuple中的某一个field的值。
下面我们以第1种方法为例继续分析。

2、TridentTupleView的源码

(1)其实就是调用getValue(i)方法,只是做了个类型转换。
@Override
public String getString(int i) {return (String) getValue(i);
}
//取得ValuePointer对象,然后调用getValueByPointer()方法
@Override
public Object getValue(int i) {return getValueByPointer(_index[i]);
}
//真正通过索引来找到数据的方法。
private Object getValueByPointer(ValuePointer ptr) {return ((List<Object>)_delegates.nth(ptr.delegateIndex)).get(ptr.index);
}

(二)trident的tuple基本架构

1、主要涉及的个类

  • TridentTuple接口:继承自ITuple接口及List接口。
  • TridentTupleView类:实现了TridentTuple类,并继承自AbstractList。
  • ValuePointer:TridentTupleView的索引数据结构,用于指定哪个位置或者哪个field对应哪个实际的数据(IPersistentVector数据)。
  • IPersistentVector:实际的数据对象。注意这是一个集合,位于clojure自带的clojure.lang.PersistentVector包中。

2、数据定位基本流程

(1)用户代码通过get(i)或者getValueByField("fieldName")来取得TridentTupleView中的数据。如果确定类型的话,还可以使用getString(i), getStringByField(“fieldName”)等方法。
(2)对于前者,从ValuePointer[] _index来取得ValuePointer对象。对于后者,通过Map<String, ValuePointer> _fieldIndex来取得ValuePointer对象。
(3)ValuePointer就是实际数据的索引,它先根据public int delegateIndex取得这是在_哪个IPersistentVector对象中,然后通过protected int index,或者protected String field来定位到IPersistentVector中的某个元素,这就是实际的数据。

3、其它说明

(1)上面的流程简单的说就是就是通过delegagteIndex,以及index/field来定位到一个ValuePointer对象,而ValuePointer对象是实际数据的索引,所以可以通过这个索引找到实际的数据。
(2)一个TridentTupleView可能有多个IPersistentVector对象,而每个IPersistentVector对象有多个元素,每个元素对应一个field的实际数据。

一个TridentTupleView只有一个_delegates的,但它包括多个delegate,可以通过_delegate.nth(i)来定位。ValuePointer中的_delegateIndex就是作这个使用的。其具体的数据结构由clojure来实现,就不细说了。

(三)TridentTuple接口

public interface TridentTuple extends ITuple, List<Object> {public static interface Factory extends Serializable {Map<String, ValuePointer> getFieldIndex();List<String> getOutputFields();int numDelegates();}
}

它有一个内部接口,内部接口的三个方法分别表示:
* field和ValuePointer的映射关系
* 所有的输出filed组成的List
* 有多少个IPersistentVector对象。注意一个TridentTupleView可能有多个IPersistentVector对象。

(四)TridentTupleView

TridentTupleView主要定义了(1)如何构建一个消息(2)如何读取一个消息内的具体内容。

1、成员变量

ValuePointer[] _index;
Map<String, ValuePointer> _fieldIndex;
IPersistentVector _delegates;

每个TridentTupleView都会保存着索引信息ValuePointer,以及实现的数据 IPersistentVector。其它方法主要就是通过ValuePointer的索引信息如何在IPersistentVector中找到实际的数据。

其中_delegates可以理解为有多个delegate,然后通过nth(i)的方法定位到具体的一个,这具体的一个delegate本身也是一个集合。这是clojure.lang包自带的类,不分析其实现了。

2、主要方法

正如上面据说的,TridentTupleView中的方法主要用于获取TridentTupleView中的数据。比如:

@Overridepublic Integer getInteger(int i) {return (Integer) getValue(i);}

不管是使用哪种方法获取消息内容,最终都是调用这个方法:

private Object getValueByPointer(ValuePointer ptr) {return ((List<Object>)_delegates.nth(ptr.delegateIndex)).get(ptr.index);
}

即使用一个VauluePorint对象作索引查找PersistenceVector中的实际数据。先在_delegates内部找定位到一个具体的delegate,然后再定位到具体的元素。这里使用的nth()方法大致意思就是在_delegates中定位到一个delegate,其实现未查看clojure代码。

3、内部类

TridentTupleView有4个内部类,它们均是实现了Factory接口,用于创建Trident消息。这些Factory子类的create()方法会被spout/bolt的各种操作调用来创建一个TridentTuple,我们以后再介绍是谁在调用这些方法。目前我们只需要知道:
* ProjectionFactory:它不会创建一个新的消息,而只是保留parent的部分字段(由projectFields定义)
* FreshOutputFactory:根据输入的字段名和值来产生一个新的消息
* OperationOutputFactory:通过创建selfFields创建一个新的_delegate,然后与parent一起组成一个新的TridentTupleView。因此它的_delegates数量会+1.
* RootFactory:为操作的入口工厂,对输入的消息起适配作用

4、ProjectionFactory

ProjectionFactory根据输入的parent以及projectFields重新构建一下TridentTupleView,它不会创建一个新的消息,而只是保留parent的部分字段(由projectFields定义)。

    public ProjectionFactory(Factory parent, Fields projectFields) {_parent = parent;if(projectFields==null) projectFields = new Fields();Map<String, ValuePointer> parentFieldIndex = parent.getFieldIndex();_fieldIndex = new HashMap<>();for(String f: projectFields) {_fieldIndex.put(f, parentFieldIndex.get(f));}            _index = ValuePointer.buildIndex(projectFields, _fieldIndex);}public TridentTuple create(TridentTuple parent) {if(_index.length==0) return EMPTY_TUPLE;else return new TridentTupleView(((TridentTupleView)parent)._delegates, _index, _fieldIndex);}

它返回的delegate的数量和parent相同,因此再次证明它不会产生新的delegate:

    @Overridepublic int numDelegates() {return _parent.numDelegates();}

5、FreshOutputFactory

FreshOutputFactory根据输入的字段名和值来产生一个新的消息。

    public FreshOutputFactory(Fields selfFields) {_fieldIndex = new HashMap<>();for(int i=0; i<selfFields.size(); i++) {String field = selfFields.get(i);_fieldIndex.put(field, new ValuePointer(0, i, field));}_index = ValuePointer.buildIndex(selfFields, _fieldIndex);}public TridentTuple create(List<Object> selfVals) {return new TridentTupleView(PersistentVector.EMPTY.cons(selfVals), _index, _fieldIndex);}

(1)先是在构建函数中通过field的值构建一个ValuePointer对象ValuePointer(0, i, field),也就是说这是第0个_delegate的第i个field,field的名称是field。
(2)然后通过调用create()方法来创建一个PersistentVector对象,并与ValuePointer一起创建一个TridentTupleView。

FreshOutputFactory返回的_delegate对象永远是1:

    @Overridepublic int numDelegates() {return 1;}

6、OperationOutputFactory

OperationOutputFactory通过创建selfFields创建一个新的_delegate,然后与parent一起组成一个新的TridentTupleView。因此它的_delegates数量会+1.

这里也证明了一个TridentTupleView会有多个_delegate的。这里指的多个是numDelegates()返回的数量,而不是指多个IPersistentVector对象。事实上每个TridentTupleView只有一个IPersistentVector对象。

    public OperationOutputFactory(Factory parent, Fields selfFields) {_parent = parent;_fieldIndex = new HashMap<>(parent.getFieldIndex());int myIndex = parent.numDelegates();for(int i=0; i<selfFields.size(); i++) {String field = selfFields.get(i);_fieldIndex.put(field, new ValuePointer(myIndex, i, field));}List<String> myOrder = new ArrayList<>(parent.getOutputFields());Set<String> parentFieldsSet = new HashSet<>(myOrder);for(String f: selfFields) {if(parentFieldsSet.contains(f)) {throw new IllegalArgumentException("Additive operations cannot add fields with same name as already exists. "+ "Tried adding " + selfFields + " to " + parent.getOutputFields());}myOrder.add(f);}_index = ValuePointer.buildIndex(new Fields(myOrder), _fieldIndex);}public TridentTuple create(TridentTupleView parent, List<Object> selfVals) {IPersistentVector curr = parent._delegates;curr = (IPersistentVector) RT.conj(curr, selfVals);return new TridentTupleView(curr, _index, _fieldIndex);}

再确认一下_delegate的数量:

    @Overridepublic int numDelegates() {return _parent.numDelegates() + 1;}

没错,就是parent的数量再加1.

7、RootFactory

RootFactory为操作的入口工厂,对输入的消息起适配作用。它会根据输入消息产生一个TridentTupleView类型的消息,这个产生的消息可以被其他工作方法使用。

(五)ValuePointer

1、成员变量&构造函数

public int delegateIndex;
protected int index;
protected String field;public ValuePointer(int delegateIndex, int index, String field) {this.delegateIndex = delegateIndex;this.index = index;this.field = field;
}

ValuePointer有3个成员变量,分别表示:
(1)delegateIndex表示TridentTupleView中的哪个IPersistentVector对象。正如前面据说的,TridentTupleView可能有多个IPersistentVector对象。
(2)index表示IPersistentVector这个集合中的哪个元素。
(3)field表示这个field的名称。

因此,通过ValuePointer可以定位到哪个IPersistentVector对象,然后是IPersistentVector对象的哪个元素,以及这个元素对应的field的名称是什么

2、2个方法

这2个方法主要用于ValuePointer[]与Map

storm的消息格式分析相关推荐

  1. CORBA GIOP消息格式学习

    想要深入理解ORB的工作过程与原理,学习与了解GIOP消息格式必不可少.我们知道GIOP是独立于具体通信的更高级别的抽象,因此这里针对GIOP在TCP/IP上的实现IIOP协议进行学习与分析(IIOP ...

  2. HTTP/1.1(消息格式、连接管理、条件请求、范围请求、缓存、身份验证)

    文章目录 概述 消息格式 开始行 请求方法 请求目标 状态码 头部字段 消息体 连接管理 条件请求(Conditional Requests) 范围请求(Range Requests) 缓存(Cach ...

  3. NOKIA、MOTOROLA、SIMENS及ERICCSON铃声和图片格式分析(转)

    NOKIA.MOTOROLA.SIMENS及ERICCSON铃声和图片格式分析 最近一直有些朋友们要求总结一下主流手机铃声和图片的问题,由于本人在这方面基本没有什么尝试,只能将有些朋友们发表过的东西总 ...

  4. 一文看懂Kafka消息格式的演变

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  5. 【Android 安装包优化】资源混淆 ( resources.arsc 资源映射表混淆 | resources.arsc 资源映射表二进制格式分析 | 混淆全局字符串池和资源名称字符串池 )

    文章目录 一.resources.arsc 资源映射表 混淆 二.resources.arsc 资源映射表二进制格式分析 三.参考资料 资源混淆时 , 需要修改混淆 resources.arsc 资源 ...

  6. 【Android RTMP】RTMP 数据格式 ( FLV 视频格式分析 | 文件头 Header 分析 | 标签 Tag 分析 | 视频标签 Tag 数据分析 )

    文章目录 安卓直播推流专栏博客总结 一. RTMP 格式解析 二. 文件头 Header 分析 三. 标签 Tag 分析 四. 视频标签 Tag 数据分析 安卓直播推流专栏博客总结 Android R ...

  7. Kafka的消息格式

    Commit Log Kafka储存消息的文件被它叫做log,按照Kafka文档的说法是: Each partition is an ordered, immutable sequence of me ...

  8. Apache Kafka消息格式的演变(0.7.x~0.10.x)

    用 Kafka 这么久,从来都没去了解 Kafka 消息的格式.今天特意去网上搜索了以下,发现这方面的资料真少,很多资料都是官方文档的翻译:而且 Kafka 消息支持压缩,对于压缩消息的格式的介绍更少 ...

  9. ack是什么,如何使用Ack机制,如何关闭Ack机制,基本实现,STORM的消息容错机制,Ack机制

    1.ack是什么 ack 机制是storm整个技术体系中非常闪亮的一个创新点. 通过Ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理, 从而可以让开发者采取动作.比如在Me ...

最新文章

  1. Global.asax详解
  2. ACMNO.2 输入一个华氏温度,要求输出摄氏温度。公式为 c=5(F-32)/9 输出要求有文字说明,取位2小数。 输入 一个华氏温度,浮点数 输出 摄氏温度,浮点两位小数
  3. [Swift]LeetCode388. 文件的最长绝对路径 | Longest Absolute File Path
  4. 预处理命令[#define]说明
  5. 关于视频光端机调制方式及介质特点的介绍
  6. PyTorch的Tensor(张量)
  7. 语音识别相关书籍抖音十大先看哪一本最好
  8. 11月3日 迅雷白金会员vip账号分享 91freevip 23:00更新
  9. 什么是 Hadoop 生态系统
  10. ibm system x服务器重装系统,IBM X346服务器重装系统_xSeries 346阵列配置
  11. docker中安装Nexus3
  12. oracle中imp命令详解
  13. Iphone连接Openwrt的IPSEC服务器
  14. 绝密计划:我在阿里打黑工
  15. linux定时清理临时目录,tmp临时文件目录自动清理
  16. SpringBoot项目目录结构(工程结构)
  17. CartoonShader
  18. 中秋国庆旅游 App 市场竞争激烈!工具类 App 更易被苹果推荐!
  19. 解决:Tomcat启动提示At least one JAR was scanned for TLDs yet contained no TLDs
  20. 复变函数与积分变换(五)学习笔记[孤立奇点,留数,零点与奇点,无穷远点的留数,留数计算的应用]

热门文章

  1. oracle主从表分离怎么实时更新数据_高可用数据库主从复制延时的解决方案
  2. python跳转到程序顶部_python-如何使Tkinter窗口跳到最前面?
  3. html开启页面离线缓存,HTML5 离线缓存
  4. 学校不用考直接过计算机一级,全国计算机等级考试1级是不是必须要考的啊
  5. 字典统计排序python123_按值对字典进行排序Python(3级Dict)
  6. PHP 运动会,运动会成绩管理系统
  7. linux笔记之 rpm常用参数 ,yum安装编译器,httpd服务的开关
  8. 液晶弹性自由能计算_自由能方法应用(一)开放计算平台BRIDGE的介绍及使用案例...
  9. nslookup type值_nslookup查询
  10. linux软件包管理工具,Linux 软件包管理器-----yum配置详解一