什么是Storm?从入门到上手使用
Storm
离线计算
离线计算就是批量的处理数据,周期性的批量计算数据。
代表技术:
Sqoop – 批量导入数据
HDFS – 批量存储数据
MapReduce – 批量计算数据
Hive – 批量计算数据
流式/在线计算
流式/在线计算就是实时的数据传输,数据计算,实时的展示,就跟水龙头一样,不断的流出。
代表技术:
Flume – 实时获取数据
Kafka – 实时存储数据
Redis – 实时结果缓存
Storm/JStorm – 实时计算数据
Spark Stream – 实时计算数据
实时计算和离线计算区别:实时计算是要求在秒级或更小的时间单位内出来结果,离线一般是以小时、天为单位出来结果。
常见企业应用流程:数据源 -> Flume(获取数据) -> Kafka(缓存数据) -> Storm(消费数据并计算数据) - > Redis(保存数据,缓存) -> MySQL(持久化) - > 业务部门
什么是Storm?
Storm最初是Backtype公司的团队创建的,这个公司后面给Twitter收购了,这个项目就开源捐给了Apache,Apache Storm是在Eclipse public License下进行孵化开发的。
Apache Storm是一个免费开源的流式分布式实时计算框架,主要是Clojure和Java语言编写的,Storm能轻松、可靠的处理无界数据流,就像Hadoop对数据进行批处理。
特点
应用场景广泛:可以实时处理消息,更新数据库,持续计算等场景
高伸缩性:Storm的伸缩性可以让Stom每秒处理的消息量达到很高,扩展一个实时计算任务,如果你想要加机器并提高任务的并行度,Storm使用Zookeeper来作为协调机器内的各种配置,使得Storm的集群可以很容易得到扩展。
保证无数据丢失:Storm保证所有数据都被处理,数据传到了肯定能被运算的保证。
异常健壮性:Storm集群非常容易管理,轮流重启节点不影响应用。
容错性好:在消息处理的过程中出现异常,Storm会重试
应用场景
Storm用来实时计算分析源源不断的数据,如同流水线生产,所以Storm可以应用到很多场景,在线机器学习,连续计算等
推荐系统:实时推荐,根据下单或加入购物车,马上在页面下方推荐相关商品。
金融系统:实时分析股票信息数据。
预警系统:根据实时采集的数据,判断是否到了预警的阈值。
网站统计:双十一的实时销量等
Storm核心组件与运行过程
Nimbus:是整个集群的管控核心,是一个主控节点,负责Topology的提交,运行状态的监控,任务重新分配、监控主机故障等工作。
Supervisor:就是领取Nimbus分配的任务与作业、管理Nimbus已经分配的工作进程(Worker)等。
Worker:是Spout和Bolt中具体处理逻辑的进程,一个Worker就是一个进程,一个进程要包含有一个或多个线程。
Execute:任务的执行者之一,一个线程就一个executor,一个线程会处理一个或多个Task。
Task:就是具体要干的任务,一个任务一个Task。
Zookeeper:就是一个监控、监督者,告知Supervisor要领取什么任务,任务执行完成没有等。
元组(Tuple):是消息传递的基本单元,是一个命名值列表,元组中的字段可以是任何的类型对象,Storm使用元组作为其数据模型,元组支持所有基本类型、字符串和字节数组作为字段值,只要实现了类型的序列化接口就可以使用该类型对象进行传递,元组本来应该是Key – Value的Map,但是由于各组件间传递的元组的字段名称已经事先定义好,所以只要按照顺序把元组填入各个Value即可,所以元组是一个Value的List。
流(Stream):流是Storm的核心抽象,是一个无界的元组系列,源源不断的传递元组就组成了流,在分布式环境中并行的进行创建和处理。
拓扑(Topology):是一个逻辑上的网络拓扑结构,是Storm中运行的一个实时应用程序,因为是由各个组装件间的消息流动而形成的逻辑上拓扑,所以也可以理解为是一个jar包。
类似MapReduce作业(job),主要区别在于MapReduce作业最终会完成,而一个拓扑永远运行知道他被kill掉,就是相当于网络节点,会不断地运行、交互数据,除非把骨干网给干掉。
就是数据流来流去,记录下这些数据在各个组件之间的流向,形成的一张逻辑图。
水龙头(Spout):是拓扑的流的来源是,是一个拓扑中产生元数据的组件,通常情况下Spout会从外部数据源中读取数据,然后转换为拓扑内部的元数据。
Spout是可靠的,也可以是不可靠的,如果Storm处理元组失败,可靠的Spout能够重新发送,而不可靠的Spout就尽快忘记发出的元组,另外Spout可以发出超过一个流。
Spout的主要方法是nextTyple(),这个方法会发出一个Tuple到拓扑,如果没有新的元组发出,则简单返回。
Spout的其他方法是ack()和fail(),当Storm检测到一个元组从Spout发出时,ack和fail()会被调用,要么成功完成,通过拓扑,要么未能完成,ack()和fail()只被可靠的Spout调用,也就是数据是发出去了。
ISpout的Java Api是Spout的顶级接口。
转接头(Bolt):在拓扑中所有处理都在Bolt中完成,Bolt是流的处理节点,从一个拓扑接收数据,然后进行处理的组件,Bolt可以完成过滤、业务处理、连接运算、连接和访问数据库等任何操作。
Bolt是一个被动的角色,其接口中有一个execute方法,在接收到消息后会调用这个方法,用户可以在其中执行自己希望的操作
Bolt可以完成简单流的转换,而完成复杂的流转换通常需要多个步骤,因此需要多个Bolt进行完成。
Bolt可以发出超过一个流。
Bolt的Java Api顶级接口是IBolt;
Storm和Hadoop区别
Storm |
Hadoop |
描述 |
|
应用名称 |
Topology |
Job |
任务名称 |
角色 |
Nimbus |
JobTracker |
任务管理者 |
Supervisor |
TaskTracker |
任务执行小组 |
|
Worker |
Child |
小组成员 |
|
编程接口 |
Spout/Bolt |
Mapper/Reducer |
小组成员干活角色,数据获取、数据处理、数据收集等 |
应用场景 |
实时计算 |
离线计算 |
接收的任务类型 |
处理数据存放 |
内存 |
文件系统 |
执行任务的办公区域 |
数据处理 |
流式处理(来一些处理一些) |
批量处理 |
任务处理方式 |
Storm常用命令
启动Nimbus:bin/storm nimbus &
启动Supervisor:bin/storm supervisor &
启动UI界面:bin/storm ui &
查看运行拓扑:bin/storm list
帮助:bin/storm help
启动Web页面日志查看功能:bin/storm logviewer &
运行jar包:storm jar storm- word-count.jar com.levi.storm.WordCountMain wc
jar包路径 包名+类名 拓扑名
杀死运行的拓扑:bin/storm kill wc
激活指定拓扑:bin/storm activate wc
禁用指定的拓扑Spout:bin/storm deactivate wc
分组策略
分组是用来定义Stream应该分配给Bolts上的哪个execute来去执行,因为多线程、并发的情况下则会有多个人去干execute的事,Storm内部提供了7种分组策略。
分组的前期是线程数要大于1。
Shuffle Grouping:随机分组、轮询、平均分配、随机分配Stream里面的Tuple,保证每个Bolt接收到的Tuple数量大致相同,这个是比较常用的。
Fileds Grouping:按照字段分组,比如按照user id进行分组,相同的Id Tuple则分到同一个Bolts里面的task,不同的id则分配到不同的Task,这个也是比较常用的。
All Grouping:广播发送,对于每一个Tuple,所有Bolt都会接收到。
Global Grouping:全局分组,这个Tuple被分配id值最低的那个Taks(线程Id最小的)。
Non Grouping:不分组,Stream不关心到底谁会收到他的Tuple,目前这种分组和Shuffle Grouping是一样的效果,在多线程的情况不平均分配。
Direct Grouping:直接分组,这是一种特殊的分组,发送者指定消息接收者的哪个Task来处理,只有声明为direct stream的消息流可以声明这种方法,发射Tuple也要用emitDirect
Local Or Shuffle Grouping:如果目标Bolt有一个或多个Task在同一个工作进程中,Tuple会随机的发送给这些Tasks,否则和普通的Shuffle Grouping行为一致
并发度
并发度是用户指定一个任务,可以被多个线程执行,并发度的数量等于线程execute数量。
Task就是具体的处理业务逻辑对象,一个executor线程可以执行一个或多个tasks,一般默认每个executor只执行一个task,所以我们往往认为task就是执行线程,其实不是。
Task代表最大并发度,一个Component的task数量不改变的,但是一个Component的execute数量会发生改变。
Task数 >= execute数量,executor数代表实际并发数。
一个executor一个线程。
Worker(进程) -> executor(线程)
Java API – WordCount
WordSpout
public class WordSpout extends BaseRichSpout { /** * 水龙头对象,打开nextTuple()则输水给下一个接收者 */ private SpoutOutputCollector spoutOutputCollector; @Override public void nextTuple() { spoutOutputCollector.emit(new Values("I am go to shanghai beijing shanghai")); try { Thread.sleep(100000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) { spoutOutputCollector = arg2; } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("sentence")); } } |
WordSplitBolt
public class WordSplitBolt extends BaseRichBolt { //继续输出 private OutputCollector outputCollector; @Override public void execute(Tuple tuple) { String [] strings = tuple.getString(0).split(" "); //发送给下一级 for (String string : strings) { outputCollector.emit(new Values(string,1)); } } @Override public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) { outputCollector = arg2; } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { //定义发送给下一级的Key outputFieldsDeclarer.declare(new Fields("word","num")); System.err.println("切割好数据!"); } } |
WordCountBolt
public class WordCountBolt extends BaseRichBolt { private Map<String, Integer> map = new HashMap<>(); @Override public void execute(Tuple tuple) { String word = tuple.getString(0); int num = tuple.getInteger(1); //去重,统计总数量 if(map.containsKey(word)) { map.put(word, map.get(word) + num); }else { map.put(word,num); } System.out.println(Thread.currentThread().getId() + "-----" + word + "---" + map.get(word)); } @Override public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) { } @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { } } |
WordCountMain
public class WordMain { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { //创建拓扑 TopologyBuilder topologyBuilder = new TopologyBuilder(); //水龙头 topologyBuilder.setSpout("WordSpout", new WordSpout(),1); //转接头1,切分数据,开启4个线程,分组策略用fieldsGrouping topologyBuilder.setBolt("WordSplitBolt", new WordSplitBolt(),4) //按照字段进行分组处理 .fieldsGrouping("WordSpout", new Fields("sentence")); //转接头2,统计切分后的数据,数据来源于WordSplitBolt topologyBuilder.setBolt("WordCountBolt", new WordCountBolt(),1) .fieldsGrouping("WordSplitBolt", new Fields("word")); //配置对象 Config config = new Config(); //提交 if(args.length > 0) {//集群执行 StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology()); }else {//本地 new LocalCluster().submitTopology("WordSplitCount", config, topologyBuilder.createTopology()); } } } |
什么是Storm?从入门到上手使用相关推荐
- 从入门到入土:Python爬虫学习|Selenium自动化模块学习|简单入门|轻松上手|自动操作浏览器进行处理|chrome|PART01
此博客仅用于记录个人学习进度,学识浅薄,若有错误观点欢迎评论区指出.欢迎各位前来交流.(部分材料来源网络,若有侵权,立即删除) 本人博客所有文章纯属学习之用,不涉及商业利益.不合适引用,自当删除! 若 ...
- 一篇“从入门到上手”的Solidworks机械设计教程
一篇"从入门到上手"的Solidworks机械设计教程 机器人专业的学生,多多少少总需要和机械结构打点交道.有时候可能是做个安装设备的架子,有时候可以需要个简单的运动机构,这些简单 ...
- 一篇“从入门到上手”的PCB设计教程
一篇"从入门到上手"的PCB设计教程 这是一篇面向神马都不懂的小白玩家的PCB设计教程.希望能帮助大家快速上手PCB的设计. 1 预备知识 1.1 常用工具 (1)做图工具:Alt ...
- Storm学习入门视频教程
Storm流计算从入门到精通之技术篇(高并发策略.批处理事务.Trident精解.运维监控.企业场景) 课程讲师:Cloudy 课程分类:大数据 适合人群:初级 课时数量:28课时 用到技术:Stor ...
- Vue入门到上手(10)—— VueJs 填坑日记之在项目中使用Amaze UI
Vue入门到上手(10)-- VueJs 填坑日记之在项目中使用Amaze UI 上一篇博文,我们把jQuery集成到了项目中,今天我们来集成Amaze ui(妹子UI).先来介绍一下妹子UI.Ama ...
- Python学习路线图(从入门到上手)
这是我刚开始学习python时的一套学习路线,从入门到上手.(不敢说精通,哈哈~) 希望对大家有帮助哈~ 大家需要高清得python学习路线可以 一.Python入门.环境搭建.变量.数据类型 二.P ...
- 视频教程-跟风舞烟学大数据可视化-Echarts从入门到上手实战-JavaScript
跟风舞烟学大数据可视化-Echarts从入门到上手实战 网名风舞烟,中国科技大学计算机专业.微软认证讲师(MCE).微软数据分析讲师.10多年软件行业从业经验,参与过数百万的企业级ERP系统,在大数据 ...
- python从入门到_Python学习路线从入门到上手,如何快速Python学习?
因为清晰易读的风格,广泛的适用性,Python已经成为最受欢迎的编程语言之一.在TIOBE 排行榜中位居第四,是名副其实的人工智能第一语言. 风靡的另一个原因是,Python有非常多的第三方库.比如用 ...
- apache Storm之一-入门学习
准备工作 这个教程使用storm-starter项目里面的例子.我推荐你们下载这个项目的代码并且跟着教程一起做.先读一下:配置storm开发环境和新建一个strom项目这两篇文章把你的机器设置好. 一 ...
最新文章
- 自然语言处理:网购商品评论情感判定
- Arduino可穿戴教程之第一个程序——Blink(一)
- pandorabox php7,【恐惧交响4之潘多拉】ProjectSAM Symphobia 4 Pandora v1.0.7
- 关于Juniper ScreenOS MIP/VIP地址说明
- java actor和线程有什么区别_Scala Actor与java并发编程的区别
- F - Heron and His Triangle UVALive - 8206
- c语言与64位windows不兼容_微软发布可模拟 64 位 x86 程序的 ARM 版 Windows 10
- 微课|中学生可以这样学Python(例8.21):选择法排序
- C语言中INT数据类型的最小值和最大值
- java过滤器应用实例_Servlet过滤器Filter的简单介绍(附示例)
- SSM整合配置文件总结
- 删除数组对象 相同的值 制定数组对象
- VB2010(18)_各种对话框的使用
- mysql 批量插入 性能_MySQL批量插入数据性能比较
- redis集群搭建管理入门
- 使用<details>标签在网页里面添加脚注
- 使用iperf测试网速
- AIX磁盘管理基础知识
- html如何缩进对齐,你如何缩进你的HTML?
- 如何格式化输出JSON数据