kafka Streams实例
本文主要描述kafka Streams的三个流实例
一. Pipe 二. line Split 三. word count
启动kafka服务
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties创建输入输出流
bin/kafka-topics.sh --create
--bootstrap-server localhost:9092
--replication-factor 1
--partitions 1
--topic streams-plaintext-inputbin/kafka-topics.sh --create
--bootstrap-server localhost:9092
--replication-factor 1
--partitions 1
--topic streams-wordcount-output
--config cleanup.policy=compact利用JAVA IDE工具Idea 或者Eclipse创建一个maven项目。引入jar包
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.0</version> </dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.2.0</version> </dependency> 复制代码
所用的客户端版本与kafka服务器版本保持一致,本文采用的服务器版本是kafka_2.12-2.2.0
在新建的maven项目中新增Pipe处理类
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology;import java.util.Properties; import java.util.concurrent.CountDownLatch;public class Pipe {public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());final StreamsBuilder builder = new StreamsBuilder();builder.stream("streams-plaintext-input").to("streams-pipe-output");final Topology topology = builder.build();final KafkaStreams streams = new KafkaStreams(topology, props);final CountDownLatch latch = new CountDownLatch(1);// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable e) {System.exit(1);}System.exit(0);} } 复制代码
启动该main方法,然后在输入流输入,输出流就有对应的输出。
在新增的maven的项目中新增LineSplit处理类
import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.Topology;import org.apache.kafka.streams.kstream.KStream;import java.util.Arrays;import java.util.Properties;import java.util.concurrent.CountDownLatch;public class LineSplit {public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());final StreamsBuilder builder = new StreamsBuilder();KStream<String, String> source = builder.stream("streams-plaintext-input");source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))).to("streams-linesplit-output");final Topology topology = builder.build();final KafkaStreams streams = new KafkaStreams(topology, props);final CountDownLatch latch = new CountDownLatch(1);// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable e) {System.exit(1);}System.exit(0);} 复制代码
}
启动该main方法, 输出流结果为
6.在新增的maven项目中新增word count处理类
import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.kstream.KStream;import org.apache.kafka.streams.kstream.KTable;import org.apache.kafka.streams.kstream.Produced;import java.util.Arrays;import java.util.Locale;import java.util.Properties;import java.util.concurrent.CountDownLatch;public final class WordCount {public static void main(final String[] args) {final Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data// Note: To re-run the demo, you need to use the offset reset tool:// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Toolprops.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");final StreamsBuilder builder = new StreamsBuilder();final KStream<String, String> source = builder.stream("streams-plaintext-input");final KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))).groupBy((key, value) -> value).count();// need to override value serde to Long typecounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));final KafkaStreams streams = new KafkaStreams(builder.build(), props);final CountDownLatch latch = new CountDownLatch(1);// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (final Throwable e) {System.exit(1);}System.exit(0);}}
复制代码
启动该main方法,查看输出结果命令: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
--topic streams-wordcount-output
--from-beginning
--formatter kafka.tools.DefaultMessageFormatter
--property print.key=true
--property print.value=true
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
kafka Streams实例相关推荐
- kafka streams学习笔记
流式处理 流式处理是利用连续计算来处理无限数据流的能力,因为数据流是流动的.所以无须收集或存储数据以对其进行操作 这个弹珠图是流式处理的一个简单表示.图中每个圆圈代表某一特定时间点的某些信息或发生的事 ...
- Kafka Streams简介: 让流处理变得更简单
Introducing Kafka Streams: Stream Processing Made Simple 这是Jay Kreps在三月写的一篇文章,用来介绍Kafka Streams.当时Ka ...
- Kafka Streams的容错机制
Kafka Streams构建于Kafka本地集成的容错功能上.kafka分区具有高可用性和复制,因此当流数据持久保存到Kafka时,即使应用程序失败并需要重新处理时也可用.Kafka Streams ...
- kafka Streams
目录 一.简介 1.概述 2.批处理和流计算 3.Kafka Streams介绍 特点 概念介绍 二.Kafka Streams示例 1.单词统计 2.求和 3.窗口操作 一.简介 1.概述 Kafk ...
- Kafka Streams开发者指南
Kafka Streams 1.1 概述 Kafka Streams是一个客户端程序库,用于处理和分析存储在Kafka中的数据,并将得到的数据写回Kafka或发送到外部系统.Kafka Stream基 ...
- 大数据Hadoop之——Kafka Streams原理介绍与简单应用示例
文章目录 一.Kafka Streams概述 1)Kafka Streams是什么 2)流式计算与批量计算区别 3)Kafka Streams特点 二.Kafka Streams流处理拓扑 1)相关概 ...
- Kafka Streams 核心讲解
Kafka Stream 的特点如下: •Kafka Stream 提供了一个非常简单而轻量的 Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署•除了 Kafka 外, ...
- Kafka Streams(三十)
Kafka Streams Kafka 一直被认为是一个强大的消息中间件,它实现了高吞吐.高可用和低延时的消息传输能力,这让它成为流式处理系统中完美的数据来源.目前通用的一些流式处理框架如 Apach ...
- Kafka Streams流式原理解析
前言 本篇文章会从Kafka的核心流式计算原理进行分析,Kafka Streams Low-level processor API 和 核心概念,以及常见的应用场景分析 流式计算 通过业务场景去分析流 ...
最新文章
- 笑哭了,科研版《后浪》,那些人类积攒了几百年的文献,像是人类专门为你们准备的礼物...
- C++一天一个程序(三)
- javascript 动态修改css样式
- 几款优秀的点播、RTSP/RTMP直播播放器介绍
- 信息学奥赛一本通 1083:计算星期几 | OpenJudge NOI 小学奥数 7831
- Matplotlib 中文用户指南 8.1 屏幕截图
- 二叉树的应用 表达式处理_【每日编程208期】2018年408应用题41题
- html尾部代码_3分钟短文:Laravel Form,让你不再写 HTML 的好“库”
- 持久化数据结构(笔记)
- 4、组件注册-自定义TypeFilter指定过滤规则
- 啦啦外卖40.7 APP小程序三端 独立开源版本
- 兄弟9150cdn换硒鼓清零抹粉_兄弟打印机粉盒硒鼓怎么清零?这里有详细的步骤...
- 基于SpringBoot的房屋租赁管理系统
- 自动化测试工具Selenium Appium
- {电脑救助站}常用知识2
- 图片标注问题image_caption
- python爬虫爬取豆瓣电影为啥内容有缺失-Python爬虫之抓取豆瓣影评数据
- pure-ftpd安装与使用
- C/C++中的逻辑右移和算术右移
- js操作元素相关案例