目录

  • 1 业务场景
  • 2 初始化环境
    • 2.1 创建 Topic
    • 2.2 模拟日志数据
    • 2.3 StreamingContextUtils 工具类
  • 3 实时数据ETL存储
  • 4 实时状态更新统计
    • 4.1 updateStateByKey 函数
    • 4.2 mapWithState 函数
  • 5 实时窗口统计

1 业务场景

百度搜索风云榜(http://top.baidu.com/)以数亿网民的单日搜索行为作为数据基础,以搜索关键词为统计对象建立权威全面的各类关键词排行榜,以榜单形式向用户呈现基于百度海量搜索数据的排行信息,线上覆盖十余个行业类别,一百多个榜单
在【热点榜单】中,可以看到依据搜索关键词实时统计各种维度热点,下图展示【实时热点】。
仿【百度搜索风云榜】对用户使用百度搜索时日志进行分析:【百度搜索日志实时分析】,主要业务需求如下三个方面:

  • 业务一:搜索日志数据存储HDFS,实时对日志数据进行ETL提取转换,存储HDFS文件系统;
  • 业务二:百度热搜排行榜Top10,累加统计所有用户搜索词次数,获取Top10搜索词及次数;
  • 业务三:近期时间内热搜Top10,统计最近一段时间范围(比如,最近半个小时或最近2个小时)
    内用户搜索词次数,获取Top10搜索词及次数;
    开发Maven Project中目录结构如下所示:

2 初始化环境

编程实现业务之前,首先编写程序模拟产生用户使用百度搜索产生日志数据和创建工具类StreamingContextUtils提供StreamingContext对象与从Kafka接收数据方法。

2.1 创建 Topic

启动Kafka Broker服务,创建Topic【search-log-topic】,命令如下所示:

# 1. 启动Zookeeper 服务
zookeeper-daemon.sh start
# 2. 启动Kafka 服务
kafka-daemon.sh start
# 3. Create Topic
kafka-topics.sh --create --topic search-log-topic \
--partitions 3 --replication-factor 1 --zookeeper node1.oldlut.cn:2181/kafka200
# List Topics
kafka-topics.sh --list --zookeeper node1.oldlut.cn:2181/kafka200
# Producer
kafka-console-producer.sh --topic search-log-topic --broker-list node1.oldlut.cn:9092
# Consumer
kafka-console-consumer.sh --topic search-log-topic \
--bootstrap-server node1.oldlut.cn:9092 --from-beginning

2.2 模拟日志数据

模拟用户搜索日志数据,字段信息封装到CaseClass样例类【SearchLog】类,代码如下:

package cn.oldlut.spark.app.mock/*** 用户百度搜索时日志数据封装样例类CaseClass* <p>** @param sessionId 会话ID* @param ip        IP地址* @param datetime  搜索日期时间* @param keyword   搜索关键词*/
case class SearchLog(sessionId: String, //ip: String, //datetime: String, //keyword: String //) {override def toString: String = s"$sessionId,$ip,$datetime,$keyword"
}`` `
模拟产生搜索日志数据类 【 MockSearchLogs 】 具体代码如下 :
`` `
package cn.oldlut.spark.app.mockimport java.util.{Properties, UUID}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.util.Random/*** 模拟产生用户使用百度搜索引擎时,搜索查询日志数据,包含字段为:* uid, ip, search_datetime, search_keyword*/
object MockSearchLogs {def main(args: Array[String]): Unit = {// 搜索关键词,直接到百度热搜榜获取即可val keywords: Array[String] = Array("罗志祥", "谭卓疑", "当当网", "裸海蝶", "张建国")// 发送Kafka Topicval props = new Properties()props.put("bootstrap.servers", "node1.oldlut.cn:9092")props.put("acks", "1")props.put("retries", "3")props.put("key.serializer", classOf[StringSerializer].getName)props.put("value.serializer", classOf[StringSerializer].getName)val producer = new KafkaProducer[String, String](props)val random: Random = new Random()while (true) {// 随机产生一条搜索查询日志val searchLog: SearchLog = SearchLog(getUserId(), //getRandomIp(), //getCurrentDateTime(), //keywords(random.nextInt(keywords.length)) //)println(searchLog.toString)Thread.sleep(10 + random.nextInt(100))val record = new ProducerRecord[String, String]("search-log-topic", searchLog.toString)producer.send(record)}// 关闭连接producer.close()}/*** 随机生成用户SessionId*/def getUserId(): String = {val uuid: String = UUID.randomUUID().toStringuuid.replaceAll("-", "").substring(16)}/*** 获取当前日期时间,格式为yyyyMMddHHmmssSSS*/def getCurrentDateTime(): String = {val format = FastDateFormat.getInstance("yyyyMMddHHmmssSSS")val nowDateTime: Long = System.currentTimeMillis()format.format(nowDateTime)}/*** 获取随机IP地址*/def getRandomIp(): String = {// ip范围val range: Array[(Int, Int)] = Array((607649792, 608174079), //36.56.0.0-36.63.255.255(1038614528, 1039007743), //61.232.0.0-61.237.255.255(1783627776, 1784676351), //106.80.0.0-106.95.255.255(2035023872, 2035154943), //121.76.0.0-121.77.255.255(2078801920, 2079064063), //123.232.0.0-123.235.255.255(-1950089216, -1948778497), //139.196.0.0-139.215.255.255(-1425539072, -1425014785), //171.8.0.0-171.15.255.255(-1236271104, -1235419137), //182.80.0.0-182.92.255.255(-770113536, -768606209), //210.25.0.0-210.47.255.255(-569376768, -564133889) //222.16.0.0-222.95.255.255)// 随机数:IP地址范围下标val random = new Random()val index = random.nextInt(10)val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)//println(s"ipNumber = ${ipNumber}")// 转换Int类型IP地址为IPv4格式number2IpString(ipNumber)}/*** 将Int类型IPv4地址转换为字符串类型*/def number2IpString(ip: Int): String = {val buffer: Array[Int] = new Array[Int](4)buffer(0) = (ip >> 24) & 0xffbuffer(1) = (ip >> 16) & 0xffbuffer(2) = (ip >> 8) & 0xffbuffer(3) = ip & 0xff// 返回IPv4地址buffer.mkString(".")}
}

运行应用程序,源源不断产生日志数据,发送至Kafka(同时在控制台打印),截图如下:

2.3 StreamingContextUtils 工具类

所有SparkStreaming应用都需要构建StreamingContext实例对象,并且从采用New KafkaConsumer API消费Kafka数据,编写工具类【StreamingContextUtils】,提供两个方法:

  • 方法一:getStreamingContext,获取StreamingContext实例对象
  • 方法二:consumerKafka,消费Kafka Topic中数据
    具体代码如下:
package cn.oldlut.spark.appimport org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 工具类提供:构建流式应用上下文StreamingContext实例对象和从Kafka Topic消费数据*/
object StreamingContextUtils {/*** 获取StreamingContext实例,传递批处理时间间隔** @param batchInterval 批处理时间间隔,单位为秒*/def getStreamingContext(clazz: Class[_], batchInterval: Int): StreamingContext = {// i. 创建SparkConf对象,设置应用配置信息val sparkConf = new SparkConf().setAppName(clazz.getSimpleName.stripSuffix("$")).setMaster("local[3]")// 设置Kryo序列化.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").registerKryoClasses(Array(classOf[ConsumerRecord[String, String]]))// ii.创建流式上下文对象, 传递SparkConf对象和时间间隔val context = new StreamingContext(sparkConf, Seconds(batchInterval))// iii. 返回context}/*** 从指定的Kafka Topic中消费数据,默认从最新偏移量(largest)开始消费** @param ssc       StreamingContext实例对象* @param topicName 消费Kafka中Topic名称*/def consumerKafka(ssc: StreamingContext, topicName: String): DStream[ConsumerRecord[String, String]] = {// i.位置策略val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent// ii.读取哪些Topic数据val topics = Array(topicName)// iii.消费Kafka 数据配置参数val kafkaParams = Map[String, Object]("bootstrap.servers" -> "node1.oldlut.cn:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "group_id_streaming_0001","auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean))// iv.消费数据策略val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(topics, kafkaParams)// v.采用新消费者API获取数据,类似于Direct方式val kafkaDStream: DStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy)// vi.返回DStreamkafkaDStream}
}

3 实时数据ETL存储

实时从Kafka Topic消费数据,提取ip地址字段,调用【ip2Region】库解析为省份和城市,存储到HDFS文件中,设置批处理时间间隔BatchInterval为10秒,完整代码如下:

package cn.oldlut.spark.app.etlimport cn.oldlut.spark.app.StreamingContextUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.lionsoul.ip2region.{DataBlock, DbConfig, DbSearcher}/*** 实时消费Kafka Topic数据,经过ETL(过滤、转换)后,保存至HDFS文件系统中,BatchInterval为:10s*/
object StreamingETLHdfs {def main(args: Array[String]): Unit = {// 1. 获取StreamingContext实例对象val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 10)// 2. 从Kafka消费数据,使用Kafka New Consumer APIval kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils.consumerKafka(ssc, "search-log-topic")// 3. 数据ETL:过滤不合格数据及转换IP地址为省份和城市,并存储HDFS上kafkaDStream.foreachRDD { (rdd, time) =>// i. message不为null,且分割为4个字段val kafkaRDD: RDD[ConsumerRecord[String, String]] = rdd.filter { record =>val message: String = record.value()null != message && message.trim.split(",").length == 4}// ii. 解析IP地址val etlRDD: RDD[String] = kafkaRDD.mapPartitions { iter =>// 创建DbSearcher对象,针对每个分区创建一个,并不是每条数据创建一个val dbSearcher = new DbSearcher(new DbConfig(), "dataset/ip2region.db")iter.map { record =>val Array(_, ip, _, _) = record.value().split(",")// 依据IP地址解析val dataBlock: DataBlock = dbSearcher.btreeSearch(ip)val region: String = dataBlock.getRegionval Array(_, _, province, city, _) = region.split("\\|")// 组合字符串s"${record.value()},$province,$city"}}// iii. 保存至文件val savePath = s"datas/streaming/etl/search-log-${time.milliseconds}"if (!etlRDD.isEmpty()) {etlRDD.coalesce(1).saveAsTextFile(savePath)}}// 4.启动流式应用,一直运行,直到程序手动关闭或异常终止ssc.start()ssc.awaitTermination()ssc.stop(stopSparkContext = true, stopGracefully = true)}
}

运行模拟日志数据程序和ETL应用程序,查看实时数据ETL后保存文件,截图如下:

4 实时状态更新统计

实 时 累 加 统 计 用 户 各 个 搜 索 词 出 现 的 次 数 , 在 SparkStreaming 中 提 供 函 数【updateStateByKey】实现累加统计,Spark 1.6提供【mapWithState】函数状态统计,性能更好,实际应用中也推荐使用。

4.1 updateStateByKey 函数

状态更新函数【updateStateByKey】表示依据Key更新状态,要求DStream中数据类型为【Key/Value】对二元组,函数声明如下:

将每批次数据状态,按照Key与以前状态,使用定义函数【updateFunc】进行更新,示意图如下:

文档: http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#updatestatebykey-operation
针对搜索词词频统计WordCount,状态更新逻辑示意图如下:
以前的状态数据,保存到Checkpoint检查点目录中,所以在代码中需要设置Checkpoint检查点目录:
完整演示代码如下:

package cn.oldlut.spark.app.stateimport cn.oldlut.spark.app.StreamingContextUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream/*** 实时消费Kafka Topic数据,累加统计各个搜索词的搜索次数,实现百度搜索风云榜*/
object StreamingUpdateState {def main(args: Array[String]): Unit = {// 1. 获取StreamingContext实例对象val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 5)// TODO: 设置检查点目录ssc.checkpoint(s"datas/streaming/state-${System.nanoTime()}")// 2. 从Kafka消费数据,使用Kafka New Consumer APIval kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils.consumerKafka(ssc, "search-log-topic")// 3. 对每批次的数据进行搜索词次数统计val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform { rdd =>val reduceRDD = rdd// 过滤不合格的数据.filter { record =>val message: String = record.value()null != message && message.trim.split(",").length == 4}// 提取搜索词,转换数据为二元组,表示每个搜索词出现一次.map { record =>val keyword: String = record.value().trim.split(",").lastkeyword -> 1}// 按照单词分组,聚合统计.reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化reduceRDD // 返回}/*def updateStateByKey[S: ClassTag](// 状态更新函数updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]第一个参数:Seq[V]表示的是相同Key的所有Value值第二个参数:Option[S]表示的是Key的以前状态,可能有值Some,可能没值None,使用Option封装S泛型,具体类型有业务具体,此处是词频:Int类型*/val stateDStream: DStream[(String, Int)] = reduceDStream.updateStateByKey((values: Seq[Int], state: Option[Int]) => {// a. 获取以前状态信息val previousState = state.getOrElse(0)// b. 获取当前批次中Key对应状态val currentState = values.sum// c. 合并状态val latestState = previousState + currentState// d. 返回最新状态Some(latestState)})// 5. 将结果数据输出 -> 将每批次的数据处理以后输出stateDStream.print()// 6.启动流式应用,一直运行,直到程序手动关闭或异常终止ssc.start()ssc.awaitTermination()ssc.stop(stopSparkContext = true, stopGracefully = true)}
}

运行应用程序,通过WEB UI界面可以发现,将以前状态保存到Checkpoint检查点目录中,更新时在读取。
此外,updateStateByKey函数有很多重载方法,依据不同业务需求选择合适的方式使用。

4.2 mapWithState 函数

Spark 1.6提供新的状态更新函数【mapWithState】,mapWithState函数也会统计全局的key的状态,但是如果没有数据输入,便不会返回之前的key的状态,只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。
这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储,效率比较高;
需要构建StateSpec对象,对状态State进行封装,可以进行相关操作,类的声明定义如下:
状态函数【mapWithState】参数相关说明:
完整演示代码如下:

package cn.oldlut.spark.app.stateimport cn.oldlut.spark.app.StreamingContextUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{State, StateSpec, StreamingContext}
import org.apache.spark.streaming.dstream.DStream/*** 实时消费Kafka Topic数据,累加统计各个搜索词的搜索次数,实现百度搜索风云榜*/
object StreamingMapWithState {def main(args: Array[String]): Unit = {// 1. 获取StreamingContext实例对象val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 5)// TODO: 设置检查点目录ssc.checkpoint(s"datas/streaming/state-${System.nanoTime()}")// 2. 从Kafka消费数据,使用Kafka New Consumer APIval kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils.consumerKafka(ssc, "search-log-topic")// 3. 对每批次的数据进行搜索词进行次数统计val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform { rdd =>val reduceRDD: RDD[(String, Int)] = rdd// 过滤不合格的数据.filter { record =>val message: String = record.value()null != message && message.trim.split(",").length == 4}// 提取搜索词,转换数据为二元组,表示每个搜索词出现一次.map { record =>val keyword: String = record.value().trim.split(",").lastkeyword -> 1}// 按照单词分组,聚合统计.reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化// 返回reduceRDD}// TODO: 4、实时累加统计搜索词搜索次数,使用mapWithState函数/*按照Key来更新状态的,一条一条数据的更新状态def mapWithState[StateType: ClassTag, MappedType: ClassTag](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]a. 通过函数源码发现参数使用对象StateSpec 实例对象b. StateSpec表示对状态封装,里面涉及到相关数据类型c. 如何构建StateSpec对象实例呢??StateSpec 伴生对象中function函数构建对象def function[KeyType, ValueType, StateType, MappedType](// 从函数名称可知,针对每条数据更新Key的转态信息mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType): StateSpec[KeyType, ValueType, StateType, MappedType]*/// 状态更新函数,针对每条数据进行更新状态val spec: StateSpec[String, Int, Int, (String, Int)] = StateSpec.function(// (KeyType, Option[ValueType], State[StateType]) => MappedType(keyword: String, countOption: Option[Int], state: State[Int]) => {// a. 获取当前批次中搜索词搜索次数val currentState: Int = countOption.getOrElse(0)// b. 从以前状态中获取搜索词搜索次数val previousState = state.getOption().getOrElse(0)// c. 搜索词总的搜索次数val latestState = currentState + previousState// d. 更行状态state.update(latestState)// e. 返回最新省份销售订单额(keyword, latestState)})// 调用mapWithState函数进行实时累加状态统计val stateDStream: DStream[(String, Int)] = reduceDStream.mapWithState(spec)// 5. 将结果数据输出 -> 将每批次的数据处理以后输出stateDStream.print()// 6.启动流式应用,一直运行,直到程序手动关闭或异常终止ssc.start()ssc.awaitTermination()ssc.stop(stopSparkContext = true, stopGracefully = true)}
}

运行程序可以发现,当Key(搜索单词)没有出现时,不会更新状态,仅仅更新当前批次中出现的Key的状态。
mapWithState 实现有状态管理主要是通过两点:a)、历史状态需要在内存中维护,这里必需的了,updateStateBykey也是一样;b)、自定义更新状态的mappingFunction,这些就是具体的业务功能实现逻辑了(什么时候需要更新状态)
首先数据像水流一样从左侧的箭头流入,把mapWithState看成一个转换器的话,mappingFunc就是转换的规则,流入的新数据(key-value)结合历史状态(通过key从内存中获取的历史状态)进行一些自定义逻辑的更新等操作,最终从红色箭头中流出。

5 实时窗口统计

SparkStreaming中提供一些列窗口函数,方便对窗口数据进行分析,文档:

http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#window-operations

在实际项目中,很多时候需求:每隔一段时间统计最近数据状态,并不是对所有数据进行统计,称为趋势统计或者窗口统计,SparkStreaming中提供相关函数实现功能,业务逻辑如下:
针对用户百度搜索日志数据,实现【近期时间内热搜Top10】,统计最近一段时间范围(比如,最近半个小时或最近2个小时)内用户搜索词次数,获取Top10搜索词及次数。窗口函数【window】声明如下,包含两个参数:窗口大小(WindowInterval,每次统计数据范围)和滑动大小(每隔多久统计一次),都必须是批处理时间间隔BatchInterval整数倍。
案例完整实现代码如下,为了演示方便,假设BatchInterval为2秒,WindowInterval
为4秒,SlideInterval为2秒。

package cn.oldlut.spark.app.windowimport cn.oldlut.spark.app.StreamingContextUtils
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 实时消费Kafka Topic数据,每隔一段时间统计最近搜索日志中搜索词次数* 批处理时间间隔:BatchInterval = 2s* 窗口大小间隔:WindowInterval = 4s* 滑动大小间隔:SliderInterval = 2s*/
object StreamingWindow {def main(args: Array[String]): Unit = {// Streaming应用BatchIntervalval BATCH_INTERVAL: Int = 2// Streaming应用窗口大小val WINDOW_INTERVAL: Int = BATCH_INTERVAL * 2val SLIDER_INTERVAL: Int = BATCH_INTERVAL * 1// 1. 获取StreamingContext实例对象val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, BATCH_INTERVAL)// 2. 从Kafka消费数据,使用Kafka New Consumer APIval kafkaDStream: DStream[String] = StreamingContextUtils.consumerKafka(ssc, "search-log-topic").map(record => record.value())// TODO: 添加窗口,设置对应参数/*def window(windowDuration: Duration, slideDuration: Duration): DStream[T]警告信息:ERROR KafkaRDD: Kafka ConsumerRecord is not serializable.Use .map to extract fields before calling .persist or .window*/val windowDStream: DStream[String] = kafkaDStream.window(Seconds(WINDOW_INTERVAL), Seconds(SLIDER_INTERVAL))// 4. 对每批次的数据进行搜索词进行次数统计val countDStream: DStream[(String, Int)] = windowDStream.transform { rdd =>val resultRDD = rdd// 过滤不合格的数据.filter(message => null != message && message.trim.split(",").length == 4)// 提取搜索词,转换数据为二元组,表示每个搜索词出现一次.map { message =>val keyword: String = message.trim.split(",").lastkeyword -> 1}// 按照单词分组,聚合统计.reduceByKey((tmp, item) => tmp + item)// 返回resultRDD}// 5. 将结果数据输出 -> 将每批次的数据处理以后输出countDStream.print()// 6.启动流式应用,一直运行,直到程序手动关闭或异常终止ssc.start()ssc.awaitTermination()ssc.stop(stopSparkContext = true, stopGracefully = true)}
}

SparkStreaming中同时提供将窗口Window设置与聚合reduceByKey合在一起的函数,为了更加方便编程。
使用【reduceByKeyAndWindow】函数,修改上述代码,实现窗口统计,具体代码如下:

package cn.oldlut.spark.app.windowimport cn.oldlut.spark.app.StreamingContextUtils
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 实时消费Kafka Topic数据,每隔一段时间统计最近搜索日志中搜索词次数* 批处理时间间隔:BatchInterval = 2s* 窗口大小间隔:WindowInterval = 4s* 滑动大小间隔:SliderInterval = 2s*/
object StreamingReduceWindow {def main(args: Array[String]): Unit = {// Streaming应用BatchIntervalval BATCH_INTERVAL: Int = 2// Streaming应用窗口大小val WINDOW_INTERVAL: Int = BATCH_INTERVAL * 2val SLIDER_INTERVAL: Int = BATCH_INTERVAL * 1// 1. 获取StreamingContext实例对象val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, BATCH_INTERVAL)// 2. 从Kafka消费数据,使用Kafka New Consumer APIval kafkaDStream: DStream[String] = StreamingContextUtils.consumerKafka(ssc, "search-log-topic").map(recored => recored.value())// 3. 对每批次的数据进行搜索词进行次数统计val etlDStream: DStream[(String, Int)] = kafkaDStream.transform { rdd =>val etlRDD = rdd// 过滤不合格的数据.filter(message => null != message && message.trim.split(",").length == 4)// 提取搜索词,转换数据为二元组,表示每个搜索词出现一次.map { message =>val keyword: String = message.trim.split(",").lastkeyword -> 1}etlRDD // 返回}// 4. 对获取流式数据进行ETL后,使用窗口聚合函数统计计算/*def reduceByKeyAndWindow(reduceFunc: (V, V) => V, // 聚合函数windowDuration: Duration, // 窗口大小slideDuration: Duration // 滑动大小): DStream[(K, V)]*/val resultDStream: DStream[(String, Int)] = etlDStream.reduceByKeyAndWindow((tmp: Int, value: Int) => tmp + value, //Seconds(WINDOW_INTERVAL), //Seconds(SLIDER_INTERVAL) //)// 5. 将结果数据输出 -> 将每批次的数据处理以后输出resultDStream.print()// 6.启动流式应用,一直运行,直到程序手动关闭或异常终止ssc.start()ssc.awaitTermination()ssc.stop(stopSparkContext = true, stopGracefully = true)}
}

大数据Spark实时搜索日志实时分析相关推荐

  1. 大数据Spark对SogouQ日志分析

    目录 1 业务需求 2 准备工作 2.1 HanLP 中文分词 2.2 样例类 SogouRecord 3 业务实现 3.1 读取数据 3.2 搜索关键词统计 3.3 用户搜索点击统计 3.4 搜索时 ...

  2. 日志分析 进入大数据Spark SQL的世界

    以慕课网日志分析为例 进入大数据 Spark SQL 的世界 这篇blog是这个项目的总结,因为从别的地方学习过Hadoop.Spark的一些基础知识了,所以这部分略过.针对自身的不足,我抛出一些问题 ...

  3. 携程是如何把大数据用于实时风控的

    携程是如何把大数据用于实时风控的 大数据 风控 携程 阅读20608  本文由携程技术中心投递,ID:ctriptech.作者:郁伟,携程技术中心风险控制部高级开发经理.2010加入携程,参与了携程结 ...

  4. 连载:阿里巴巴大数据实践—实时技术

    简介:相对于离线批处理技术,流式实时处理技术作为一个非常重要的技术补充,在阿里巴巴集团内被广泛使用. 前言: -更多关于数智化转型.数据中台内容请加入阿里云数据中台交流群-数智俱乐部 和关注官方微信公 ...

  5. 大数据时代的全能日志分析专家--Splunk安装与实践

    大数据时代的全能日志分析专家 --Splunk安装与实践 0.背  景 随着大家对网络安全意识的提高,企业网管理人员,必须对IT基础设置进行监控及安全事件的管理,管理数据的数量和种类非常巨大,那么就需 ...

  6. 【Todo】【读书笔记】大数据Spark企业级实战版 Scala学习

    目录:/Users/baidu/Documents/Data/Interview/Hadoop-Spark-Storm-Kafka 下了这本<大数据Spark企业级实战版>, 另外还有一本 ...

  7. Python+大数据-Spark技术栈(二)SparkBaseCore

    Python+大数据-Spark技术栈(二)SparkBase&Core 学习目标 掌握SparkOnYarn搭建 掌握RDD的基础创建及相关算子操作 了解PySpark的架构及角色 环境搭建 ...

  8. 2016年大数据Spark“蘑菇云”行动代码学习之AdClickedStreamingStats模块分析

    2016年大数据Spark"蘑菇云"行动代码学习之AdClickedStreamingStats模块分析     系统背景:用户使用终端设备(IPAD.手机.浏览器)等登录系统,系 ...

  9. 光环大数据spark文档_推荐大数据Spark必读书目

    我有一个非常要好的同事,无数次帮我解决了业务上的痛.技术能力很强,业务方面也精通.而且更耐得住加班,并且是自愿加班,毫无怨言.不像我,6点到准时走人了.但就是这么一位兢兢业业的技术人,却一直没有升职加 ...

最新文章

  1. LDO和DC-DC有什么不同?如何选型?
  2. MySQL和Oracle中的隐式转换
  3. boost::ptr_container::ptr_inserter相关的测试程序
  4. linux内核支持utf8,Linux对非UTF-8中文编码的支持
  5. 写一个工具生成数据库实体类
  6. .Net Core配置与自动更新
  7. 计算正方形面积和周长_寒假作业:长方形、正方形周长面积应用题,附答案
  8. 扎根CNCF社区贡献五年是怎样的体验?听听华为云原生开源团队的负责人怎么说
  9. Eos的Wasm智能合约的局限性
  10. @程序员,Web 开源神器了解一下? | 程序员硬核评测
  11. 2020-02-06 asm内联汇编
  12. php案例之后台数据显示-- mysqli面向过程版(procedure oriented programming = POP)
  13. 如何在 Ubuntu 上转换图像、音频和视频格式
  14. 单片机技术应用实训考核,AT89S52单片机应用教学,QY-KC20
  15. 使用脚本自制 SSL 域名证书
  16. 新时达服务器说明书_新时达电梯主板操作服务器使用手册操作说明书使用说明...
  17. Codeforces 553A Kyoya and Colored Balls 给球涂颜色
  18. 中国电信 CTWing 物联网平台运营数据大解密
  19. 神奇的汉诺塔(图文解析)
  20. JAVA编程基础(四)开启Java语言编程之旅

热门文章

  1. c语言异或运算作用,一文看懂C语言异或运算
  2. Spring源码——bean的加载
  3. 千峰实训-Python-周六
  4. 磁盘显示没有初始化恢复数据办法
  5. 计算机专业春考难不难,【过来人经验】春考之路接下来怎么走?听过来人讲讲填报志愿有哪些注意事项!...
  6. Qt多语言切换-Qt语言家
  7. int型转换为long型遇到的一个小问题
  8. ChatGPT玩起来真是上头,AI广泛应用元年体验AI之美
  9. 机器学习中的数学——病态条件
  10. Freemark 常用代码总结2