storm-stream(2)
2121SC@SDUSC
storm-stream(2)
SingleStream
默认情况下:Spout发送到下游Bolt的stream-id,以及Bolt发送到下游Bolt或者接收上游Spout/Bolt的stream-id都是default。
可以对Spout/Bolt在发送消息时自定义stream-id,同时必须在声明输出字段时,指定对应的stream-id。
代码说明:发射时指定一个stream-id,声明流时指定一个stream-id,topology设置Bolt时除了通过Group的component-id,还会指定上游组件的stream-id,代码如下:
class RandomSentenceSpout {
public void nextTuple() {Utils.sleep(1000);String sentence = sentences[rand.nextInt(sentences.length)];System.out.println("\n" + sentence);this.collector.emit("split-stream", new Values(sentence));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declareStream("split-stream", new Fields("sentence"));
}
}class SplitSentenceBolt {
public void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");for (String word : words) {this.collector.emit("count-stream", new Values(word));}this.collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declareStream("count-stream", new Fields("word"));
}
}class WordCountBolt {
public void execute(Tuple tuple) {String word = tuple.getString(0);Integer count = counts.get(word);if (count == null) count = 0;count++;counts.put(word, count);collector.emit("print-stream", new Values(word, count));}
public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declareStream("print-stream", new Fields("word", "count"));
}}class Topology {
main(){TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new RandomSentenceSpout(), 1);builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout", "split-stream");builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", "count-stream", new Fields("word"));builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count", "print-stream");
}
}
MultiStream
Spout/Bolt发射时可以指定多个stream-id,同样要在声明输出字段时指定所有在发射过程指定的stream-id。
虽然每条消息的输出消息流并不一定会用到所有的stream,比如下面示例中一条消息发射到stream1和stream3,
另外一条消息发射到stream2和stream3,stream1和stream2是互斥的,不可能同时发送到这两个stream。
但是可以看到在declareStream中,要同时指定所有的stream-id。
public void execute(Tuple input) {String word = input.getString(0);//小于j的word发送给stream1; 大于j的word发送给stream2;if(word.compareTo("j") < 0){collector.emit("stream1", new Values(word));}else if(word.compareTo("j") > 0){collector.emit("stream2", new Values(word));}//不管什么都发送给stream3collector.emit("stream3", new Values(word));
}
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declareStream("stream1", new Fields("word"));outputFieldsDeclarer.declareStream("stream2", new Fields("word"));outputFieldsDeclarer.declareStream("stream3", new Fields("word"));
}
stream grouping就是用来定义一个stream应该如果分配给Bolts上面的多个Tasks。
storm里面有6种类型的stream grouping:
Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同。轮询,平均分配,下游每个bolt均衡接收到上游的tuple。
Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts。
例如,如果流是按“user-id”字段分组的,具有相同“user-id”的元组将总是进入相同的任务,但是具有不同“user-id”的元组可能会进入不同的任务。All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到。 流在bolt的所有任务中被复制,小心使用这个分组。
Global Grouping: 全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。 整个流指向bolt的一个任务。具体来说,它将转到具有最低id的任务。所有tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
Non Grouping: 不分组, 这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果,不平均分配。此分组指定不关心流如何分组。目前还没有一个分组等同于shuffle分组。最终,Storm会把没有分组的螺栓按在同一个线程中执行,就像他们订阅的螺栓或喷口一样(如果可能的话)。
Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。
元组的生产者决定消费者的哪个任务将接收这个元组。直接分组只能在已声明为直接流的流上声明。发出到直接流的元组必须使用emitDirect方法之一发出。
用这种分组意味着消息的发送者指定优消息接收者的某个task处理这个消息,只有被声明为DirectStream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的taskid(OutputCollector.emit方法也会返回taskid)。
storm-stream(2)相关推荐
- 流式处理框架storm浅析(下篇)
本文来自网易云社区 作者:汪建伟 举个栗子 1 实现的目标 设计一个系统,来实现对一个文本里面的单词出现的频率进行统计. 2 设计Topology结构: 这是一个简单的例子,topology也非常简单 ...
- 简洁又快速地处理集合——Java8 Stream(下)
上一篇文章我讲解 Stream 流的基本原理,以及它与集合的区别关系,讲了那么多抽象的,本篇文章我们开始实战,讲解流的各个方法以及各种操作 没有看过上篇文章的可以先点击进去学习一下 简洁又快速地处理集 ...
- CUDA编程之:Stream(流)
CUDA Stream(流):指在设备(Device)上按主机(Host)代码发出的顺序执行的一系列异步的CUDA操作.Stream封装这些操作,管理它们的顺序,允许在所有先前操作之后在流中排队执行操 ...
- java8 —— Stream( 流 )
文章目录 一.Stream( 流 )是什么? 二.Stream 的操作三个步骤 三.创建Stream 四.Stream 的中间操作 4.1.筛选与切片 4.2. 映射:(重点) 4.3. 排序: 五. ...
- 苹果上的Http Live Stream(HLS)技术初探
最近在做RTMP转HLS的流媒体服务器项目.需要重新了解Http Live Stream(HLS)技术,于是,总结了一些相关技术细节,如下. 苹果的视频数据流播放技术要求 一如苹果的强 ...
- Storm入门(九)Storm常见模式之流聚合
流聚合(stream join)是指将具有共同元组(tuple)字段的数据流(两个或者多个)聚合形成一个新的数据流的过程. 从定义上看,流聚合和SQL中表的聚合(table join)很像,但是二者有 ...
- Storm入门(一)原理介绍
问题导读: 1.hadoop有master与slave,Storm与之对应的节点是什么? 2.Storm控制节点上面运行一个后台程序被称之为什么? 3.Supervisor的作用是什么? 4.Topo ...
- Node 深入Stream(2)
1. Node.js 中有四种基本的流类型: Readable - 可读的流 (例如 fs.createReadStream()). Writable - 可写的流 (例如 fs.createWrit ...
- Storm入门(七)可靠性机制代码示例
一.关联代码 使用maven,代码如下. pom.xml 参考 http://www.cnblogs.com/hd3013779515/p/6970551.html MessageTopology. ...
- Storm学习(一)---storm的安装及简单介绍
1.何为storm Apache Storm是一个免费的开源分布式实时计算系统.Storm可以轻松可靠地处理无限数据流,实时处理Hadoop为批处理所做的工作.风暴很简单,可以与任何编程语言一起使用, ...
最新文章
- 《深入理解Hadoop(原书第2版)》——1.3大数据的编程模型
- GNS3做交换实验使用感受
- spring boot: GlobalDefaultExceptionHandler方法内的友好错误提示,全局异常捕获
- 全球及中国教育行业投资动态与发展决策建议报告2022版
- 20145234黄斐《java程序设计》第六周
- 本地编译Hadoop2.8.0源码总结和问题解决(转自:http://blog.csdn.net/young_kim1/article/details/50324345)
- win10pin不可用进不去系统_解决win7系统下连接网络打印机不可用的处理方法
- springboot elasticsearch vue ik中文分词器 实现百度/京东全文搜索
- linux 软件 tar deb rmp,专业编剧软件Fade In Linux版提供deb、rpm、tar.gz包下载
- 高通的快充协议_高通QC5.0快充发布:百瓦级时代,高通被国产厂商牵着鼻子走了?...
- linux 内核/proc
- pg批量插入_在PostgreSQL中批量/批量更新/提升
- android获取有线、wifi、3G(4G)的IP
- linux系统中连接两个网桥,Linux 网桥代码分析 (二)
- PHP base64转图片
- camera成像能力-清晰度(Resolution,Sharpen)
- CopyPasteCharacter 快打輸入打勾、愛心、數學、表情特殊符號
- 英语词根词缀记忆法(全集)_语言学习 | 英语词根词缀学习参考
- 社会学与计算机哪个考研容易,国内几所较热的社会学系考研难度比较
- word对齐表格不在一行的文字