Structured Streaming整合kafka

Spark2.0以后开始推出Structured Streaming,详情参考上文Spark2.0 Structured Streaming。本文介绍一种常用的方式: Structured Streaming读取kafka数据,并使用spark sql过滤,最终输出到终端。

本示例能够读取多个topic数据,并分别映射为Spark内存表,执行多个sql。

1.环境

(1)版本

Spark版本:spark-2.2.0

Scala版本:scala-2.11.4

Kafka版本:kafka 0.10 or higher

(2)数据

Kafka topic:dyl_test01

数据内容:{"a":"1","b":"2","c":"2018-01-08","d":[23.9,45]}

2.基本介绍

2.1 Linking

使用maven/SBT的项目,需要引用如下的库文件:

groupId = org.apache.spark

artifactId = spark-sql-kafka-0-10_2.11

version = 2.2.1

其他jar

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
      <scope>${scope}</scope>

</dependency>

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
      <scope>${scope}</scope>

</dependency>

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
      <scope>${scope}</scope>

</dependency>

2.2 Reading Data from Kafka

(1)直接从Kafka读取数据,并且进行查询:

// Subscribe to 1 topic

val df= spark

.readStream

.format("kafka")

.option("kafka.bootstrap.servers","host1:port1,host2:port2")

.option("subscribe","topic1")

.load()

df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")

.as[(String,String)]

// Subscribe to multiple topics

val df= spark

.readStream

.format("kafka")

.option("kafka.bootstrap.servers","host1:port1,host2:port2")

.option("subscribe","topic1,topic2")

.load()

df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")

.as[(String,String)]

// Subscribe to a pattern

val df= spark

.readStream

.format("kafka")

.option("kafka.bootstrap.servers","host1:port1,host2:port2")

.option("subscribePattern","topic.*")

.load()

df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")

.as[(String,String)]

这个没什么说的,简单的设置kafka集群参数以及topic,然后进行查询,df.selectExpr中能够使用sql里的语法,df.select里只能设置选择的字段。设置多个kafka topic时,可以逗号分割,或者正则匹配,这时候,所有topic的数据都会写入到一张表中,很多时候,不同topic更想映射到不同的表中,稍后说明。先来看看直接读取Kafka数据并输出时,结果如何,如下图2.1所示:

图2.1 直接读取kafka数据输出

如上图2.1,是没有执行selectExpr("CAST(value AS STRING)"的结果,value是字节数组,所以,执行CAST的作用是将value转换为字符串,结果如下2.2所示:

图2.2 value转为字符串

此时,selectExpr中只选择了key和value,所以结果中也只有key和value字段。读取kafka时,默认并不会解析kafka的数据内容,而是直接将kafka数据放到value列中显示出来,关于其他几个字段如下表2.1所示:

表2.1 默认内存表schema

Column                                                                                                                   

Type                                                                                                                      

key

binary

value

binary

topic

string

partition

int

offset

long

timestamp

long

timestampType

int

关于Kafka source的详细配置,可以参考官网:Kafka Integration Guide (Kafka broker version 0.10.0 or higher)。这里不多介绍,kafka的数据被写入value,其实对用户而言,并没有什么用,除非使用正则去匹配value的内容,我们更想解析value的内容,将value的内容映射为一张表,如上,value内容是JSON格式数据,解析方式也很简单,使用from_json函数即可,这里有个例子可以参考:kafka-spark-structured-streaming-example,主要内容如下所示:

valschema=StructType(Seq(

StructField("schema",StringType, true),

StructField("payload",StringType, true)

))

 
 

valdf= ds1.selectExpr("cast (value as string) as json")

.select(from_json($"json",schema=schema).as("data"))

.select("data.payload")

首先,将value字段转换为字符串,重名了为"json",然后使用from_json函数,应用预定义的schema,解析json数据,将结果重命名为"data",接下来就可以直接使用json中的字段了。

这里有一个比较纠结的地方,必须预定义json的格式,而不是像spark Streaming一样,提供了自动推测json格式的功能,让我一度觉得Structured Streaming太局限,不实用,经过查证以及思考,觉悟这原来是一个优化,json格式自动推测的前提是遍历所有数据,收集信息才能正确推测json的字段和字段类型,这无疑是很慢的,也很消耗资源,实际上,json格式字段事先基本上都是能够知道的,发生更改的情况很少,但是,还是有可能发生的,目前,我没找到如何动态更改json schema的方式,提供了Schema之后,spark生成的任务计划中已经写死了json数据解析的字段名和字段类型,当查询任务start之后,则使用该任务计划解析数据,并没有提供修改scheme的接口,但是通过查看源码,发现有个RuntimeReplaceable Expression,例子是spark的nvl函数,好像可以运行时更改选择列的表达式,没细看,知道的朋友交流一下,也因此,实际中并没有使用Structured Steamming,因为项目中需要修改json的解析方式。

经过from_json解析之后,结果如下图2.3所示:

图2.3 from_json解析数据

完整代码如下:

  1. package com.dyl.pwrd  
  2. import org.apache.spark.SparkConf  
  3. import org.apache.spark.sql.functions._  
  4. import org.apache.spark.sql.streaming.StreamingQuery  
  5. import org.apache.spark.sql.types._  
  6. import org.apache.spark.sql.SparkSession  
  7. import scala.collection.mutable  
  8. /** 
  9.   * Created by dongyunlong on 2018/1/5. 
  10.   * {"a":"1","b":"2","c":"2018-01-08","d":[23.9,45]} 
  11.   */  
  12. object SparkStructuredStreaming {  
  13.   /** 
  14.     * 创建SparkSession 
  15.     * @return 
  16.     */  
  17.   def getSparkSession={  
  18.     SparkSession  
  19.       .builder()  
  20.       .config(new SparkConf().setMaster("local[2]"))  
  21.       .appName(getClass.getName)  
  22.       .getOrCreate()  
  23.   }  
  24.   /** 
  25.     * 解析kafka json数据,并将其映射为spark临时表 
  26.     * @param spark 
  27.     * @param kafkaTopic 
  28.     * @param sourceName 
  29.     */  
  30.   def createOrReplaceTempView(spark:SparkSession, kafkaTopic:String, sourceName:String): Unit ={  
  31.     import spark.implicits._  
  32.     val df = spark  
  33.       .readStream  
  34.       .format("kafka")  
  35.       .option("kafka.bootstrap.servers", "XX.XX.XX.XX:9092")  
  36.       .option("subscribe", kafkaTopic)  
  37.       .option("startingOffsets", "earliest")  
  38.     .load()  
  39. //    val schema = SocSchemaCollection.getSchemaBySourceName(sourceName) //从数据库加载json schema  
  40.     val schema = StructType(mutable.Seq(  
  41.       StructField("a", DataTypes.StringType),  
  42.       StructField("b", DataTypes.StringType),  
  43.       StructField("c", DataTypes.StringType),  
  44.       StructField("d", DataTypes.createArrayType(DataTypes.StringType))  
  45.     ))  
  46.     if(schema != null){  
  47.       val jsonDf = df.selectExpr("CAST(key AS STRING)", "cast (value as string) as json")  
  48.           .select(from_json($"json", schema=schema).as("data"))  
  49.       jsonDf.select("data.*").createOrReplaceTempView(sourceName)  
  50.     }else{  
  51.       println("error,schema is null")  
  52.     }  
  53.   }  
  54.   /** 
  55.     * 输出spark sql的查询结果 
  56.     * @param spark 
  57.     * @param sql 
  58.     * @return 
  59.     */  
  60.   def sqlWriteStream(spark:SparkSession, sql:String): StreamingQuery ={  
  61.     val query = spark.sql(sql)  
  62.       .writeStream  
  63.       .outputMode("append")  
  64.       .format("console")  
  65.       .start()  
  66.     query  
  67.   }  
  68.   /** 
  69.     * 注册spark临时表,执行sql语句,注意这里每一个sql都是一个writeStream,最后使用spark.streams.awaitAnyTermination()等待所有查询 
  70.     * @param spark 
  71.     */  
  72.   def sparkReadKafka(spark:SparkSession): Unit ={  
  73.     createOrReplaceTempView(spark, "dyl_test01", "dyl_test")  
  74.     val sqls = Array("select * from dyl_test","select *,'2' as e from dyl_test")  
  75.     val querys = mutable.ListBuffer[StreamingQuery]()  
  76.     for(sql <- sqls){  
  77.       println(sql)  
  78.       querys += sqlWriteStream(spark, sql)  
  79.     }  
  80.   }  
  81.   /** 
  82.     * 主函数 
  83.     * @param args 
  84.     */  
  85.   def main(args: Array[String]) {  
  86.     println("hello world")  
  87.     val spark = getSparkSession  
  88.     sparkReadKafka(spark)  
  89.     spark.streams.awaitAnyTermination()  
  90.   }  
  91. }  

Structured Streaming整合kafka相关推荐

  1. 2021年大数据Spark(四十九):Structured Streaming 整合 Kafka

    目录 整合 Kafka 说明 Kafka特定配置 ​​​​​​​KafkaSoure 1.消费一个Topic数据 2.消费多个Topic数据 3.消费通配符匹配Topic数据 ​​​​​​​Kafka ...

  2. Structured Streaming 整合 Kafka指南

    用于 Kafka 0.10 的结构化流式处理集成,用于从 Kafka 读取数据和写入数据. 从kafka读取数据 // Subscribe to 1 topic val df = spark.read ...

  3. kafka spark Structured streaming整合后集群报错KafkaConsumer.subscribe(Ljava/util/Collection;)V

    简介 整个项目架构是在CDH中,,然后spark Structured streaming消费kafka. spark 2.3版本 kafka0.10版本 <!-- spark sql kafk ...

  4. Structured Streaming从Kafka 0.8中读取数据的问题

    众所周知,Structured Streaming默认支持Kafka 0.10,没有提供针对Kafka 0.8的Connector,但这对高手来说不是事儿,于是有个Hortonworks的邵大牛(前段 ...

  5. spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access

    问题描述 spark streaming 使用 直连方式 读取kafka 数据,使用窗口时出现 java.util.ConcurrentModificationException: KafkaCons ...

  6. 大数据Spark Structured Streaming集成 Kafka

    目录 1 Kafka 数据消费 2 Kafka 数据源 3 Kafka 接收器 3.1 配置说明 3.2 实时数据ETL架构 3.3 模拟基站日志数据 3.4 实时增量ETL 4 Kafka 特定配置 ...

  7. 7.1.5 智慧物流【车辆监控Structured Streaming、整合kafka、Redis、Mysql、HBASE 写入数据】

    车辆监控 文章目录 车辆监控 第一节 Structured Streaming 1.1 Structured Streaming发展历史 1.1.1 Spark Streaming 1.1.2 Dat ...

  8. Structured Streaming基础入门

    Structured Streaming 1. 回顾和展望 1.1. Spark 编程模型的进化过程 RDD rdd.flatMap(_.split(" ")).map((_, 1 ...

  9. Structured Streaming + Kafka测试

    前言 Structured Streaming出来有几年了,一直没有机会使用,最近闲来无事,就想先测试一下,完全没有细看关于它的一些详细介绍情况,仅仅想根据官网案例,执行一遍,没想到- copy官网的 ...

最新文章

  1. 如何使用Cisco命令阻止访问特定网站
  2. 脑机接口,风口还是入口?
  3. 使用STM32CubeMX,生成STM32F103ZE SPI3 HAL 工程
  4. 【LeetCode】0103.二叉树的锯齿形层序遍历
  5. .NET技术之美-网络编程
  6. Imation亏损额急剧增长 CEO仍表示“成功”
  7. [Golang] GoConvey测试框架使用指南
  8. 【TensorFlow-windows】学习笔记三——实战准备
  9. 一个java工程师必知的安全意识(信息传输篇)
  10. serversocket中的backlog是什么_输入网址按回车,到底发生了什么
  11. 【SSL Certificates】什么是数字证书(Certificates)?
  12. 【HTML5】创造一款成功HTML5游戏的完整指南
  13. 直播问答的后博弈时代:社交化或许才是未来
  14. OpenCV3车牌识别(C++版)
  15. 约瑟夫生者死者游戏:有N个旅客同乘一条船,因为严重超载,加上风高浪大,危险万分;因此船长告诉乘客,只有将全船一半的旅客投入海中,其余人才能幸免于难;无奈,大家只得同意这种办法,并议定N个人围成一圈,由
  16. linux共享打印机smb,centos7 安装cups+smb共享打印机
  17. 下一代通信的野心:超奈奎斯特!超香农?
  18. python中格式化输出是什么意思_python中的格式化输出用法总结
  19. Redis是什么?看这一篇就够了
  20. 新型激光辨识算法助力机器人逃离死胡同

热门文章

  1. 最新2022【iOS开发面试真题】初级,中级,高级
  2. 钉钉 阿里 The request signature we calculated does not match the signature you provide
  3. A5M2里的数据怎么排序显示
  4. 您的代码,是在接飞刀吗?
  5. GridView触发SelectedIndexChanged事件
  6. windows server 2003 复制其他电脑的文件到服务器,访问windows server 2003共享文件夹特别慢...
  7. Win10笔记本开启WiFi提示“无法启动承载网络”解决方法
  8. SWD Registers
  9. PROFIBUS和PROFINET两种通信协议的区别比较
  10. 嵌入式开发常见英文单词及缩写