模型图

代码

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.zhyoulun</groupId><artifactId>storm_study</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>0.9.7</version></dependency></dependencies></project>

MainTopology.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;public class MainTopology {public void runLocal(int waitSeconds) {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("sentenceSpout", new SentenceSpout(), 1);builder.setBolt("splitBolt", new SplitBolt(), 1).shuffleGrouping("sentenceSpout");builder.setBolt("countBolt", new CountBolt(), 1).shuffleGrouping("splitBolt");Config config = new Config();LocalCluster cluster = new LocalCluster();cluster.submitTopology("word_count", config, builder.createTopology());try {Thread.sleep(waitSeconds * 1000);} catch (InterruptedException e) {e.printStackTrace();}cluster.killTopology("word_count");cluster.shutdown();}public static void main(String[] args) {MainTopology topology = new MainTopology();topology.runLocal(60);}
}

SentenceSpout.java

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;import java.util.Map;public class SentenceSpout extends BaseRichSpout {private SpoutOutputCollector collector;private String[] sentences = {"hello world", "study storm"};private int index = 0;public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.collector = spoutOutputCollector;}public void nextTuple() {//将一句话拆分成单词,发送每一个词this.collector.emit(new Values(this.sentences[index]));index++;if (index >= sentences.length) {index = 0;}//等待500mstry {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("sentence"));}
}

SplitBolt.java

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;import java.util.Map;public class SplitBolt extends BaseRichBolt {private OutputCollector collector;public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector;}public void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");//将一句话拆分成单词,发送每一个词for (String word : words) {this.collector.emit(new Values(word));}}public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("word"));}
}

CountBolt.java

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;import java.util.HashMap;
import java.util.Map;
import java.util.Set;public class CountBolt extends BaseRichBolt {private HashMap<String, Integer> wordMap = new HashMap<String, Integer>();public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {}public void execute(Tuple tuple) {//从tuple中读取单词String word = tuple.getStringByField("word");//计数int num;if (wordMap.containsKey(word)) {num = wordMap.get(word);} else {num = 0;}wordMap.put(word, 1 + num);//输出展示Set<String> keys = wordMap.keySet();for (String key : keys) {System.out.print(key + ":" + wordMap.get(key) + ",");}System.out.println();}public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {}
}

运行结果

...
5978 [Thread-9-countBolt] INFO  backtype.storm.daemon.executor - Prepared bolt countBolt:(2)
hello:1,
world:1,hello:1,
study:1,world:1,hello:1,
study:1,world:1,storm:1,hello:1,
study:1,world:1,storm:1,hello:2,
study:1,world:2,storm:1,hello:2,
study:2,world:2,storm:1,hello:2,
study:2,world:2,storm:2,hello:2,
study:2,world:2,storm:2,hello:3,
...
study:57,world:58,storm:57,hello:58,
study:58,world:58,storm:57,hello:58,
study:58,world:58,storm:58,hello:58,
study:58,world:58,storm:58,hello:59,
study:58,world:59,storm:58,hello:59,
64444 [main] INFO  backtype.storm.daemon.nimbus - Delaying event :remove for 30 secs for word_count-1-1511510371
study:59,world:59,storm:58,hello:59,
study:59,world:59,storm:59,hello:59,
64490 [main] INFO  backtype.storm.daemon.nimbus - Updated word_count-1-1511510371 with status {:type :killed, :kill-time-secs 30}
...

提交到storm

修改MainTopology.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;public class MainTopology {private TopologyBuilder builder;private Config config;public MainTopology() {this.builder = new TopologyBuilder();this.builder.setSpout("sentenceSpout", new SentenceSpout(), 1);this.builder.setBolt("splitBolt", new SplitBolt(), 1).shuffleGrouping("sentenceSpout");this.builder.setBolt("countBolt", new CountBolt(), 1).shuffleGrouping("splitBolt");this.config = new Config();}public void runCluster() {try {StormSubmitter.submitTopology("word_count",this.config,this.builder.createTopology());} catch (Exception e) {e.printStackTrace();}}public void runLocal(int waitSeconds) {LocalCluster cluster = new LocalCluster();cluster.submitTopology("word_count", this.config, this.builder.createTopology());try {Thread.sleep(waitSeconds * 1000);} catch (InterruptedException e) {e.printStackTrace();}cluster.killTopology("word_count");cluster.shutdown();}public static void main(String[] args) {MainTopology topology = new MainTopology();
//        topology.runLocal(60);topology.runCluster();}
}

修改pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.zhyoulun</groupId><artifactId>storm_study</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>0.9.7</version><!-- 不需要将这个依赖打入jar包中 --><scope>provided</scope></dependency></dependencies><build><plugins><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><archive><manifest><mainClass>MainTopology</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

打包

mvn package assembly:single

target文件夹中会生成文件storm_study-1.0-SNAPSHOT-jar-with-dependencies.jar

上传到storm

storm jar storm_study-1.0-SNAPSHOT-jar-with-dependencies.jar MainTopology

查看运行状态

查看日志

tail -f apache-storm/logs/worker-6700.png
2017-11-24T08:27:37.260+0000 STDIO [INFO] storm:113,
2017-11-24T08:27:37.260+0000 STDIO [INFO] hello:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] study:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] world:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] storm:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] hello:114,
2017-11-24T08:27:37.760+0000 STDIO [INFO] study:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] world:114,
2017-11-24T08:27:37.760+0000 STDIO [INFO] storm:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] hello:114,
...

参考

  • Storm分布式实时计算模式
  • Storm实时数据处理

apache-storm例子:统计句子中的单词数量相关推荐

  1. PTA12、 统计文字中的单词数量并按出现次数排序 (10 分)

    12. 统计文字中的单词数量并按出现次数排序 (10 分) 现在需要统计若干段文字(英文)中的单词数量,并且还需统计每个单词出现的次数. 注1:单词之间以空格(1个或多个空格)为间隔. 注2:忽略空行 ...

  2. Python实现统计文本中各单词数量

    Python实现统计文本中各单词数量 代码 运行结果 代码 import strings = 'not talk,not helo show me your code hello helo' for ...

  3. Python 零基础入门到实战(一)笔记:内置对象、浅拷贝、深拷贝、计算圆面积、凯撒密码、英文大小写转换、输入国家打印出国家名和首都、输入数字英文输出、统计句子中的字母数量、猜随机数

    Python入门到实战(一) 为什么写这篇文章 Part 1 入门部分 什么是编程语言? Python的优势 目前应用 特点 Part 2 6种内置对象 1 整数.浮点数 2 字符串 3 列表 4 元 ...

  4. 统计文章中的单词数量

    #打开并读取文件 file = open(r"C:\Users\Administrator\Desktop\Walden.txt","r") lines = f ...

  5. python输入一个英文句子、统计单词个数_C语言编程求一个英文句子中的单词数和最长单词的位置、长度及输出这个单词。c++编程 从键盘输入一个英文...

    C语言编程求一个英文句子中的单词数和最长单词的位置.长度及输出这个单词. c++编程 从键盘输入一个英文 www.zhiqu.org     时间: 2020-11-23 我刚做了一关于英文句子里面每 ...

  6. python统计句子中单词个数_【python统计单词数量】作文写作问答 - 归教作文网

    如何用python统计一个txt文件中某个单词出现的次数 1.首先,定义一个变量,保存要统计的英文文章. 2.接着,定义两个数组,保存文章中的单词,以及各单词的词频. 3.从文章中分割出所有的单词,保 ...

  7. python统计句子中单词个数_python练习:一行搞定-统计一句话中每个单词出现的个数...

    原博文 2018-10-04 19:42 − 一行搞定-统计一句话中每个单词出现的个数 >>> s'i am a boy a bood boy a bad boy' 方式一:> ...

  8. 统计英文句子中的单词个数,并且输出每个单词

    package new_test; import java.util.Scanner; import java.util.StringTokenizer; //任务:统计英文句子中的单词个数,并且输出 ...

  9. java统计单词出现次数_java-统计一段句子中各单词出现的次数

    问题:统计一段句子中各单词出现的次数. 思路: 1.使用split方法将文章进行分割,我们这里以空格.逗号和句点为分隔符,然后存到一个字符串数组中. 2.创建一个hashMap集合,key是字符串类型 ...

  10. [c]输入一个英文句子,统计句子中单词的个数

    #include <stdio.h> #include <stdlib.h> //输入一个英文句子,统计句子中单词的个数 void main() { char s[256],p ...

最新文章

  1. 链表问题14——在单链表种删除指定值的节点
  2. 网络营销——网络营销专员表明网站地图助力网站收录进一步提升
  3. linux内核单独安装,Linux内核编译与安装
  4. 英特尔核芯显卡控制面板没有了_【有趣】第41期:被英特尔取消发售的CPU长啥样?...
  5. C#中深拷贝对象的简单方法
  6. 记录奥运-当今五大Java记录框架之间的竞赛
  7. d3 svg path添加文本_数据可视化——D3展现数据最炫丽的一面
  8. 2021-2025年中国电动门锁行业市场供需与战略研究报告
  9. 数据结构笔记(十)-- 循环队列
  10. Python多分支实现四则运算器
  11. 十种日常食物比砒霜还毒!
  12. 耐人寻味的 8 幅Java技术图
  13. 计算机控制技术课程设计温度控制系统,计算机控制技术课程设计PWM温度自动控制系统的设计...
  14. 企业微信有网页版登录吗?
  15. 京东面试Java后台--2018年9月16号(星期日)
  16. 嵌入式linux操作系统的移植 实验报告,嵌入式linux系统移植试题
  17. 基于微信小程序的健身小助手小程序
  18. 根据硬件ID查看摄像头型号方案,可查任何一款摄像头芯片来源
  19. 比心app源码,vue 视频播放
  20. 万卷书 - 21世纪的投资 21st Century Investing

热门文章

  1. a deep leaning
  2. mysql odbc 没有_如何解决mysql odbc安装丢失的问题
  3. hmcl整合包导入_我只用了一篇文章就完成了Spring+SpringMVC+MyBatis详细整合教程
  4. matlab连续时间系统复频域分析,实验五连续时间信号与系统的复频域分析的MATLAB实现.doc...
  5. mysql 通过ssh通道安全连接数据库
  6. jquery 发送验证码60s倒计时,后重新发送
  7. linux nginx源码安装
  8. springboot启动 fastDFS启动报错 mbeanExporter: defined by method 'mbeanExporter' in class path
  9. spring cloud分布式微服务-配置中心git示例
  10. 【转】90后还过五四吗?这些“脸熟”的过来人送给青年10句忠告