案例要实现的目标

在Kafka的shell 客户端中输入内容,通过Storm实时去kafka中取数据并进行计算单词出现的次数,并且实时把这些数据信息存储到redis中。

代码编写

编写Pom文件,代码如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.toto.storm.kafkastormredis</groupId><artifactId>kafkastormredis</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><!--<scope>provided</scope>--><version>1.1.0</version></dependency><dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka</artifactId><version>1.1.0</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.7.3</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.8.2</artifactId><version>0.8.1</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency></dependencies><build><plugins><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><!--告诉运行的主类是哪个,注意根据自己的情况,下面的包名做相应的修改--><mainClass>cn.toto.strom.wordcount.StormTopologyDriver</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.7</source><target>1.7</target></configuration></plugin></plugins></build>
</project>

在strom案例中需要有spout接收数据。在一些常规学习用的案例中通常从一个文件中获取数据。通常的代码如下:

package cn.toto.storm.kafkastormredis;/*** Created by toto on 2017/6/20.*/import org.apache.commons.lang.StringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;/*** 这个类是模拟从文件中读取数据的代码。在本案例的strom + kafka + redis的案例中将用不到。** @author tuzq* @create 2017-06-20 23:41*/
public class MyLocalFileSpout extends BaseRichSpout {private SpoutOutputCollector collector;private BufferedReader bufferedReader;/*** 初始化方法* @param map* @param context* @param collector*/@Overridepublic void open(Map map, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;try {this.bufferedReader = new BufferedReader(new FileReader(new File("E:/wordcount/input/1.txt")));} catch (Exception e) {e.printStackTrace();}}/*** Strom实时计算的特性就是对数据一条一条的处理* while(true) {*     this.nextTuple();* }*/@Overridepublic void nextTuple() {//每被调用一次就会发送一条数据出去try {String line = bufferedReader.readLine();if (StringUtils.isNotBlank(line)) {List<Object> arrayList = new ArrayList<Object>();arrayList.add(line);collector.emit(arrayList);}} catch(Exception e) {e.printStackTrace();}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("juzi"));}}

在spout编写完成之后,通常通过Bolt来进行文本的切割。在下面的切割代码中,模拟的是从kafka中获取数据,并进行切割。代码如下:

package cn.toto.storm.kafkastormredis;/*** Created by toto on 2017/6/21.*/import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;/*** 这个Bolt模拟从kafkaSpout接收数据,并把数据信息发送给MyWordCountAndPrintBolt的过程。** @author tuzq* @create 2017-06-21 9:14*/
public class MySplitBolt extends BaseBasicBolt {@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {//1、数据如何获取//如果StormTopologyDriver中的spout配置的是MyLocalFileSpout,则用的是declareOutputFields中的juzi这个key//byte[] juzi = (byte[]) input.getValueByField("juzi");//2、这里用这个是因为StormTopologyDriver这个里面的spout用的是KafkaSpout,而KafkaSpout中的declareOutputFields返回的是bytes,所以下面用bytes,这个地方主要模拟的是从kafka中获取数据byte[] juzi = (byte[]) input.getValueByField("bytes");//2、进行切割String[] strings = new String(juzi).split(" ");//3、发送数据for (String word : strings) {//Values对象帮我们生成一个listcollector.emit(new Values(word,1));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word","num"));}
}

对文本信息进行切割之后,需要对数据进行统计,这里使用另外一个Bolt来完成,代码如下:

package cn.toto.storm.kafkastormredis;/*** Created by toto on 2017/6/21.*/import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;import java.util.HashMap;
import java.util.Map;/*** 用于统计分析,并且把统计分析的结果存储到redis中。** @author tuzq* @create 2017-06-21 9:22*/
public class MyWordCountAndPrintBolt extends BaseBasicBolt {private Jedis jedis;private Map<String,String> wordCountMap = new HashMap<String,String>();@Overridepublic void prepare(Map stormConf, TopologyContext context) {//连接redis---代表可以连接任何事物jedis = new Jedis("hadoop11",6379);super.prepare(stormConf,context);}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {String word = (String) input.getValueByField("word");Integer num = (Integer) input.getValueByField("num");//1、查看单词对应的value是否存在Integer integer = wordCountMap.get(word) == null ? 0 : Integer.parseInt(wordCountMap.get(word));if (integer == null || integer.intValue() == 0) {wordCountMap.put(word,num + "");} else {wordCountMap.put(word,(integer.intValue() + num) + "");}//2、保存到redisSystem.out.println(wordCountMap);//redis key wordcount:-->Mapjedis.hmset("wordcount",wordCountMap);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {//todo 不需要定义输出的字段}
}

接下来通过一个Driver串联起Spout、Bolt实现实时计算,代码如下:

package cn.toto.storm.kafkastormredis;/*** Created by toto on 2017/6/21.*/import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;/*** 这个Driver使Kafka、strom、redis进行串联起来。** 这个代码执行前需要创建kafka的topic,创建代码如下:* [root@hadoop1 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 -partitions 3 --topic wordCount** 接着还要向kafka中传递数据,打开一个shell的producer来模拟生产数据* [root@hadoop1 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic wordCount* 接着输入数据** @author tuzq* @create 2017-06-21 9:39*/
public class StormTopologyDriver {public static void main(String[] args) throws Exception {//1、准备任务信息TopologyBuilder topologyBuilder = new TopologyBuilder();topologyBuilder.setSpout("KafkaSpout",new KafkaSpout(new SpoutConfig(new ZkHosts("hadoop11:2181"),"wordCount","/wordCount","wordCount")),2);topologyBuilder.setBolt("bolt1",new MySplitBolt(),4).shuffleGrouping("KafkaSpout");topologyBuilder.setBolt("bolt2",new MyWordCountAndPrintBolt(),2).shuffleGrouping("bolt1");//2、任务提交//提交给谁?提交内容Config config = new Config();config.setNumWorkers(2);StormTopology stormTopology = topologyBuilder.createTopology();//本地模式LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("wordcount",config,stormTopology);//集群模式//StormSubmitter.submitTopology("wordcount1",config,stormTopology);}
}

运行程序
1、启动Kafka集群,启动方式参考博文:
http://blog.csdn.net/tototuzuoquan/article/details/73430874
2、启动redis,启动和安装方式参考博文:
http://blog.csdn.net/tototuzuoquan/article/details/43611535
3、在kafka上创建topic,参考博文:
http://blog.csdn.net/tototuzuoquan/article/details/73432256
这里我们使用:

//创建kafka的topic

[root@hadoop1 ~]# cd $KAFKA_HOME
[root@hadoop1 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 -partitions 3 --topic wordCount

接下来创建producer,来发送数据到kafka:

[root@hadoop1 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic wordCount

在上面输入数据。

4、运行程序,进入StormTopologyDriver,右键run.最后的效果如下:

5、最后如果想看MyWordCountAndPrintBolt中记录到redis的wordcount内容,可以编写如下代码案例:

package cn.toto.storm.kafkastormredis;/*** Created by toto on 2017/6/21.*/import redis.clients.jedis.Jedis;import java.util.Map;/*** 代码说明** @author tuzq* @create 2017-06-21 10:13*/
public class TestRedis {public static void main(String[] args) {Jedis jedis = new Jedis("hadoop11",6379);Map<String,String> wordcount = jedis.hgetAll("wordcount");System.out.println(wordcount);}
}

运行后的结果如下:

Strom+Kafka + redis实时计算单词出现频率的案例相关推荐

  1. Spark Streaming实现实时WordCount,DStream的使用,updateStateByKey(func)实现累计计算单词出现频率

    一. 实战 1.用Spark Streaming实现实时WordCount 架构图: 说明:在hadoop1:9999下的nc上发送消息,消费端接收消息,然后并进行单词统计计算. * 2.安装并启动生 ...

  2. 用户行为分析大数据系统(实时统计每个分类被点击的次数,实时计算商品销售额,统计网站PV、UV )

    Spark Streaming实战对论坛网站动态行为pv,uv,注册人数,跳出率的多维度分析_小强签名设计 的博客-CSDN博客_spark streaming uv 实时统计每天pv,uv的spar ...

  3. 滴滴实时计算平台在运营监控方面的应用

    项目背景 这几年随着滴滴业务的不断发展,用户(乘客.司机)在体验的改善上不断提出新的要求.不仅如此,滴滴内部数据和运营团队越来越实时.丰富和精准的营销策略对底层技术支撑平台日益严格和高效的要求,不仅要 ...

  4. vivo 实时计算平台建设实践

    作者:vivo 互联网实时计算团队- Chen Tao 本文根据"2022 vivo开发者大会"现场演讲内容整理而成. vivo 实时计算平台是 vivo 实时团队基于 Apach ...

  5. 实时计算 Flink 版应用场景解读

    简介:本文由阿里巴巴高级产品专家陈守元老师分享,详细讲解实时计算 Flink 的具体业务场景并分享实时计算 Flink 的相关应用案例. 作者:陈守元(巴真),阿里巴巴高级产品专家 摘要:本文由阿里巴 ...

  6. 实时计算 Flink 版应用场景与产品介绍

    摘要:本文由阿里巴巴高级产品专家陈守元老师分享,详细讲解实时计算 Flink 的具体业务场景并分享实时计算 Flink 的相关应用案例. 内容分为以下四部分: 技术原理 技术应用 应用场景 行业案例 ...

  7. 【号外】实时计算征集用户案例,看看谁是实时计算幸运锦鲤!

    阿里云 实时计算今年4月份正式商业化之后,短短2个月,已增加100+付费用户.截止目前,使用用户已经超过2000家.在已有的用户中,实时计算主要应用于实时互联网数据分析.实时数据大屏.实时金融风控.电 ...

  8. 流式计算strom,Strom解决的问题,实现实时计算系统要解决那些问题,离线计算是什么,流式计算什么,离线和实时计算区别,strom应用场景,Strorm架构图和编程模型(来自学习资料)

    1.背景-流式计算与storm 2011年在海量数据处理领域,Hadoop是人们津津乐道的技术,Hadoop不仅可以用来存储海量数据,还以用来计算海量数据.因为其高吞吐.高可靠等特点,很多互联网公司都 ...

  9. php单词出现频率,PHP编程计算文件或数组中单词出现频率的方法

    本文实例讲述了PHP编程计算文件或数组中单词出现频率的方法.分享给大家供大家参考,具体如下: 如果是小文件,可以一次性读入到数组中,使用方便的数组计数函数进行词频统计(假设文件中内容都是空格隔开的单词 ...

最新文章

  1. 图解 CSS (9): 列表
  2. 【slighttpd】基于lighttpd架构的Server项目实战(5)—TCP的TIME_WAIT状态
  3. 台湾印象之八:海角七号
  4. 各大厂抢招WPF,小米这回是下了血本啊...
  5. android jobb工具,android – 是什么导致jobb工具抛出FAT Full IOException?
  6. 【Vue2.0学习】—Vuex工作原理图(二十五)
  7. 您如何合并两个Git存储库?
  8. Codeforces Edu Round 68 (Rated for Div. 2)
  9. WPF依赖属性(续)(4)依赖属性与数据绑定
  10. 初探Nacos(四)-- SpringBoot下使用Nacos作为配置中心
  11. php 同时登录怎么办,php 实现同一个账号同时只能一个人登录
  12. 第7章 航空公司客户价值分析
  13. 第九届蓝桥杯国赛C++B组口胡题解
  14. 使用Feign调用服务接口
  15. 服务器推送微信订阅消息,微信小程序-订阅消息服务通知
  16. 四旋翼飞行器14——无人机中的OSD、数传、图传、FPV是什么?
  17. IndProp章节中pumping lemma的证明
  18. 周总结2022.1.10-2022.1.16
  19. 玩乐购与京东天猫深度合作 打造云购全网最低价
  20. 英语发音规则---Y字母

热门文章

  1. VTK:快速傅里叶变换用法实战
  2. VTK:细胞定位器可视化用法实战
  3. wxWidgets:wxGridSizeEvent类用法
  4. wxWidgets:wxFrame类用法
  5. boost::safe_numerics::safe_unsigned_range相关的测试程序
  6. boost::reverse相关的测试程序
  7. boost::mp11::mp_flatten相关用法的测试程序
  8. boost::mp11::mp_append相关用法的测试程序
  9. boost::fibers::future用法的测试程序
  10. GDCM:gdcm::Coder的测试程序