需求

从kafka读取数据,用Struct Streaming处理,然后保存到kafka
过滤出含有Comedy的行,再送给kafka
kafka数据格式如下

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action

案例

启动kafka生产者

kafka-console-producer.sh \
--broker-list mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet

启动消费者

kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet

代码如下

import org.apache.spark.sql.{DataFrame, SparkSession}object KafkaSink extends App {//构建Spark Sessionprivate val session: SparkSession = SparkSession.builder().master("local[2]").appName("test").getOrCreate()private val df: DataFrame = session.readStream.format("kafka").option("kafka.bootstrap.servers", "mypc01:9092,mypc02:9092,mypc03:9092").option("subscribe", "pet").option("startingOffsets", "latest").load()import session.implicits._
//过滤行,并转为valueprivate val df1: DataFrame = df.selectExpr("cast(value as String)").as[String].filter((_.contains("Comedy"))).toDF("test").selectExpr("test as value")
//输出到kafkadf1.writeStream.format("kafka").option("checkpointLocation", "out3").option("topic", "pet").option("kafka.bootstrap.servers", "mypc01:9092").start().awaitTermination()
}

注意事项

输出的内容要合并为一个,否则会报错. 整体给起个别名value…必须的 坑贼大

Required attribute 'value' not found;

kafka--Struct Streaming--kafka案例相关推荐

  1. Spark Streaming 实战案例(五) Spark Streaming与Kafka

    主要内容 Spark Streaming与Kafka版的WordCount示例(一) Spark Streaming与Kafka版的WordCount示例(二) 1. Spark Streaming与 ...

  2. Flume+Kafka+Spark Streaming+MySQL实时日志分析

    文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...

  3. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0

    如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...

  4. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装

    一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120master192.168.0.121slave1192.168.0.122 slave2 ...

  5. spark spark streaming + kafka receiver方式消费消息

    2019独角兽企业重金招聘Python工程师标准>>> kafka + spark streaming 集群 前提: spark 安装成功,spark 1.6.0 zookeeper ...

  6. Scala Spark Streaming + Kafka + Zookeeper完成数据的发布和消费

    一.Spark Streaming Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理.数据可以从许多来源(如Kafka,Flume,Kine ...

  7. kafka channle的应用案例

      kafka channle的应用案例 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 最近在新公司负责大数据平台的建设,平台搭建完毕后,需要将云平台(我们公司使用的Ucloud的 ...

  8. spark streaming kafka Couldn't find leader

    问题描述: 使用spark streaming接受kafka数据(使用direct方式)报错 Couldn't find leader offsets for Set([test,0], [test, ...

  9. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十二)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网。...

    Centos7出现异常:Failed to start LSB: Bring up/down networking. 按照<Kafka:ZK+Kafka+Spark Streaming集群环境搭 ...

  10. 使用spark.streaming.kafka.consumer.poll.ms和reconnect.backoff.ms解决spark streaming消费kafka时任务不稳定的问题

    问题描述 在用spark streaming程序消费kafka的数据时,遇到了一个神奇的现象:同样的数据量.相似的数据,在消费时,有些批次的数据在做map操作时神奇的多了40多秒,具体看下面的数据:在 ...

最新文章

  1. 计算机网络09年考研题,计算机网络考研真题及答案
  2. Squid 反向代理服务器配置
  3. Python实现二叉树的三种深度遍历方法!
  4. Java小数中的四舍五入
  5. en_core_web_sm下载
  6. Cypress 里的 ensureAttached 检测原理
  7. redis源码剖析(八)—— 当你启动Redis的时候,Redis做了什么
  8. 2.1 vector
  9. python调用c++的库传递二级指针
  10. 命令行参数怎么输入_太好用了!谷歌开源的命令行接口工具fire
  11. minimum在java中的意思_Java Calendar getMinimum()用法及代码示例
  12. TCP、UDP相关协议使用的端口号
  13. LightGBM(lgb)详解
  14. /31位掩码实验演示
  15. atitit.eclipse 新特性总结3.1--4.3
  16. 剧透和评析之車輪の国、向日葵の少女
  17. RGB-D立体视觉导航之图像预处理
  18. GBase 8s 客户端工具之dbaccess
  19. 设置 SSH 通过密钥登录
  20. PC(Ubuntu)和树莓派实现无秘ssh

热门文章

  1. 电脑怎么卸载软件干净_电脑卸载软件怎么卸载?
  2. Centos7快速安装gcc8.3.1
  3. 交互设计软件 html,交互设计工具篇
  4. oracle改成归档模式_将Oracle数据库改为归档模式并启用Rman备份
  5. gispython定义查询_定义查询方法
  6. 电脑画画软件_绘画软件分享,还不过来看!
  7. mysql maven 自动生成_【图文经典版】maven自动生成dao层
  8. c++ regex 替换汉字_C++ - 正则表达式(regex) 替换(replace) 的 详解 及 代码
  9. C语言求二阶矩阵最小值,C语言科学计算入门之矩阵乘法的相关计算
  10. redis 计数器 java_Redis介绍