本文主要描述kafka Streams的三个流实例
一. Pipe 二. line Split 三. word count

  1. 启动kafka服务
    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties

  2. 创建输入输出流

    bin/kafka-topics.sh --create
    --bootstrap-server localhost:9092
    --replication-factor 1
    --partitions 1
    --topic streams-plaintext-input

    bin/kafka-topics.sh --create
    --bootstrap-server localhost:9092
    --replication-factor 1
    --partitions 1
    --topic streams-wordcount-output
    --config cleanup.policy=compact

  3. 利用JAVA IDE工具Idea 或者Eclipse创建一个maven项目。引入jar包

    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.0</version>
    </dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.2.0</version>
    </dependency>
    复制代码

所用的客户端版本与kafka服务器版本保持一致,本文采用的服务器版本是kafka_2.12-2.2.0

  1. 在新建的maven项目中新增Pipe处理类

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;import java.util.Properties;
    import java.util.concurrent.CountDownLatch;public class Pipe {public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());final StreamsBuilder builder = new StreamsBuilder();builder.stream("streams-plaintext-input").to("streams-pipe-output");final Topology topology = builder.build();final KafkaStreams streams = new KafkaStreams(topology, props);final CountDownLatch latch = new CountDownLatch(1);// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable e) {System.exit(1);}System.exit(0);}
    }
    复制代码

    启动该main方法,然后在输入流输入,输出流就有对应的输出。

  1. 在新增的maven的项目中新增LineSplit处理类

     import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.Topology;import org.apache.kafka.streams.kstream.KStream;import java.util.Arrays;import java.util.Properties;import java.util.concurrent.CountDownLatch;public class LineSplit {public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());final StreamsBuilder builder = new StreamsBuilder();KStream<String, String> source = builder.stream("streams-plaintext-input");source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))).to("streams-linesplit-output");final Topology topology = builder.build();final KafkaStreams streams = new KafkaStreams(topology, props);final CountDownLatch latch = new CountDownLatch(1);// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable e) {System.exit(1);}System.exit(0);}
    复制代码

    }

启动该main方法, 输出流结果为

6.在新增的maven项目中新增word count处理类

    import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.kstream.KStream;import org.apache.kafka.streams.kstream.KTable;import org.apache.kafka.streams.kstream.Produced;import java.util.Arrays;import java.util.Locale;import java.util.Properties;import java.util.concurrent.CountDownLatch;public final class WordCount {public static void main(final String[] args) {final Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data// Note: To re-run the demo, you need to use the offset reset tool:// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Toolprops.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");final StreamsBuilder builder = new StreamsBuilder();final KStream<String, String> source = builder.stream("streams-plaintext-input");final KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))).groupBy((key, value) -> value).count();// need to override value serde to Long typecounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));final KafkaStreams streams = new KafkaStreams(builder.build(), props);final CountDownLatch latch = new CountDownLatch(1);// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (final Throwable e) {System.exit(1);}System.exit(0);}}
复制代码

启动该main方法,查看输出结果命令: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
--topic streams-wordcount-output
--from-beginning
--formatter kafka.tools.DefaultMessageFormatter
--property print.key=true
--property print.value=true
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

kafka Streams实例相关推荐

  1. kafka streams学习笔记

    流式处理 流式处理是利用连续计算来处理无限数据流的能力,因为数据流是流动的.所以无须收集或存储数据以对其进行操作 这个弹珠图是流式处理的一个简单表示.图中每个圆圈代表某一特定时间点的某些信息或发生的事 ...

  2. Kafka Streams简介: 让流处理变得更简单

    Introducing Kafka Streams: Stream Processing Made Simple 这是Jay Kreps在三月写的一篇文章,用来介绍Kafka Streams.当时Ka ...

  3. Kafka Streams的容错机制

    Kafka Streams构建于Kafka本地集成的容错功能上.kafka分区具有高可用性和复制,因此当流数据持久保存到Kafka时,即使应用程序失败并需要重新处理时也可用.Kafka Streams ...

  4. kafka Streams

    目录 一.简介 1.概述 2.批处理和流计算 3.Kafka Streams介绍 特点 概念介绍 二.Kafka Streams示例 1.单词统计 2.求和 3.窗口操作 一.简介 1.概述 Kafk ...

  5. Kafka Streams开发者指南

    Kafka Streams 1.1 概述 Kafka Streams是一个客户端程序库,用于处理和分析存储在Kafka中的数据,并将得到的数据写回Kafka或发送到外部系统.Kafka Stream基 ...

  6. 大数据Hadoop之——Kafka Streams原理介绍与简单应用示例

    文章目录 一.Kafka Streams概述 1)Kafka Streams是什么 2)流式计算与批量计算区别 3)Kafka Streams特点 二.Kafka Streams流处理拓扑 1)相关概 ...

  7. Kafka Streams 核心讲解

    Kafka Stream 的特点如下: •Kafka Stream 提供了一个非常简单而轻量的 Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署•除了 Kafka 外, ...

  8. Kafka Streams(三十)

    Kafka Streams Kafka 一直被认为是一个强大的消息中间件,它实现了高吞吐.高可用和低延时的消息传输能力,这让它成为流式处理系统中完美的数据来源.目前通用的一些流式处理框架如 Apach ...

  9. Kafka Streams流式原理解析

    前言 本篇文章会从Kafka的核心流式计算原理进行分析,Kafka Streams Low-level processor API 和 核心概念,以及常见的应用场景分析 流式计算 通过业务场景去分析流 ...

最新文章

  1. 笑哭了,科研版《后浪》,那些人类积攒了几百年的文献,像是人类专门为你们准备的礼物...
  2. C++一天一个程序(三)
  3. javascript 动态修改css样式
  4. 几款优秀的点播、RTSP/RTMP直播播放器介绍
  5. 信息学奥赛一本通 1083:计算星期几 | OpenJudge NOI 小学奥数 7831
  6. Matplotlib 中文用户指南 8.1 屏幕截图
  7. 二叉树的应用 表达式处理_【每日编程208期】2018年408应用题41题
  8. html尾部代码_3分钟短文:Laravel Form,让你不再写 HTML 的好“库”
  9. 持久化数据结构(笔记)
  10. 4、组件注册-自定义TypeFilter指定过滤规则
  11. 啦啦外卖40.7 APP小程序三端 独立开源版本
  12. 兄弟9150cdn换硒鼓清零抹粉_兄弟打印机粉盒硒鼓怎么清零?这里有详细的步骤...
  13. 基于SpringBoot的房屋租赁管理系统
  14. 自动化测试工具Selenium Appium
  15. {电脑救助站}常用知识2
  16. 图片标注问题image_caption
  17. python爬虫爬取豆瓣电影为啥内容有缺失-Python爬虫之抓取豆瓣影评数据
  18. pure-ftpd安装与使用
  19. C/C++中的逻辑右移和算术右移
  20. js操作元素相关案例

热门文章

  1. 《蜗居》里唯一的真男人只有宋思明
  2. 3dmax:3dmax经典案例详细步骤图文教程之飞出镜头的字母效果动画
  3. element Tooltip背景颜色样式修改
  4. WMV格式的文件很大
  5. 广域网技术——SRv6隧道类型及数据转发
  6. 《提问的艺术:如何快速获得答案》(精读版)
  7. 学Python竟是为了防脱发!!你入坑Python的理由是什么?
  8. 舞钢LY225钢板低屈服抗震钢板介绍
  9. Awake/Start/OnEnable 辨析
  10. 电能表中的四象限解释