Storm的一些通用的Topology的模式
原文地址:http://storm.apache.org/documentation/Common-patterns.html
本文列举了Storm Topology的一些通用的模式:
- 流式聚合
- 批处理
- BasicBolt
- 内存中的缓存 + fields grouping的组合
- 流式的top N计算
- 使用TimeCacheMap来高效保存最近更新的数据的缓存
- 分布式RPC:CoordinatedBolt和KeyedFairBolt
聚合(Joins)
流聚合基于一些共同的字段把两个或者多个数据流聚合在一起。然而一个普通的数据库聚合的输入是有限的,语意也很明确,但流聚合的输入是无限的,语意并不明确。
每种应用的聚合方式也不尽相同。一些应用始终都把两个流的tuple聚合在一起,一些应用只想根据特定的字段进行聚合,而其他的应用的聚合逻辑可能完全不一样。在所有的聚合类型中,有一种通用的模式,就是用相同的方式对多个输入流进行划分(partitioning)。在Storm中可以在一些字段上使用fields grouping,这样可以轻松的把多个输入流聚合到joiner bolt,例如:
builder.setBolt("join", new MyJoiner(), parallelism).fieldsGrouping("1", new Fields("joinfield1", "joinfield2")).fieldsGrouping("2", new Fields("joinfield1", "joinfield2")).fieldsGrouping("3", new Fields("joinfield1", "joinfield2"));
当然,不同的数据流中的“相同”字段可以有不一样的名字。
批处理(Batching)
有时候为了性能或者一些别的原因,你想把一组tuple来个批处理,而不是一个个单独处理。例如,你可能想批量更新数据库或者以某种方式做一个流的聚合(aggregation)。
如果你想可靠的进行数据处理,正确的方式是保存这些tuple对象的引用,直到bolt批处理完成。一旦批处理完成,再对这些tuple做ack操作。
如果bolt发射(emit)tuple,那么你可能想使用multi-anchoring来确保可靠性。这个需要具体情况具体分析。参考Guaranteeing message processing来了解如何可靠工作的更多细节。
BasicBolt
许多bolt遵从一种简单的模式:
- 读取一个输入的tuple
- 根据这个输入的tuple,发射0个或者多个tuple
- 在execute方法的最后立即ack这个输入的tuple
内存中的缓存 + fields grouping组合(In-memory caching + fields grouping combo)
在Storm的bolt中保存一些缓存是很常见的。当你使用fields grouping来进行合并(combine)时,缓存变得特别有用。例如,假如你有一个bolt来把短链接转换成长链接(比如bit.ly,t.co之类)。你可以使用一个LRU缓存来维护短链接到长链接的映射关系来提高性能,防止同一个HTTP请求过于频繁。假设“urls”组件发射短链接,“expand”组件把短链接转换成长链接,并在内部维护一个缓存。看一下下面两段代码的不同之处:
builder.setBolt("expand", new ExpandUrl(), parallelism).shuffleGrouping(1);
builder.setBolt("expand", new ExpandUrl(), parallelism).fieldsGrouping("urls", new Fields("url"));
第2种方式使用缓存的效率比第1种要高得多,因为同样的URL始终会被发送到同一个task。这样会避免一个缓存会存在于多个task中,同时也能提高缓存的命中率(hit rate)。
流式Top N计算(Streaming top N)
Storm有一种常见的模式称为持续计算,Storm中的流式Top N计算正属于这个模式。假如你有一个bolt发射["value","count"]这种形式的tuple,并且有一个bolt来根据计数来发射top N的tuple。最简单的方式是有一个bolt在数据流上做一个global grouping,并维护一个top N的列表。
这种方式很明显对于数据量大的流没有扩展性,因为所有的流数据都会被发到同一个task。一种更好的方法是在多台机器上并行计算这个流中的一部分的top N,并且还有一个bolt来合并每一部分的top N中间计算结果,最终算出最后的top N(MR思想),这种模式看起来像是这样的:
builder.setBolt("rank", new RankObjects(), parallellism).fieldsGrouping("objects", new Fields("value"));
builder.setBolt("merge", new MergeObjects()).globalGrouping("rank");
这种模式可以工作,是由于第一个bolt做了fields grouping使得这种并行算法的语意正确。
你可以在storm-starter项目中找到一个示例程序here。
使用TimeCacheMap来高效保存最近更新的数据的缓存(TimeCacheMap for efficiently keeping a cache of things that have been recently updated)
你有时候想在内存中缓存最近活跃的对象,并想让那些一段时间内不活跃的对象自动过期。TimeCacheMap是一个高效的数据结构,适用于这样的需求。并提供了钩子(hook),所以你可以添加回调函数,当一个对象过期会被自动调用。(关于TimeCacheMap为什么高效,可以看看这篇分析文章)
分布式RPC:CoordinatedBolt和KeyedFairBolt(CoordinatedBolt and KeyedFairBolt for Distributed RPC)
在Storm之上构建分布式RPC应用时,通常会使用2种通用的模式。它们被封装为CoordinatedBolt和KeyedFairBolt中,同时也是Storm的代码库中的标准库的一部分。
CoordinatedBolt
包装你的bolt,其中包含了你的逻辑,并且确定何时你的bolt收到某个特定的请求对应的所有请求。主要在direct stream中使用。
KeyedFairBolt
也是用于包装你的bolt,其中包含了你的逻辑,并且保证你的topology同时处理多个DRPC调用,而不是串行的一个个处理。
更多分布式RPC请参考Distributed RPC。
Storm的一些通用的Topology的模式相关推荐
- 使用hive和sqoop来实现统计24小时每个时段的PV和UV,storm计算网站UV(去重计算模式)
[案例]使用hive和sqoop来实现网站基本指标,PV和UV 1.PV统计网页浏览总量 2.UV去重 ->[需求]统计24小时每个时段的PV和UV ->建分区表,按天一级,按小时一级,多 ...
- 第四节: EF调用存储过程的通用写法和DBFirst模式子类调用的特有写法
一. 背景 上一个章节,介绍了EF调用两类SQL语句,主要是借助 ExecuteSqlCommand 和 SqlQuery 两个方法来完成,在本章节主要是复习几类存储过程的写法和对应的EF调用这几类 ...
- Kafka+Storm+HBase项目Demo(5)--topology,spout,bolt使用
相关概念 1.Topologies 一个topology是spouts和bolts组成的图, 通过stream groupings将图中的spouts和bolts连接起来. 2.Streams 消息流 ...
- GRASP通用职责分配软件模式
1. 概述 它的核心思想是"职责分配(Responsibility Assignment)".GRASP提出了几个基本原则,用来解决面向对象设计的一些问题. Craig Larm ...
- storm计算网站UV(去重计算模式)
需求分析: UV统计 方案分析: 1,传统的方式是把session_id放入Set实现自动去重,Set.size()获得UV,但是这种方式只能在单机上有效 2,可行的方案(类似WordCount的计算 ...
- ASP.net 中的页面继承实现和通用页面的工厂模式的实现
最近用.Net做web项目的时候遇到了一些问题,就是很多的页面的处理一样的,不一样的就是我们写的存储过程不同,为了考虑代码的重复利用和可维护性和可 扩展性,于是写了一个对于单据页面的工厂模式,采用界面 ...
- STM32之通用定时器输入捕获模式
#include "stm32f10x.h" /* RCC时钟配置 */ void RCC_config() { ErrorStatus HSEStartUpStatus; /* ...
- STM32之通用定时器输出比较模式
#include "stm32f10x.h" /* RCC时钟配置 */ void RCC_config() { ErrorStatus HSEStartUpStatus; ...
- Storm的本地运行模式示例
以word count为例,本地化运行模式(不需要安装zookeeper.storm集群),maven工程, pom.xml文件如下: <project xmlns="http://m ...
最新文章
- Writing a FilterUnloadCallback Routine for a Minifilter Driver 为一个微过滤驱动写一个过滤器卸载回调例程...
- vue+ssr+axios
- jquery选择器之基本筛选器
- 用Go开发支持百万级数据量的高性能缓存服务
- 应届生是这辈子最大的一次优势,也是最后一次!
- Ambari安装client报错OSError:[Error 17] File exists
- tensorflow之regularizer
- python图像边缘检测报告_python计算机视觉2:图像边缘检测
- C语言实现顺序表(数据结构)
- 服装免费收银系统哪个好-云上铺会员管理软件
- 病毒名称:Hacktool (正当追杀+旁门左道)
- Java 常量字符串过长
- 使用Cesium的 Material (材质)绘制警戒线
- 博客左侧导航栏添加打赏功能(添加微信赞赏码)
- 前端性能优化 24 条建议
- 转载 - Linux使用技巧锦集
- 服务器安装Anaconda
- 某微型计算机指令格式如图,计算机组成原理期末复习试题2套不含答案
- 照片损坏只显示一半怎么修复?
- Excel数据分析常用函数②——统计函数(sumproduct,sumif,sumifs,count,countif,countifs,counta…)
热门文章
- 微信小程序将unicode格式内容转为中文
- SourceTec.Sothink.SWF.Decompiler.v5.1.516.Cracked-NGEN
- 华为matebook d装双系统
- AES-128加密解密方式(逐步更新)
- 【沧海拾昧】C# .Net SplitContainer(分割器)控件的使用笔记
- XML是什么,它可以做什么?——写给XML入门者
- fdisk in minix 源代码分析
- 评析VANCL与PPG的营销
- 【uni-app项目】瑞幸咖啡小程序
- 最新JAVA安装教程(Mac版)