文章目录

  • Structured Streaming
    • 简介
    • 快速入门
    • Programming Model(编程模型)
      • 1.输入表
      • 2.结果表
      • 3.输出方式
    • Kafka Source
    • Foreach(单行)|ForeachBatch(多行) sink(输出)
      • foreach sink 会遍历表中的每一行, 允许将流查询结果按开发者指定的逻辑输出。
      • ForeachBatch Sink 是 spark 2.4 才新增的功能, 该功能只能用于输出批处理的数据。
    • 基于 event-time 的窗口操作
    • 基于 Watermark 处理延迟数据

Structured Streaming

简介

spark streaming (spark 1.6 引入 使用批处理模拟流式计算) DStream (离散流)

structured streaming (结构化流 spark2.0引入)

structured streaming是构建在spark sql之上的流式计算模型

​ 从 spark2.0 开始, spark 引入了一套新的流式计算模型: Structured Streaming.

Structured Streaming 基于 Spark SQl 引擎, 是一个具有弹性和容错的流式处理引擎. 使用 Structure Streaming 处理流式计算的方式和使用批处理计算静态数据(表中的数据)的方式是一样的.

spark streaming和structured streaming

1.简化编码

spark streaming是基于rdd算子操作的

structured streaming是基于spark sql操作的

2.关于数据的时间

spark streaming 基于 processing time将数据落入window

structured streaming是基于event time将数据落入window

基于event time的好处:对于延迟数据可以落入他本该的统计窗口中

​ 对于一些“过分”延迟的数据,就可以丢弃了,当然通过学习我们会发现,丢弃的不是数据,而是窗口

快速入门

  • 添加依赖

    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.4.3</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.4.3</version>
    </dependency>
    
  • 代码

    package structedstreaming
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StructuredStreamingSocket {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local[*]").appName("StructuredStreamingSocket").getOrCreate()import spark.implicits._val df1: DataFrame = spark.readStream.format("socket").option("host","spark56")       //监听的主机名.option("port","9999")          //监听的端口号.load()//df1.map(v=> v.getAs[String](0).split(" "))//df1.map(v=> v.getAs[String]("value").split(" "))//官方写法   转换为DataSet,DataFrame里面类型转换为需要的类型//基于DSL//val df2: DataFrame = df1.as[String].flatMap(_.split(" ")).groupBy("value").count()//基于SQL语句方式val ds: Dataset[String] = df1.as[String].flatMap(_.split(" "))ds.createOrReplaceTempView("t_words")val df2: DataFrame = spark.sql("select value,count(*) c from t_words group by value")//当流式处理没有聚合操作的时候,Complete输出模式不被支持df2.writeStream.outputMode(OutputMode.Complete())      //写出数据的模式    Append()  Complete()  Update().format("console")                        //往哪里写.start()                                  //启动流式计算.awaitTermination()                     //如果不停止,将一直运行}
    }
    

    虚拟机运行nc -lk 9999,并查看运行结果

代码说明:

  • DataFrame df1 表示一个“无界表(unbounded table)”, 存储着流中所有的文本数据. 这个无界表包含列名为value的一列数据, 数据的类型为String, 而且在流式文本数据中的每一行(line)就变成了无界表中的的一行(row). 注意, 这时我们仅仅设置了转换操作, 还没有启动它, 所以现在还没有收到任何数据
  • 紧接着我们把 DateFrame 通过 .as[String] 变成了 DataSet, 所以我们可以切割每行为多个单词.得到的 words DataSet包含了所有的单词
  • 最后, 我们通过value(每个唯一的单词)进行分组得到wordCounts DataFrame, 并且统计每个单词的个数. 注意, wordCounts是一个流式DataFrame, 它表示流中正在运行的单词数(the running word counts of the stream).
  • 我们必须在流式数据(streaming data)上启动查询. 剩下的实际就是开始接收数据和计算个数. 为此, 当数据更新的时候, 我们通过outputMode(“complete”)来打印完整的计数集到控制台,
    然后通过.start来启动流式计算.
  • 代码执行之后, 流式计算将会在后台启动. 查询对象(query: StreamingQuery)可以激活流式查询(streaming
    query), 然后通过awaitTermination()来等待查询的终止,从而阻止查询激活之后进程退出.

Programming Model(编程模型)

Structured Streaming 的核心思想是:把持续不断的流式数据当做一个不断追加的表.

这使得新的流式处理模型同批处理模型非常相像. 我们可以表示我们的流式计算类似于作用在静态数据表上的标准批处理查询, spark 在一个无界表上以增量查询的方式来运行.

基本概念

1.输入表

把输入数据流当做输入表(Input Table). 到达流中的每个数据项(data item)类似于被追加到输入表中的一行.

2.结果表

作用在输入表上的查询将会产生“结果表(Result Table)”. 每个触发间隔(trigger interval, 例如 1s), 新行被追加到输入表, 最终会更新结果表. 无论何时更新结果表, 我们都希望将更改的结果行写入到外部接收器(external sink)

3.输出方式

输出(Output)定义为写到外部存储. 输出模式(outputMode)有 3 种:

  1. Complete Mode 整个更新的结果表会被写入到外部存储. 存储连接器负责决定如何处理整个表的写出(类似于 spark streaming 中的有转态的转换).

    Complete Mode必须使用聚合函数,否则输出报错

  2. Append Mode 从上次触发结束开始算起, 仅仅把那些新追加到结果表中的行写到外部存储(类似于无状态的转换). 该模式仅适用于不会更改结果表中行的那些查询. (如果有聚合操作, 则必须添加 wartemark, 否则不支持此种模式)

    Append Mode如果使用聚合,必须配合wartemark(水印/水位线)一起使用。Append Mode一般都是配合wartemark使用

  3. Update Mode 从上次触发结束开始算起, 仅仅在结果表中更新的行会写入到外部存储. 此模式从 2.1.1 可用. 注意, Update Mode 与 Complete Mode 的不同在于 Update Mode 仅仅输出改变的那些行. 如果查询不包括聚合操作, 则等同于 Append Mode

    推荐使用Update Mode (常用)

    一种情况新值,另一种情况原有值改变。

Kafka Source

  • 添加依赖

    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.11</artifactId><version>2.4.3</version>
    </dependency>
    
  • 获取kafka数据

    package structedstreaming
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    object StructuredStreamingKafka {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local[*]").appName("StructuredStreamingKafka").getOrCreate()import spark.implicits._val df1: DataFrame = spark.readStream.format("kafka").option("kafka.bootstrap.servers","kafka51:9092")   //kafka的机器.option("subscribe","topica")                       //订阅的topic.load().selectExpr("cast(value as string)")                  //对获取的数据进行反序列化//df1.map(v=> v.getAs[String](0).split(" "))//df1.map(v=> v.getAs[String]("value").split(" "))//官方写法   转换为DataSet,DataFrame里面类型转换为需要的类型//基于DSL//val df2: DataFrame = df1.as[String].flatMap(_.split(" ")).groupBy("value").count()//基于SQL语句方式val ds: Dataset[String] = df1.as[String].flatMap(_.split(" "))ds.createOrReplaceTempView("t_words")val df2: DataFrame = spark.sql("select value,count(*) c from t_words group by value")//当流式处理没有聚合操作的时候,Complete输出模式不被支持df2.writeStream.outputMode(OutputMode.Update())       //写出数据的模式    Append()  Complete()  Update().format("console")                       //往哪里写.start()                               //启动流式计算.awaitTermination()                    //如果不停止,将一直运行}
    }
    
  • 启动kafka生产者模拟发送数据

    [root@kafka51 kafka0.11]# bin/kafka-console-producer.sh --broker-list kafka51:9092 --topic topica
    >hello hello
    >world world
    >hello world
    
  • 运行程序查看结果

    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +-----+---+
    |value|  c|
    +-----+---+
    |hello|  2|
    +-----+---+
    -------------------------------------------
    Batch: 2
    -------------------------------------------
    +-----+---+
    |value|  c|
    +-----+---+
    |world|  2|
    +-----+---+
    -------------------------------------------
    Batch: 3
    -------------------------------------------
    +-----+---+
    |value|  c|
    +-----+---+
    |hello|  3|
    |world|  3|
    +-----+---+
    

Foreach(单行)|ForeachBatch(多行) sink(输出)

foreach sink 会遍历表中的每一行, 允许将流查询结果按开发者指定的逻辑输出。

需求:把 wordcount 数据写入到 mysql

步骤 1: 添加 mysql 驱动

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version>
</dependency>

步骤 2: 在 mysql 中创建表

create table word_count(word varchar(255) primary key not null, count bigint not null
);

步骤 3: 实现代码

a、定义WordCountWriter类

package structedstreaming.day1
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}
class MysqlWrite extends ForeachWriter[Row]{var conn:Connection = _var ps:PreparedStatement = _//打开一个连接,数据库连接override def open(partitionId: Long, epochId: Long): Boolean = {conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test1?useSSL=false", "root", "123456")// 插入数据, 当有重复的 key 的时候更新数据ps = conn.prepareStatement("insert into word_count values(?,?) on duplicate key update word=?, count=?")conn!=null && !conn.isClosed && ps != null}//处理阶段override def process(value: Row): Unit = {//value就是处理的结果数据    单词  次数println(value)val word: String = value.getAs[String](0)//val word: String = value.getAs(0)val count: Long = value.getAs[Long](1)//val count: Long = value.getAs(1)//设置占位符ps.setString(1,word)ps.setLong(2,count)ps.setString(3,word)ps.setLong(4,count)ps.executeUpdate()}//关闭连接override def close(errorOrNull: Throwable): Unit = {ps.close()conn.close()}
}

b、定义测试类

package structedstreaming
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import structedstreaming.day1.MysqlWrite
object StructuredStreamingMysql {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local[*]").appName("StructuredStreamingMysql").getOrCreate()import spark.implicits._val df1: DataFrame = spark.readStream.format("kafka").option("kafka.bootstrap.servers","kafka51:9092")   //kafka的机器.option("subscribe","topica")                       //订阅的topic.load().selectExpr("cast(value as string)")        //对获取的数据进行反序列化//df1.map(v=> v.getAs[String](0).split(" "))//df1.map(v=> v.getAs[String]("value").split(" "))//官方写法   转换为DataSet,DataFrame里面类型转换为需要的类型//基于DSL//val df2: DataFrame = df1.as[String].flatMap(_.split(" ")).groupBy("value").count()//基于SQL语句方式val df2: DataFrame = df1.as[String].flatMap(_.split(" ")).groupBy("value").count()//当流式处理没有聚合操作的时候,Complete输出模式不被支持df2.writeStream.outputMode(OutputMode.Update())      //写出数据的模式    Append()  Complete()  Update()//.format("console")                  //外围设备,format只针对控制台.foreach(new MysqlWrite)              //外围设备.start()                              //启动流式计算.awaitTermination()                   //如果不停止,将一直运行}
}

c、运行kafka,在运行程序,查看数据库

ForeachBatch Sink 是 spark 2.4 才新增的功能, 该功能只能用于输出批处理的数据。

需求:将统计结果同时输出到本地文件和 mysql 中

package structedstreaming
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}
import structedstreaming.day1.MysqlWrite
object StructuredStreamingForeachBatch {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local[*]").appName("StructuredStreamingForeachBatch").getOrCreate()import spark.implicits._val df1: DataFrame = spark.readStream.format("kafka").option("kafka.bootstrap.servers","kafka51:9092")   //kafka的机器.option("subscribe","topica")                       //订阅的topic.load().selectExpr("cast(value as string)")        //对获取的数据进行反序列化//df1.map(v=> v.getAs[String](0).split(" "))//df1.map(v=> v.getAs[String]("value").split(" "))//官方写法   转换为DataSet,DataFrame里面类型转换为需要的类型//基于DSL//val df2: DataFrame = df1.as[String].flatMap(_.split(" ")).groupBy("value").count()//基于SQL语句方式val df2: DataFrame = df1.as[String].flatMap(_.split(" ")).groupBy("value").count()//当流式处理没有聚合操作的时候,Complete输出模式不被支持df2.writeStream.outputMode(OutputMode.Update())      //写出数据的模式    Append()  Complete()  Update()//.format("console")                  //外围设备,format只针对控制台.foreach(new MysqlWrite)              //外围设备    [基于row行的操作].foreachBatch((df,batchId)=> {        //外围设备    基于dataFrame处理,可以包含多行rowif(df.count()!=0){df.write.json(s"file:///D:/test/b.json-${batchId}")}}).start()                              //启动流式计算.awaitTermination()                   //如果不停止,将一直运行}
}

foreach与ForeachBatch Sink的区别

1.foreach Sink要自定义类并继承ForeachWriter这个类,ForeachBatch Sink直接使用。
2.foreach里面有三个关键方法(open、process、close)并基于行处理,ForeachBatch基于dataFrame处理,可以包含多行row,还带有一个批次的id即batchId。

基于 event-time 的窗口操作

在 Structured Streaming 中, 可以按照事件发生时的时间对数据进行聚合操作, 即基于 event-time (产生这条数据的时间)进行操作。

备注: spark streaming(DStream,离散化流)并不基于event time操作,而是基于

Processing-Time(处理时间)。

在这种机制下, 即不必考虑 Spark 陆续接收事件的顺序是否与事件发生的顺序一致, 也不必考虑事件到达 Spark 的时间与事件发生时间的关系.

因此, 它在提高数据处理精度的同时, 大大减少了开发者的工作量.

我们现在想计算 10 分钟内的单词, 每 5 分钟更新一次, 也就是说在 10 分钟窗口 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20等之间收到的单词量. 注意, 12:00 - 12:10 表示数据在 12:00 之后 12:10 之前到达.

现在,考虑一下在 12:07 收到的单词。单词应该增加对应于两个窗口12:00 - 12:10和12:05 - 12:15的计数。

因此,计数将由分组键(即单词)和窗口(可以从事件时间计算)索引。

  • 统计后的结果应该是这样的:
package structedstreaming.day2
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, functions}
import org.apache.spark.sql.streaming.OutputMode
object Test_Window {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Test_Window").getOrCreate()import spark.implicits._val df1: DataFrame = spark.readStream.format("socket")              // 设置数据源.option("host","spark56")     //监听的主机名.option("port","9999")        //监听的端口号.load()//hello,2021-8-04 16:40:20val df2: DataFrame = df1.as[String].map(v => {val strings: Array[String] = v.split(",")(strings(0), strings(1))}).toDF("word", "timestamp")//根据时间列生成窗口,进而根据窗口+单词进行分组val df3: DataFrame = df2.groupBy(functions.window($"timestamp", "10 seconds", "5 seconds"),$"word").count()df3.writeStream.outputMode(OutputMode.Update()).format("console").option("truncate",false)       //显示结果不让阶段.start().awaitTermination()}
}
模拟数据发送并查看结果:
[root@spark56 ~]# nc -lk 9999
hello,2021-8-14 17:02:40
//运行结果:会统计在两个窗口内
Batch: 0
-------------------------------------------
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+-----+
|window                                    |word |count|
+------------------------------------------+-----+-----+
|[2021-08-14 17:02:40, 2021-08-14 17:02:50]|hello|1    |
|[2021-08-14 17:02:35, 2021-08-14 17:02:45]|hello|1    |
+------------------------------------------+-----+-----+

基于 Watermark 处理延迟数据

在数据分析系统中, Structured Streaming 可以持续的按照 event-time 聚合数据, 然而在此过程中并不能保证数据按照时间的先后依次到达. 例如: 当前接收的某一条数据的 event-time 可能远远早于之前已经处理过的 event-time. 在发生这种情况时, 往往需要结合业务需求对延迟数据进行过滤。

现在考虑如果事件延迟到达会有哪些影响. 假如, 一个单词在 12:04(event-time) 产生, 在 12:11 到达应用. 应用应该使用 12:04 来在窗口(12:00 - 12:10)中更新计数, 而不是使用 12:11. 这些情况在我们基于窗口的聚合中是自然发生的, 因为结构化流可以长时间维持部分聚合的中间状态

但是, 如果这个查询运行数天, 系统很有必要限制内存中累积的中间状态的数量. 这意味着系统需要知道何时从内存状态中删除旧聚合, 因为应用不再接受该聚合的后期数据。

为了实现这个需求, 从 spark2.1, 引入了 watermark(水印), 使用引擎可以自动的跟踪当前的事件时间, 并据此尝试删除旧状态。

通过指定 event-time 列和预估事件的延迟时间上限来定义一个查询的 watermark.。针对一个以时间 T 结束的窗口, 引擎会保留状态和允许延迟时间直到(max event time seen by the engine - late threshold > T)。换句话说, 延迟时间在上限内的被聚合, 延迟时间超出上限的开始被丢弃。

可以通过withWatermark() 来定义watermark:

watermark 计算: watermark = MaxEventTime - Threshhod

而且, watermark只能逐渐增加, 不能减少,Threshhod越大保留的数据越多

总结:

Structured Streaming 引入 Watermark 机制, 主要是为了解决以下两个问题:

  1. 处理聚合中的延迟数据0

  2. 减少内存中维护的聚合状态

注意:引⼊入watermarker以后,⽤用户只能使⽤用 update 、 append 模式,系统才会删除过期数据。

  • 在update模式下,水位线没有沒过窗口的end time之前,如果有数据落入到该窗口,该窗口会重复触发
package structedstreaming.day2
import java.sql.Timestamp
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession, functions}
object Test_Window_WaterMark {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Test_Window_WaterMark").getOrCreate()import spark.implicits._val df1: DataFrame = spark.readStream.format("socket").option("host","spark56")   //监听的主机名.option("port","9999")   //监听的端口号.load()//hello,2020-8-04 16:40:20val df2: DataFrame = df1.as[String].map(v => {val strings: Array[String] = v.split(",")(strings(0),Timestamp.valueOf(strings(1)))   //需要将字符串转换成Timestamp,否则结合水印使用会报错}).toDF("word", "timestamp")//根据时间列生成窗口,进而根据窗口+单词进行分组val df3: DataFrame = df2//参数一:时间戳列名           参数二:水位线阈值        水印参数设置.withWatermark("timestamp","1 seconds").groupBy(functions.window($"timestamp", "5 seconds", "5 seconds"),$"word").count()df3.writeStream.outputMode(OutputMode.Update())      //一旦使用水印,只能使用Update和Append,不支持Complete模式.format("console")                      //因为语义上有冲突,水印会导致一些旧窗口丢弃,而Complete是完整输出.option("truncate",false)             //显示结果不让阶段.start().awaitTermination()}
}
//输出
会输出水位线后面的数据,如果输入水位线所在窗口之前的窗口数据都不会输出。

  • 在Append模式下,水位线没有沒过窗口的end time之前,如果有数据落入到该窗口,该窗口不会触发,只会默默的计算,只有当水位线沒过窗口end time的时候,才会做出最终输出。
package com.baizhi.test2
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession, functions}
object Test_Window_SQL {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local[*]").appName("app1").getOrCreate()import spark.implicits._//df1被称之为输入表val df1: DataFrame = spark.readStream.format("socket").option("host","node1")      //监听的主机名.option("port","9999")          //监听的端口号.load()//hello,2021-8-05 16:40:20val df2: DataFrame = df1.as[String].map(v => {val arr: Array[String] = v.split(",")(arr(0), arr(1))}).toDF("word", "timestamp")//基于DSL的窗口设置/*val df3: DataFrame = df2.groupBy(functions.window($"timestamp", "10 seconds", "10 seconds"),$"word" //$"word"  string--->column).count()*///基于SQL的窗口设置  //df3就可以当成一个具有window列的表了val df3: DataFrame = df2.withWatermark("timestamp","1 seconds").select(functions.window($"timestamp", "10 seconds", "10                               seconds"),$"word")df3.createTempView("t_words")val df4: DataFrame = spark.sql("select window , word ,count(*) c from t_words group by window , word")df4.writeStream.outputMode(OutputMode.Update()).format("console").option("truncate",false)    //显示结果不让截断.start().awaitTermination()}
}

Spark五之Structured-Streaming相关推荐

  1. Spark中的Structured Streaming

    介绍 spark streaming (spark 1.6 引入 使用批处理模拟流式计算) DStream (离散流) structured streaming (结构化流 spark2.0引入) S ...

  2. Structured Streaming编程 Programming Guide

    Structured Streaming编程 Programming Guide • Overview • Quick Example • Programming Model o Basic Conc ...

  3. 2021年大数据Spark(五十三):Structured Streaming Deduplication

    目录 Streaming Deduplication 介绍 需求 ​​​​​​​代码演示 Streaming Deduplication 介绍 在实时流式应用中,最典型的应用场景:网站UV统计. 1: ...

  4. 2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析

    目录 事件时间窗口分析 时间概念 ​​​​​​​event-time ​​​​​​​延迟数据处理 ​​​​​​​延迟数据 ​​​​​​​Watermarking 水位 ​​​​​​​官方案例演示 事件 ...

  5. 2021年大数据Spark(五十一):Structured Streaming 物联网设备数据分析

    目录 ​​​​​​​物联网设备数据分析 ​​​​​​​设备监控数据准备 ​​​​​​​创建Topic ​​​​​​​模拟数据 ​​​​​​​SQL风格 ​​​​​​​DSL风格 物联网设备数据分析 在 ...

  6. 2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构

    目录 案例一 实时数据ETL架构 准备主题 ​​​​​​​模拟基站日志数据 ​​​​​​​实时增量ETL 案例一 实时数据ETL架构 在实际实时流式项目中,无论使用Storm.SparkStreami ...

  7. 2021年大数据Spark(四十五):Structured Streaming Sources 输入源

    目录 Sources 输入源 Socket数据源-入门案例 需求 编程实现 ​​​​​​​文件数据源-了解 ​​​​​​​需求 ​​​​​​​代码实现 ​​​​​​​Rate source-了解 So ...

  8. Spark2.3(三十五)Spark Structured Streaming源代码剖析(从CSDN和Github中看到别人分析的源代码的文章值得收藏)...

    从CSDN中读取到关于spark structured streaming源代码分析不错的几篇文章 spark源码分析--事件总线LiveListenerBus spark事件总线的核心是LiveLi ...

  9. Spark 2.3.0 Structured Streaming详解

    一.什么是Structured Streaming 结构化流(Structured Streaming)是一个建立在Spark SQL引擎之上可扩展且容错的流处理引擎.你可以使用与静态数据批处理计算相 ...

  10. 2021年大数据Spark(四十九):Structured Streaming 整合 Kafka

    目录 整合 Kafka 说明 Kafka特定配置 ​​​​​​​KafkaSoure 1.消费一个Topic数据 2.消费多个Topic数据 3.消费通配符匹配Topic数据 ​​​​​​​Kafka ...

最新文章

  1. 混合密度网络(MDN)进行多元回归详解和代码示例
  2. java学习与总结:索引
  3. PHP实现二叉树的深度优先遍历(前序、中序、后序)和广度优先遍历(层次) 转载陈小龙哈2017...
  4. Libra教程之:数据结构和存储
  5. java_math_BigDecimal
  6. 相对湿度计算软件_冷却塔填料的用量要怎么计算?
  7. SAP License:SAP ECC6安装系列一:安装前硬件和软件准备
  8. python及拓展版_python扩展模块
  9. linux oracle pam,Linux下安装Oracle11g软、硬件环境检测和修改
  10. 畅通工程(hdu1863)并查集
  11. 2022年下半年软考初级程序员备考
  12. java的io流有什么作用_Java IO流详解(一)——简单介绍
  13. a modern epidemic
  14. bootloader 详细介绍
  15. 字谜 大小写重复全排列问题
  16. mysql根据姓分组_mysql 分组查询 group by
  17. HTML - CSS - JavaScript
  18. 笔记本重装系统后声音图标显示红色×号
  19. 华为鸿蒙系统新界面,华为德国申请专利更新 自研操作系统鸿蒙界面曝光
  20. 智能化“决战”开启新周期:大众“向上”、蔚来“向下”

热门文章

  1. java中的public void是什么意思
  2. 百度地图 去掉地名和logo
  3. 【JAVA】-- Java五大常用布局管理器(BorderLayout、FlowLayout、GridLayout、GridBagLayout、CardLayout)
  4. 网络——TCP连接管理
  5. 物联网技术与应用、传感器原理与应用
  6. Could not find method stopService(View) in a parent or ancestor Context for android:onClick attribut
  7. Westcar液力耦合器易熔塞-WESTCAR限矩型液力偶合器的过载保护装置
  8. MS 08-067失败过程记录
  9. matlab怎么做参数估计,[转载]参数估计(matlab)
  10. 123大学计算机,大学计算机及程序设计Ⅲ(贺兴亚)-中国大学mooc-题库零氪