目的

将kafka的offset保存到外部的redis数据库中,再次读取的时候也从外部的redis数据库读取
主要步骤

1 从kafka获取要读取的消息的开始offset
2 通过offset读取数据,进行处理
3将读取到的最新的offset更新到redis

演示案例

首先启动生产者

kafka-console-producer.sh \
--broker-list mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet

下述为consumer.properties的内容,消费者策略从这里提取

bootstrap.servers=mypc01:9092,mypc02:9092,mypc03:9092
group.id=test1
enable.auto.commit=false
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
zookeeper.servers=mypc01:2181,mypc02:2181,mypc03:2181

实例代码

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.{Jedis, JedisPool}import java.util
import java.util.Properties/*
使用redis kv数据库维护kafka主题分区的offset
1 从kafka获取要读取的消息的开始offset
2 通过offset读取数据,进行处理
3将读取到的最新的offset更新到redisredis 存储offset数据的设计思路:使用hash类型比较ok
test1     pet0  12pet1  10pet2  12*/
object RedisOffsetDemo extends App {private val conf = new SparkConf().setAppName("test").setMaster("local[*]").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")private val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))private val properties = new Properties()//加载消费者的配置文件properties.load(RedisOffsetDemo.getClass.getClassLoader.getResourceAsStream("consumer.properties"))//将消费者的配置参数转为Map类型private val paras: Map[String, String] = Map[String, String]("bootstrap.servers" -> properties.getProperty("bootstrap.servers"),"group.id" -> properties.getProperty("group.id"),"enable.auto.commit" -> properties.getProperty("enable.auto.commit"),"key.deserializer" -> properties.getProperty("key.deserializer"),"value.deserializer" -> properties.getProperty("value.deserializer"))//定义topic数组val topics = Array("pet")private val RedisUtils = new RedisUtils()//获取jedis对象private val jedis: Jedis = RedisUtils.getJedis//获取offsets对象,类型是一个Mapprivate val offsets: Map[TopicPartition, Long] = RedisUtils.getOffset(jedis, properties)var dstream: InputDStream[ConsumerRecord[String, String]] = _//如果offsets不为空,就从offsets处开始消费if (offsets.nonEmpty) {//从kafka消费数据,消费的数据构成一个DStream,之后就可以应用各种算子进行处理了//createDirectStream的第三个参数是个方法,且该方法可以传入一个offsets//如果需要手动提交,我们需要传入这个offsetsdstream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Array("pet"), paras, offsets))} else {//如果offsets为空,就从头开始消费dstream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Array("pet"), paras))}//处理消费者获取的数据//从kafka获取的Dstream,每一条都是一个ConsumerRecord,就是一条消息//从消息上可以解析出各种信息dstream.foreachRDD((rdd: RDD[ConsumerRecord[String, String]]) => {rdd.foreach((x: ConsumerRecord[String, String]) => {println(s"partition: ${x.partition()} offset: ${x.offset()}  value: ${x.value()}")//获取offset的最新值val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRangesRedisUtils.updateOffsets(properties.getProperty("group.id"), ranges, jedis)})})ssc.start()ssc.awaitTermination()
}class RedisUtils {//定义一个获取jedis的工具方法def getJedis: Jedis = {val config = new GenericObjectPoolConfig()config.setMaxTotal(15)//最大空闲连接数config.setMaxIdle(10)//最小空闲连接数config.setMinIdle(5)//创建线程池val pool = new JedisPool(config, "mypc01", 6379)//获取连接对象val jedis: Jedis = pool.getResourcejedis}//定义一个获取offsets对象的工具方法def getOffset(jedis: Jedis, prop: Properties): Map[TopicPartition, Long] = {//定义一个空的offsets对象var offsets: Map[TopicPartition, Long] = Map()//通过组名作为key从redis获取对应的field和value//本例中就是获取key=test1的fied以及value,返回的是一个map//就是利用hash类型的key获取hash类型的值//此处field代表 主题,value代表offset的那个数字//Map((pet0,11),(pet0,18))var kvs: util.Map[String, String] = jedis.hgetAll(prop.getProperty("group.id"))import scala.collection.JavaConversions._for (kv <- kvs) {val arr: Array[String] = kv._1.split("#")//从field解析出topicval topic: String = arr(0)从field解析出partitionval partition: Int = arr(1).toInt//offsets是个mapoffsets += (new TopicPartition(topic, partition) -> kv._2.toLong)}offsets}def updateOffsets(groupName: String, range: Array[OffsetRange], jedis: Jedis): Unit = {for (x <- range) {jedis.hset(groupName, x.topic + "#" + x.partition, x.untilOffset.toString)}}
}

解析

def createDirectStream[K, V](ssc: StreamingContext, locationStrategy: LocationStrategy, consumerStrategy: ConsumerStrategy[K, V]): InputDStream[ConsumerRecord[K, V]]

DStream的Scala构造函数,其中每个给定的Kafka主题/分区都对应于RDD分区。 spark配置spark.streaming.kafka.maxRatePerPartition给出每个分区每秒接受的最大消息数。

org.apache.spark.streaming.kafka010
trait HasOffsetRanges

表示任何具有OffsetRanges集合的对象。 这可用于访问由直接Kafka DStream生成的RDD中的偏移范围(请参阅KafkaUtils.createDirectStream)。

  KafkaUtils.createDirectStream(...)。foreachRDD {rdd =>val offsetRanges = rdd.asInstanceOf [HasOffsetRanges] .offsetRanges...}

使用redis kv数据库维护kafka主题分区的offset相关推荐

  1. 【Kafka】Kafka 1.1.0以后版本获取Kafka每个分区最新Offset的几种方法

    1.概述 脚本方法 [lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ ./bin/kafka-run-

  2. 【Kafka】Kafka 0.10.0版本获取Kafka每个分区最新Offset的几种方法

    1.概述 脚本方法 [root@1 kafka]# ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list xxx:9092

  3. 使用Redis和Apache Kafka处理时间序列数据

    目录 场景:设备监控 解决方案架构 先决条件: 设置基础设施组件 设置本地服务 MQTT代理 Grafana Kafka 连接 创建MQTT源连接器实例 部署设备数据处理器应用程序 启动模拟设备数据生 ...

  4. Kafka主题体系架构-复制、故障转移和并行处理

    本文讨论了Kafka主题的体系架构,讨论了如何将分区用于故障转移和并行处理. Kafka主题,日志和分区 Kafka将主题存储在日志中.主题日志分为多个分区.Kafka将日志的分区分布在多个服务器或磁 ...

  5. Kafka系列之:深入理解Kafka 主题、分区、副本、LEO、ISR、HW、Kafka的主写主读和分区leader选举

    Kafka系列之:深入理解Kafka 主题.分区.副本.LEO.ISR.HW.Kafka的主写主读和分区leader选举 一.Kafka重要知识点提炼 二.详细介绍Kafka 主题.分区.副本.LEO ...

  6. 使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...

    原文:http://blog.csdn.net/changong28/article/details/39325079 使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指 ...

  7. kafka的分区策略(partition assignment strategy)

    概述 kafka的分区策略指的是producer端的 各个partition中的数据如何安排给consumer消费. Range(按范围) ange策略是对每个主题而言的,首先对同一个主题里面的分区按 ...

  8. java上传kafka的方法_哪种方法是将所有数据从Kafka主题复制到接收器(文件或Hive表)的最佳方法?...

    我正在使用Kafka Consumer API将所有数据从Kafka主题复制到Hive表 . 为此,我使用HDFS作为中间步骤 . 我使用唯一的组ID并将偏移重置为"最早",以便从 ...

  9. 【kafka】kafka 指定分区消费 不会触发 reblance

    文章目录 1.概述 2.验证 2.1 2个都是subscribeTopic 2.2 指定消费与全部消费 2.3 两个指定消费 2.4 2个都消费同样的分区呢? 1.概述 今天在博客:Kafka-消费, ...

最新文章

  1. 解决.net webservice的WebClient或HttpWebRequest首次连接缓慢问题
  2. cmake, This may result in binaries being created in the wrong place
  3. 单细胞转录组单飞第二期开课啦!!
  4. 后处理编辑修改_NX后处理打开报错处理方法
  5. busybox怎么安装
  6. pandas read_sql
  7. 爱我的人请别走远(转载)
  8. 老王学linux-系统安全
  9. My New Game2
  10. linux系统怎样设置分屏显示器,话说你们的双屏显示器是怎样设置的 尤其是外接显示器分辨率设置...
  11. 使用SendCloud企业发送邮件
  12. mysql sniffer 源码_MySQL 抓包工具 - MySQL Sniffer 使用小结 (含带general_log日志)
  13. Redis:字符串INCR、INCRBY、INCRBYFLOAT、DECR、DECRBY命令介绍
  14. 【c语言】求一个3行4列矩阵的外框的元素值之和
  15. 最小化安装 Rocky Linux(CentOS的传承,同创始人发起)
  16. JAVA-SE中:集合,IO流,文件与异常
  17. 经典生产者-消费者问题解析
  18. MSE——软件工程硕士
  19. 南非世界杯 1/8决赛 德国vs英格兰
  20. 买开 、卖平、卖开、买平

热门文章

  1. Python 日期计算:计算某日期前几天,后几天的日期,也可以计算小时,分钟之后的日期时间
  2. java .class 实例对象_Java产生Class类的三种实例化对象的方法
  3. python中使用什么来实现异常捕捉_python 异常捕捉
  4. linux的基本命令tail,Linux基本命令(示例代码)
  5. centos ipv6 网卡_Linux_03-Centos的基本网络配置
  6. 小程序二维码需要发布正式版后才能获取到_IOS14.3正式版发布时间12月15日:苹果ios14.3正式版内容一览[多图]-游戏产业...
  7. linux系统上安装远程软件下载,如何安装向日葵远程控制软件Linux被控端?
  8. python读取文件登录_python 3.x 循环读取文件用户登录
  9. nacos安装与基础配置
  10. MATLAB对表达式进行降幂排列,MATLAB上机答案.doc