2121SC@SDUSC

storm-stream(2)

SingleStream

默认情况下:Spout发送到下游Bolt的stream-id,以及Bolt发送到下游Bolt或者接收上游Spout/Bolt的stream-id都是default。

可以对Spout/Bolt在发送消息时自定义stream-id,同时必须在声明输出字段时,指定对应的stream-id。

代码说明:发射时指定一个stream-id,声明流时指定一个stream-id,topology设置Bolt时除了通过Group的component-id,还会指定上游组件的stream-id,代码如下:

class RandomSentenceSpout {
public void nextTuple() {Utils.sleep(1000);String sentence = sentences[rand.nextInt(sentences.length)];System.out.println("\n" + sentence);this.collector.emit("split-stream", new Values(sentence));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declareStream("split-stream", new Fields("sentence"));
}
}class SplitSentenceBolt {
public void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");for (String word : words) {this.collector.emit("count-stream", new Values(word));}this.collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declareStream("count-stream", new Fields("word"));
}
}class WordCountBolt {
public void execute(Tuple tuple) {String word = tuple.getString(0);Integer count = counts.get(word);if (count == null) count = 0;count++;counts.put(word, count);collector.emit("print-stream", new Values(word, count));}
public void declareOutputFields(OutputFieldsDeclarer     declarer) {declarer.declareStream("print-stream", new Fields("word", "count"));
}}class Topology {
main(){TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new RandomSentenceSpout(), 1);builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout", "split-stream");builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", "count-stream", new Fields("word"));builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count", "print-stream");
}
}
MultiStream

Spout/Bolt发射时可以指定多个stream-id,同样要在声明输出字段时指定所有在发射过程指定的stream-id。
虽然每条消息的输出消息流并不一定会用到所有的stream,比如下面示例中一条消息发射到stream1和stream3,
另外一条消息发射到stream2和stream3,stream1和stream2是互斥的,不可能同时发送到这两个stream。
但是可以看到在declareStream中,要同时指定所有的stream-id。

public void execute(Tuple input) {String word = input.getString(0);//小于j的word发送给stream1; 大于j的word发送给stream2;if(word.compareTo("j") < 0){collector.emit("stream1", new Values(word));}else if(word.compareTo("j") > 0){collector.emit("stream2", new Values(word));}//不管什么都发送给stream3collector.emit("stream3", new Values(word));
}
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declareStream("stream1", new Fields("word"));outputFieldsDeclarer.declareStream("stream2", new Fields("word"));outputFieldsDeclarer.declareStream("stream3", new Fields("word"));
}

stream grouping就是用来定义一个stream应该如果分配给Bolts上面的多个Tasks。
storm里面有6种类型的stream grouping:

  1. Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同。轮询,平均分配,下游每个bolt均衡接收到上游的tuple。

  2. Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts。
    例如,如果流是按“user-id”字段分组的,具有相同“user-id”的元组将总是进入相同的任务,但是具有不同“user-id”的元组可能会进入不同的任务。

  3. All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到。 流在bolt的所有任务中被复制,小心使用这个分组。

  4. Global Grouping: 全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。 整个流指向bolt的一个任务。具体来说,它将转到具有最低id的任务。所有tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。

  5. Non Grouping: 不分组, 这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果,不平均分配。此分组指定不关心流如何分组。目前还没有一个分组等同于shuffle分组。最终,Storm会把没有分组的螺栓按在同一个线程中执行,就像他们订阅的螺栓或喷口一样(如果可能的话)。

  6. Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。
    元组的生产者决定消费者的哪个任务将接收这个元组。直接分组只能在已声明为直接流的流上声明。发出到直接流的元组必须使用emitDirect方法之一发出。
    用这种分组意味着消息的发送者指定优消息接收者的某个task处理这个消息,只有被声明为DirectStream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的taskid(OutputCollector.emit方法也会返回taskid)。

storm-stream(2)相关推荐

  1. 流式处理框架storm浅析(下篇)

    本文来自网易云社区 作者:汪建伟 举个栗子 1 实现的目标 设计一个系统,来实现对一个文本里面的单词出现的频率进行统计. 2 设计Topology结构: 这是一个简单的例子,topology也非常简单 ...

  2. 简洁又快速地处理集合——Java8 Stream(下)

    上一篇文章我讲解 Stream 流的基本原理,以及它与集合的区别关系,讲了那么多抽象的,本篇文章我们开始实战,讲解流的各个方法以及各种操作 没有看过上篇文章的可以先点击进去学习一下 简洁又快速地处理集 ...

  3. CUDA编程之:Stream(流)

    CUDA Stream(流):指在设备(Device)上按主机(Host)代码发出的顺序执行的一系列异步的CUDA操作.Stream封装这些操作,管理它们的顺序,允许在所有先前操作之后在流中排队执行操 ...

  4. java8 —— Stream( 流 )

    文章目录 一.Stream( 流 )是什么? 二.Stream 的操作三个步骤 三.创建Stream 四.Stream 的中间操作 4.1.筛选与切片 4.2. 映射:(重点) 4.3. 排序: 五. ...

  5. 苹果上的Http Live Stream(HLS)技术初探

    最近在做RTMP转HLS的流媒体服务器项目.需要重新了解Http Live Stream(HLS)技术,于是,总结了一些相关技术细节,如下. 苹果的视频数据流播放技术要求         一如苹果的强 ...

  6. Storm入门(九)Storm常见模式之流聚合

    流聚合(stream join)是指将具有共同元组(tuple)字段的数据流(两个或者多个)聚合形成一个新的数据流的过程. 从定义上看,流聚合和SQL中表的聚合(table join)很像,但是二者有 ...

  7. Storm入门(一)原理介绍

    问题导读: 1.hadoop有master与slave,Storm与之对应的节点是什么? 2.Storm控制节点上面运行一个后台程序被称之为什么? 3.Supervisor的作用是什么? 4.Topo ...

  8. Node 深入Stream(2)

    1. Node.js 中有四种基本的流类型: Readable - 可读的流 (例如 fs.createReadStream()). Writable - 可写的流 (例如 fs.createWrit ...

  9. Storm入门(七)可靠性机制代码示例

    一.关联代码 使用maven,代码如下. pom.xml  参考 http://www.cnblogs.com/hd3013779515/p/6970551.html MessageTopology. ...

  10. Storm学习(一)---storm的安装及简单介绍

    1.何为storm Apache Storm是一个免费的开源分布式实时计算系统.Storm可以轻松可靠地处理无限数据流,实时处理Hadoop为批处理所做的工作.风暴很简单,可以与任何编程语言一起使用, ...

最新文章

  1. 《深入理解Hadoop(原书第2版)》——1.3大数据的编程模型
  2. GNS3做交换实验使用感受
  3. spring boot: GlobalDefaultExceptionHandler方法内的友好错误提示,全局异常捕获
  4. 全球及中国教育行业投资动态与发展决策建议报告2022版
  5. 20145234黄斐《java程序设计》第六周
  6. 本地编译Hadoop2.8.0源码总结和问题解决(转自:http://blog.csdn.net/young_kim1/article/details/50324345)
  7. win10pin不可用进不去系统_解决win7系统下连接网络打印机不可用的处理方法
  8. springboot elasticsearch vue ik中文分词器 实现百度/京东全文搜索
  9. linux 软件 tar deb rmp,专业编剧软件Fade In Linux版提供deb、rpm、tar.gz包下载
  10. 高通的快充协议_高通QC5.0快充发布:百瓦级时代,高通被国产厂商牵着鼻子走了?...
  11. linux 内核/proc
  12. pg批量插入_在PostgreSQL中批量/批量更新/提升
  13. android获取有线、wifi、3G(4G)的IP
  14. linux系统中连接两个网桥,Linux 网桥代码分析 (二)
  15. PHP base64转图片
  16. camera成像能力-清晰度(Resolution,Sharpen)
  17. CopyPasteCharacter 快打輸入打勾、愛心、數學、表情特殊符號
  18. 英语词根词缀记忆法(全集)_语言学习 | 英语词根词缀学习参考
  19. 社会学与计算机哪个考研容易,国内几所较热的社会学系考研难度比较
  20. word对齐表格不在一行的文字

热门文章

  1. Uber上市:谁成了高富帅,谁成了接盘侠?
  2. 一篇短文让你知道软件测试中的测试用例是啥
  3. 机器学习 数据挖掘 数据集划分 训练集 验证集 测试集
  4. php文件post跨域,【php】跨域post请求
  5. 64位eclipse免安装版下载
  6. 【PSU】AIX 11g RAC自动打GI PSU5
  7. sqlserver创建数据库的sql语句
  8. Java中this关键字及this()方法的使用
  9. 【2023最新】Postman安装教程
  10. 基于生态系统服务(InVEST模型)的人类活动、重大工程生态成效评估、论文写作等具体应用