kafka--Struct Streaming--kafka案例
需求
从kafka读取数据,用Struct Streaming处理,然后保存到kafka
过滤出含有Comedy的行,再送给kafka
kafka数据格式如下
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
案例
启动kafka生产者
kafka-console-producer.sh \
--broker-list mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet
启动消费者
kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet
代码如下
import org.apache.spark.sql.{DataFrame, SparkSession}object KafkaSink extends App {//构建Spark Sessionprivate val session: SparkSession = SparkSession.builder().master("local[2]").appName("test").getOrCreate()private val df: DataFrame = session.readStream.format("kafka").option("kafka.bootstrap.servers", "mypc01:9092,mypc02:9092,mypc03:9092").option("subscribe", "pet").option("startingOffsets", "latest").load()import session.implicits._
//过滤行,并转为valueprivate val df1: DataFrame = df.selectExpr("cast(value as String)").as[String].filter((_.contains("Comedy"))).toDF("test").selectExpr("test as value")
//输出到kafkadf1.writeStream.format("kafka").option("checkpointLocation", "out3").option("topic", "pet").option("kafka.bootstrap.servers", "mypc01:9092").start().awaitTermination()
}
注意事项
输出的内容要合并为一个,否则会报错. 整体给起个别名value…必须的 坑贼大
Required attribute 'value' not found;
kafka--Struct Streaming--kafka案例相关推荐
- Spark Streaming 实战案例(五) Spark Streaming与Kafka
主要内容 Spark Streaming与Kafka版的WordCount示例(一) Spark Streaming与Kafka版的WordCount示例(二) 1. Spark Streaming与 ...
- Flume+Kafka+Spark Streaming+MySQL实时日志分析
文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0
如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装
一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120master192.168.0.121slave1192.168.0.122 slave2 ...
- spark spark streaming + kafka receiver方式消费消息
2019独角兽企业重金招聘Python工程师标准>>> kafka + spark streaming 集群 前提: spark 安装成功,spark 1.6.0 zookeeper ...
- Scala Spark Streaming + Kafka + Zookeeper完成数据的发布和消费
一.Spark Streaming Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理.数据可以从许多来源(如Kafka,Flume,Kine ...
- kafka channle的应用案例
kafka channle的应用案例 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 最近在新公司负责大数据平台的建设,平台搭建完毕后,需要将云平台(我们公司使用的Ucloud的 ...
- spark streaming kafka Couldn't find leader
问题描述: 使用spark streaming接受kafka数据(使用direct方式)报错 Couldn't find leader offsets for Set([test,0], [test, ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十二)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网。...
Centos7出现异常:Failed to start LSB: Bring up/down networking. 按照<Kafka:ZK+Kafka+Spark Streaming集群环境搭 ...
- 使用spark.streaming.kafka.consumer.poll.ms和reconnect.backoff.ms解决spark streaming消费kafka时任务不稳定的问题
问题描述 在用spark streaming程序消费kafka的数据时,遇到了一个神奇的现象:同样的数据量.相似的数据,在消费时,有些批次的数据在做map操作时神奇的多了40多秒,具体看下面的数据:在 ...
最新文章
- 计算机网络09年考研题,计算机网络考研真题及答案
- Squid 反向代理服务器配置
- Python实现二叉树的三种深度遍历方法!
- Java小数中的四舍五入
- en_core_web_sm下载
- Cypress 里的 ensureAttached 检测原理
- redis源码剖析(八)—— 当你启动Redis的时候,Redis做了什么
- 2.1 vector
- python调用c++的库传递二级指针
- 命令行参数怎么输入_太好用了!谷歌开源的命令行接口工具fire
- minimum在java中的意思_Java Calendar getMinimum()用法及代码示例
- TCP、UDP相关协议使用的端口号
- LightGBM(lgb)详解
- /31位掩码实验演示
- atitit.eclipse 新特性总结3.1--4.3
- 剧透和评析之車輪の国、向日葵の少女
- RGB-D立体视觉导航之图像预处理
- GBase 8s 客户端工具之dbaccess
- 设置 SSH 通过密钥登录
- PC(Ubuntu)和树莓派实现无秘ssh
热门文章
- 电脑怎么卸载软件干净_电脑卸载软件怎么卸载?
- Centos7快速安装gcc8.3.1
- 交互设计软件 html,交互设计工具篇
- oracle改成归档模式_将Oracle数据库改为归档模式并启用Rman备份
- gispython定义查询_定义查询方法
- 电脑画画软件_绘画软件分享,还不过来看!
- mysql maven 自动生成_【图文经典版】maven自动生成dao层
- c++ regex 替换汉字_C++ - 正则表达式(regex) 替换(replace) 的 详解 及 代码
- C语言求二阶矩阵最小值,C语言科学计算入门之矩阵乘法的相关计算
- redis 计数器 java_Redis介绍