storm的消息格式分析
storm的消息格式分析
@(STORM)[storm]
- storm的消息格式分析
- 一ITuple接口
- 二core-storm的消息格式
- 三trident的消息格式
- 一trident中tuple的基本用法调用全流程
- 1用户代码
- 2TridentTupleView的源码
- 二trident的tuple基本架构
- 1主要涉及的个类
- 2数据定位基本流程
- 3其它说明
- 三TridentTuple接口
- 四TridentTupleView
- 1成员变量
- 2主要方法
- 3内部类
- 4ProjectionFactory
- 5FreshOutputFactory
- 6OperationOutputFactory
- 7RootFactory
- 五ValuePointer
- 1成员变量构造函数
- 22个方法
- 一trident中tuple的基本用法调用全流程
Tuple是storm中的消息,本文从用户调用到最终源码介绍了一个tuple的获取与创建过程。
一、ITuple接口
storm的消息称为一个Tuple,所有的消息格式都必须实现Ituple接口。它主要定义了如何获取消息中的内容的各种方法,它的完整定义如下:
public interface ITuple {public int size();public boolean contains(String field);public Fields getFields();public int fieldIndex(String field);public List<Object> select(Fields selector);public Object getValue(int i);public String getString(int i);public Integer getInteger(int i);public Long getLong(int i);public Boolean getBoolean(int i);public Short getShort(int i);public Byte getByte(int i);public Double getDouble(int i);public Float getFloat(int i);public byte[] getBinary(int i);public Object getValueByField(String field);public String getStringByField(String field);public Integer getIntegerByField(String field);public Long getLongByField(String field);public Boolean getBooleanByField(String field);public Short getShortByField(String field);public Byte getByteByField(String field);public Double getDoubleByField(String field);public Float getFloatByField(String field);public byte[] getBinaryByField(String field);public List<Object> getValues();
}
ITuple接口的实现类主要有2个:
* Tuple:用于core-storm
* TridentTuple:用于trident
下面2部分分别介绍这2种实现。
二、core-storm的消息格式
三、trident的消息格式
在Trident中,一个Bolt节点中可能含有多个操作,各个操作之间需要进行消息传递。通常,操作或者产生新的域或者对原来的域进行过滤,若每次对输入的消息进行复制,则效率不高。
Trident利用TridentTupleView对象对消息进行封装。例如,新产生的消息由2部分组成,一部分来自输入,另一部分则由计算得到。TridentTupleView并不会创建一个新的消息,而是将这2部分合并,通过更新内部索引使得从外部看到如同一个消息一样。这样便节省了消息的拷贝和新对象创建等方面的负担,从而提高了效率。
(一)trident中tuple的基本用法&调用全流程
1、用户代码
String sentence = tuple.getString(0);
String sentence = tuple.getStringByField("sentence");
用户可以通过上面2种方式中的一种来取得TridentTuple中的某一个field的值。
下面我们以第1种方法为例继续分析。
2、TridentTupleView的源码
(1)其实就是调用getValue(i)方法,只是做了个类型转换。
@Override
public String getString(int i) {return (String) getValue(i);
}
//取得ValuePointer对象,然后调用getValueByPointer()方法
@Override
public Object getValue(int i) {return getValueByPointer(_index[i]);
}
//真正通过索引来找到数据的方法。
private Object getValueByPointer(ValuePointer ptr) {return ((List<Object>)_delegates.nth(ptr.delegateIndex)).get(ptr.index);
}
(二)trident的tuple基本架构
1、主要涉及的个类
- TridentTuple接口:继承自ITuple接口及List接口。
- TridentTupleView类:实现了TridentTuple类,并继承自AbstractList。
- ValuePointer:TridentTupleView的索引数据结构,用于指定哪个位置或者哪个field对应哪个实际的数据(IPersistentVector数据)。
- IPersistentVector:实际的数据对象。注意这是一个集合,位于clojure自带的clojure.lang.PersistentVector包中。
2、数据定位基本流程
(1)用户代码通过get(i)
或者getValueByField("fieldName")
来取得TridentTupleView中的数据。如果确定类型的话,还可以使用getString(i), getStringByField(“fieldName”)等方法。
(2)对于前者,从ValuePointer[] _index
来取得ValuePointer对象。对于后者,通过Map<String, ValuePointer> _fieldIndex
来取得ValuePointer对象。
(3)ValuePointer就是实际数据的索引,它先根据public int delegateIndex
取得这是在_哪个IPersistentVector对象中,然后通过protected int index
,或者protected String field
来定位到IPersistentVector中的某个元素,这就是实际的数据。
3、其它说明
(1)上面的流程简单的说就是就是通过delegagteIndex,以及index/field来定位到一个ValuePointer对象,而ValuePointer对象是实际数据的索引,所以可以通过这个索引找到实际的数据。
(2)一个TridentTupleView可能有多个IPersistentVector对象,而每个IPersistentVector对象有多个元素,每个元素对应一个field的实际数据。
一个TridentTupleView只有一个_delegates的,但它包括多个delegate,可以通过_delegate.nth(i)来定位。ValuePointer中的_delegateIndex就是作这个使用的。其具体的数据结构由clojure来实现,就不细说了。
(三)TridentTuple接口
public interface TridentTuple extends ITuple, List<Object> {public static interface Factory extends Serializable {Map<String, ValuePointer> getFieldIndex();List<String> getOutputFields();int numDelegates();}
}
它有一个内部接口,内部接口的三个方法分别表示:
* field和ValuePointer的映射关系
* 所有的输出filed组成的List
* 有多少个IPersistentVector对象。注意一个TridentTupleView可能有多个IPersistentVector对象。
(四)TridentTupleView
TridentTupleView主要定义了(1)如何构建一个消息(2)如何读取一个消息内的具体内容。
1、成员变量
ValuePointer[] _index;
Map<String, ValuePointer> _fieldIndex;
IPersistentVector _delegates;
每个TridentTupleView都会保存着索引信息ValuePointer,以及实现的数据 IPersistentVector。其它方法主要就是通过ValuePointer的索引信息如何在IPersistentVector中找到实际的数据。
其中_delegates可以理解为有多个delegate,然后通过nth(i)的方法定位到具体的一个,这具体的一个delegate本身也是一个集合。这是clojure.lang包自带的类,不分析其实现了。
2、主要方法
正如上面据说的,TridentTupleView中的方法主要用于获取TridentTupleView中的数据。比如:
@Overridepublic Integer getInteger(int i) {return (Integer) getValue(i);}
不管是使用哪种方法获取消息内容,最终都是调用这个方法:
private Object getValueByPointer(ValuePointer ptr) {return ((List<Object>)_delegates.nth(ptr.delegateIndex)).get(ptr.index);
}
即使用一个VauluePorint对象作索引查找PersistenceVector中的实际数据。先在_delegates内部找定位到一个具体的delegate,然后再定位到具体的元素。这里使用的nth()方法大致意思就是在_delegates中定位到一个delegate,其实现未查看clojure代码。
3、内部类
TridentTupleView有4个内部类,它们均是实现了Factory接口,用于创建Trident消息。这些Factory子类的create()方法会被spout/bolt的各种操作调用来创建一个TridentTuple,我们以后再介绍是谁在调用这些方法。目前我们只需要知道:
* ProjectionFactory:它不会创建一个新的消息,而只是保留parent的部分字段(由projectFields定义)
* FreshOutputFactory:根据输入的字段名和值来产生一个新的消息
* OperationOutputFactory:通过创建selfFields创建一个新的_delegate,然后与parent一起组成一个新的TridentTupleView。因此它的_delegates数量会+1.
* RootFactory:为操作的入口工厂,对输入的消息起适配作用
4、ProjectionFactory
ProjectionFactory根据输入的parent以及projectFields重新构建一下TridentTupleView,它不会创建一个新的消息,而只是保留parent的部分字段(由projectFields定义)。
public ProjectionFactory(Factory parent, Fields projectFields) {_parent = parent;if(projectFields==null) projectFields = new Fields();Map<String, ValuePointer> parentFieldIndex = parent.getFieldIndex();_fieldIndex = new HashMap<>();for(String f: projectFields) {_fieldIndex.put(f, parentFieldIndex.get(f));} _index = ValuePointer.buildIndex(projectFields, _fieldIndex);}public TridentTuple create(TridentTuple parent) {if(_index.length==0) return EMPTY_TUPLE;else return new TridentTupleView(((TridentTupleView)parent)._delegates, _index, _fieldIndex);}
它返回的delegate的数量和parent相同,因此再次证明它不会产生新的delegate:
@Overridepublic int numDelegates() {return _parent.numDelegates();}
5、FreshOutputFactory
FreshOutputFactory根据输入的字段名和值来产生一个新的消息。
public FreshOutputFactory(Fields selfFields) {_fieldIndex = new HashMap<>();for(int i=0; i<selfFields.size(); i++) {String field = selfFields.get(i);_fieldIndex.put(field, new ValuePointer(0, i, field));}_index = ValuePointer.buildIndex(selfFields, _fieldIndex);}public TridentTuple create(List<Object> selfVals) {return new TridentTupleView(PersistentVector.EMPTY.cons(selfVals), _index, _fieldIndex);}
(1)先是在构建函数中通过field的值构建一个ValuePointer对象ValuePointer(0, i, field)
,也就是说这是第0个_delegate的第i个field,field的名称是field。
(2)然后通过调用create()方法来创建一个PersistentVector对象,并与ValuePointer一起创建一个TridentTupleView。
FreshOutputFactory返回的_delegate对象永远是1:
@Overridepublic int numDelegates() {return 1;}
6、OperationOutputFactory
OperationOutputFactory通过创建selfFields创建一个新的_delegate,然后与parent一起组成一个新的TridentTupleView。因此它的_delegates数量会+1.
这里也证明了一个TridentTupleView会有多个_delegate的。这里指的多个是numDelegates()返回的数量,而不是指多个IPersistentVector对象。事实上每个TridentTupleView只有一个IPersistentVector对象。
public OperationOutputFactory(Factory parent, Fields selfFields) {_parent = parent;_fieldIndex = new HashMap<>(parent.getFieldIndex());int myIndex = parent.numDelegates();for(int i=0; i<selfFields.size(); i++) {String field = selfFields.get(i);_fieldIndex.put(field, new ValuePointer(myIndex, i, field));}List<String> myOrder = new ArrayList<>(parent.getOutputFields());Set<String> parentFieldsSet = new HashSet<>(myOrder);for(String f: selfFields) {if(parentFieldsSet.contains(f)) {throw new IllegalArgumentException("Additive operations cannot add fields with same name as already exists. "+ "Tried adding " + selfFields + " to " + parent.getOutputFields());}myOrder.add(f);}_index = ValuePointer.buildIndex(new Fields(myOrder), _fieldIndex);}public TridentTuple create(TridentTupleView parent, List<Object> selfVals) {IPersistentVector curr = parent._delegates;curr = (IPersistentVector) RT.conj(curr, selfVals);return new TridentTupleView(curr, _index, _fieldIndex);}
再确认一下_delegate的数量:
@Overridepublic int numDelegates() {return _parent.numDelegates() + 1;}
没错,就是parent的数量再加1.
7、RootFactory
RootFactory为操作的入口工厂,对输入的消息起适配作用。它会根据输入消息产生一个TridentTupleView类型的消息,这个产生的消息可以被其他工作方法使用。
(五)ValuePointer
1、成员变量&构造函数
public int delegateIndex;
protected int index;
protected String field;public ValuePointer(int delegateIndex, int index, String field) {this.delegateIndex = delegateIndex;this.index = index;this.field = field;
}
ValuePointer有3个成员变量,分别表示:
(1)delegateIndex表示TridentTupleView中的哪个IPersistentVector对象。正如前面据说的,TridentTupleView可能有多个IPersistentVector对象。
(2)index表示IPersistentVector这个集合中的哪个元素。
(3)field表示这个field的名称。
因此,通过ValuePointer可以定位到哪个IPersistentVector对象,然后是IPersistentVector对象的哪个元素,以及这个元素对应的field的名称是什么
2、2个方法
这2个方法主要用于ValuePointer[]与Map
storm的消息格式分析相关推荐
- CORBA GIOP消息格式学习
想要深入理解ORB的工作过程与原理,学习与了解GIOP消息格式必不可少.我们知道GIOP是独立于具体通信的更高级别的抽象,因此这里针对GIOP在TCP/IP上的实现IIOP协议进行学习与分析(IIOP ...
- HTTP/1.1(消息格式、连接管理、条件请求、范围请求、缓存、身份验证)
文章目录 概述 消息格式 开始行 请求方法 请求目标 状态码 头部字段 消息体 连接管理 条件请求(Conditional Requests) 范围请求(Range Requests) 缓存(Cach ...
- NOKIA、MOTOROLA、SIMENS及ERICCSON铃声和图片格式分析(转)
NOKIA.MOTOROLA.SIMENS及ERICCSON铃声和图片格式分析 最近一直有些朋友们要求总结一下主流手机铃声和图片的问题,由于本人在这方面基本没有什么尝试,只能将有些朋友们发表过的东西总 ...
- 一文看懂Kafka消息格式的演变
欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...
- 【Android 安装包优化】资源混淆 ( resources.arsc 资源映射表混淆 | resources.arsc 资源映射表二进制格式分析 | 混淆全局字符串池和资源名称字符串池 )
文章目录 一.resources.arsc 资源映射表 混淆 二.resources.arsc 资源映射表二进制格式分析 三.参考资料 资源混淆时 , 需要修改混淆 resources.arsc 资源 ...
- 【Android RTMP】RTMP 数据格式 ( FLV 视频格式分析 | 文件头 Header 分析 | 标签 Tag 分析 | 视频标签 Tag 数据分析 )
文章目录 安卓直播推流专栏博客总结 一. RTMP 格式解析 二. 文件头 Header 分析 三. 标签 Tag 分析 四. 视频标签 Tag 数据分析 安卓直播推流专栏博客总结 Android R ...
- Kafka的消息格式
Commit Log Kafka储存消息的文件被它叫做log,按照Kafka文档的说法是: Each partition is an ordered, immutable sequence of me ...
- Apache Kafka消息格式的演变(0.7.x~0.10.x)
用 Kafka 这么久,从来都没去了解 Kafka 消息的格式.今天特意去网上搜索了以下,发现这方面的资料真少,很多资料都是官方文档的翻译:而且 Kafka 消息支持压缩,对于压缩消息的格式的介绍更少 ...
- ack是什么,如何使用Ack机制,如何关闭Ack机制,基本实现,STORM的消息容错机制,Ack机制
1.ack是什么 ack 机制是storm整个技术体系中非常闪亮的一个创新点. 通过Ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理, 从而可以让开发者采取动作.比如在Me ...
最新文章
- Global.asax详解
- ACMNO.2 输入一个华氏温度,要求输出摄氏温度。公式为 c=5(F-32)/9 输出要求有文字说明,取位2小数。 输入 一个华氏温度,浮点数 输出 摄氏温度,浮点两位小数
- [Swift]LeetCode388. 文件的最长绝对路径 | Longest Absolute File Path
- 预处理命令[#define]说明
- 关于视频光端机调制方式及介质特点的介绍
- PyTorch的Tensor(张量)
- 语音识别相关书籍抖音十大先看哪一本最好
- 11月3日 迅雷白金会员vip账号分享 91freevip 23:00更新
- 什么是 Hadoop 生态系统
- ibm system x服务器重装系统,IBM X346服务器重装系统_xSeries 346阵列配置
- docker中安装Nexus3
- oracle中imp命令详解
- Iphone连接Openwrt的IPSEC服务器
- 绝密计划:我在阿里打黑工
- linux定时清理临时目录,tmp临时文件目录自动清理
- SpringBoot项目目录结构(工程结构)
- CartoonShader
- 中秋国庆旅游 App 市场竞争激烈!工具类 App 更易被苹果推荐!
- 解决:Tomcat启动提示At least one JAR was scanned for TLDs yet contained no TLDs
- 复变函数与积分变换(五)学习笔记[孤立奇点,留数,零点与奇点,无穷远点的留数,留数计算的应用]
热门文章
- oracle主从表分离怎么实时更新数据_高可用数据库主从复制延时的解决方案
- python跳转到程序顶部_python-如何使Tkinter窗口跳到最前面?
- html开启页面离线缓存,HTML5 离线缓存
- 学校不用考直接过计算机一级,全国计算机等级考试1级是不是必须要考的啊
- 字典统计排序python123_按值对字典进行排序Python(3级Dict)
- PHP 运动会,运动会成绩管理系统
- linux笔记之 rpm常用参数 ,yum安装编译器,httpd服务的开关
- 液晶弹性自由能计算_自由能方法应用(一)开放计算平台BRIDGE的介绍及使用案例...
- nslookup type值_nslookup查询
- linux软件包管理工具,Linux 软件包管理器-----yum配置详解一