1. Scala编程思想

(1) val常量,var变量,在声明变量的时候不需要说明具体的类型,类似于javascript语言

(2) def表示声明函数或方法,其中参数名颠倒写,即变量名在前:类型,返回值不需要写return

def getPath(path:String):String=
{println(path) path
}

(3) for循环:<-被称生成器(generator)

//包含10
for( a <- 1 to 10){println( "Value of a: " + a );}//不包含10
for( a <- 1 until 10){println( "Value of a: " + a );}
//相当于嵌套循环
for( a <- 1 to 3; b <- 1 to 3){println( "Value of a: " + a );println( "Value of b: " + b );
}========>for(a <- 1 to 3){for(b <- 1 to 3){println( "Value of a: " + a );println( "Value of b: " + b );}
}
//循环列表以及可以过滤处理
val numList = List(1,2,3,4,5,6,7,8,9,10,11)
for( a <- numList ){println( "Value of a: " + a );}for( a <- numListif a != 3; if a < 8 ){println( "Value of a: " + a );}//不会输出3以及不会输出大于8的数字

(4) 数组Array,列表List-- List一但创建,其值不能被改变

https://blog.csdn.net/lovehuangjiaju/article/details/46963721

//定长数组
val strArray=new Array[String](10);val numberArray=new Array[Int](10)
//变长数组
val strArrayVar=ArrayBuffer[String]()
//+=意思是在尾部添加元素  strArrayVar+="hello"
//+=后面还可以跟多个元素的集合 strArrayVar+=("World","Programmer")
//++=用于向数组中追加内容,++=右侧可以是任何集合
//追加Array数组
strArrayVar++=Array("Wellcome","To","XueTuWuYou")
//追加List
strArrayVar++=List("Wellcome","To","XueTuWuYou")

(5) scala中所有的集合都来自于scala.collection包及其子包mutable, immutable当中

//scala.collection.immutable包中的集合绝对是不可变的,函数式编程语言推崇使用immutable集合

//scala.collection.immutable包中的集合在是可变的,使用的时候必须明白集合何时发生变化

在scala中,默认使用的都是immutable集合,如果要使用mutable集合,需要在程序中引入

  • Set(集)是一种不存在重复元素的集合,它与数学上定义的集合是对应的

    val numsSet=Set(3.0,5) //使用可变集合,默认的情况下,set的实现方式是HashSet方式
    val linkedHashSet=scala.collection.mutable.LinkedHashSet(3.0,5)
    

Option、None、Some是scala中定义的类型,它们在scala语言中十分常用,因此这三个类型非学重要。

None、Some是Option的子类,它主要解决值为null的问题,在java语言中,对于定义好的HashMap,如果get方法中传入的键不存在,方法会返回null,在编写代码的时候对于null的这种情况通常需要特殊处理,然而在实际中经常会忘记,因此它很容易引起 NullPointerException异常。在Scala语言中通过Option、None、Some这三个类来避免这样的问题,这样做有几个好处,首先是代码可读性更强,当看到Option时,我们自然而然就知道它的值是可选的,然后变量是Option,比如Option[String]的时候,直接使用String的话,编译直接通不过。

Option是一个很有意思的类,首先,这个类并不一个真正的集合类,因为它并没有有继承Traversable或Iterable。但是,它确实具有Iterable的所有操作,这意味着你完全可以把Option当成一个集合去使用

val map=mutable.HashMap[String,Int]()map.put("scala",21)for(i<-map.keys){var value=map.getOrElse(i,throw new NoSuchElementException(i))println(value)value+=10println(value)}/** Get a parameter as an integer, falling back to a default if not set */def getInt(key: String, defaultValue: Int): Int = {getOption(key).map(_.toInt).getOrElse(defaultValue)}/** Get a parameter as an Option */def getOption(key: String): Option[String] = {settings.get(key)}

getOrElse:顾名思义,对于一个Option,如有它确实有值就返回值,否则,也就是为空的话,给一个约定的值。

(6) 函数和闭包

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NXEjLC4n-1585537039402)(.\png\scala函数定义.png)]

  • 值函数,指的是函数可以赋值给变量

    /*函数字面量 function literal=>左侧的表示输入,右侧表示转换操作
    */
    val increase=(x:Int)=>x+1
    //前面的语句等同于
    def increaseAnother(x:Int):Int={x+1}
    
  • 高阶函数:柯里化(Currying)指的是把原来接受多个参数的函数变换成接受一个参数的函数过程,并且返回接受余下的参数且返回结果为一个新函数的技术。

    //普通的非柯里化函数
    scala> def plainOldSum(x:Int,y:Int)=x+y
    plainOldSum: (x: Int, y: Int)Int
    scala> plainOldSum(1,2)
    res0: Int = 3
    scala> def curriedSum(x:Int)(y:Int)=x+y
    curriedSum: (x: Int)(y: Int)Int
    scala> curriedSum(1)(2)
    res1: Int = 3
    /*当你调用 curriedSum (1)(2)时,实际上是依次调用两个普通函数(非柯里化函数),
    第一次调用使用一个参数 x,返回一个函数类型的值,
    第二次使用参数 y 调用这个函数类型的值。*/
    //首先定义第一个函数:
    scala> def first(x:Int)=(y:Int)=>x+y
    first: (x: Int)Int => Int
    //然后我们使用参数 1 调用这个函数来生成第二个函数:
    scala> val second =first(1)
    second: Int => Int = <function1>
    scala> second(2)
    res2: Int = 3scala> val onePlus=curriedSum(1)_
    onePlus: Int => Int = <function1>
    //下划线“_” 作为第二参数列表的占位符, 这个定义的返回值为一个函数,当调用时会给
    //调用的参数加一。
    scala> onePlus(2)
    res3: Int = 3
    //调用生成的函数,给函数传入参数,即可得到我们想要的结果。

    scala 柯里化风格的使用可以简化主函数的复杂度,提高主函数的自闭性,提高功能上的可扩张性、灵活性。可以编写出更加抽象,功能化和高效的函数式代码。

    (7) 单例对象

    在某些应用场景下,我们可能不需要创建对象,而是想直接调用方法,虽然Scala语言并不支持静态成员,但是它也为我们提供了单例模式的实现方法,那就是使用关键字 object。当单例对象与某个类共享同一个名称时,他被称作是这个类的伴生对象:companion object。你必须在同一个源文件里定义类和它的伴生对象。类被称为是这个单例对象的伴生类:companion class。类和它的伴生对象可以互相访问其私有成员。

    (8) 字符串

    (9) Scala编程例子

    val test=new testScala("hello")class testScala {def this(str:String){this()println(str)}
    ===========>
    class testScala(str:String){println(str)
    }
    
       val template="username={0} password={1};"val username="mikasa"val password="123456"val value=MessageFormat.format(template,username,password)println(value)
    

2. Scala读取配置文件

(1) 读取local配置文件

以下程序必须把.properties配置文件放在resources中,但是在spark上可能访问不到

    val properties: Properties = new Propertiesval in: InputStream = getClass.getResourceAsStream("/test.properties")properties.load(new BufferedInputStream(in))var days=properties.getProperty("days")println(days)

以下读取绝对路径,默认的是根目录,即包含src目录的这一层目录

    val properties: Properties = new Propertiesval in: InputStream = new FileInputStream("args(0)")properties.load(i)var days=properties.getProperty("days")println(days)/*********************************************/
val settings = new mutable.HashMap[String, String]()var in: FileInputStream = nulltry {in = new FileInputStream(propertiesPath)val prop = new Properties()prop.load(in)val keys = prop.propertyNames()while (keys.hasMoreElements) {val key = keys.nextElement().toStringsettings += ((key, prop.getProperty(key)))}} catch {case ex: FileNotFoundException => {println("#" * 30)println("no casetest.properties file" + ex)println("#" * 30)}}finally {if (in != null) {in.close()}}println("#" * 30)settings.foreach(println)println("#" * 30)
/*****************************************************/

(2) 读取HDFS上的配置文件(这里要学习如何把文件上传到HDFS)

/***********************************************/
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, Path}
/***********************************************/
val settings = new mutable.HashMap[String, String]()
var in: FSDataInputStream = nulltry {val conf = new Configuration()val path = new Path(propertiesPath)val fs = path.getFileSystem(conf)in = fs.open(path)val prop = new Properties()prop.load(in)val keys = prop.propertyNames()while (keys.hasMoreElements) {val key = keys.nextElement().toStringsettings += ((key, prop.getProperty(key)))}} catch {case ex: FileNotFoundException => {println("#" * 30)println("no casetest.properties file" + ex)println("#" * 30)}}finally {if (in != null) {in.close()}}println("#" * 30)settings.foreach(println)println("#" * 30)

3. Spark开发

(1) 初始化Spark

添加Spark的Maven依赖

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.2.0

如果需要访问HDFS集群,那么可以根据HDFS版本添加hadoop-client的依赖。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

Spark 编程的第一步是需要创建一个 SparkContext 对象,用来告诉 Spark 如何访问集群。在创建 SparkContext 之前,你需要构建一个 SparkConf 对象, SparkConf 对象包含了一些你应用程序的信息。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

appName参数是程序的名字,它会显示在clusterUI上。master是Spark,Mesos或YARN 集群的 URL,或运行在本地模式时,使用专用字符串 “local”。在实践中,当应用程序运行在一个集群上时,你并不想要把 master 硬编码到你的程序中,你可以用 spark-submit启动你的应用程序的时候传递它。然而,你可以在本地测试和单元测试中使用 “local” 运行Spark 进程。

(2) 弹性分布式数据集 (RDDs)

Spark 核心的概念是 Resilient Distributed Dataset (RDD):一个可并行操作的有容错机制的数据集合。有 2 种方式创建 RDDs:第一种是在你的驱动程序中并行化一个已经存在的集合;另外一种是引用一个外部存储系统的数据集,例如共享的文件系统,HDFS,HBase或其他Hadoop 数据格式的数据源。

(3) Spark Streaming

Spark streaming是Spark核心API的一个扩展,它对实时流式数据的处理具有可扩展性、高吞吐量、可容错性等特点。我们可以从kafka、flume、Twitter、 ZeroMQ、Kinesis等源获取数据,也可以通过由 高阶函数map、reduce、join、window等组成的复杂算法计算出数据。最后,处理后的数据可以推送到文件系统、数据库、实时仪表盘中。Spark Streaming支持一个高层的抽象,叫做离散流( discretized stream )或者 DStream ,它代表连续的数据流。DStream既可以利用从Kafka, Flume和Kinesis等源获取的输入数据流创建,也可以 在其他DStream的基础上通过高阶函数获得。在内部,DStream是由一系列RDDs组成。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 sec
ond
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

注意:"local[2]"表示本地开启两个线程来操作。本地工作方式除了集合生成RDD、读取本地文件和hdfs文件能开启一个线程就可以工作外,其他情况都开启至少两条线程才能正常工作。这是由于除以上情况,Spark会开启一个reciver来接受数据,若只有一条线程,reciver就占用唯一线程的资源,而数据处理等操作将没有资源可执行。

setMaster主要是连接主节点,如果参数是"local",则在本地用单线程运行spark,如果是 local[4],则在本地用4核运行

Spark Streaming也可以利用maven仓库。编写你自己的Spark Streaming程序,你需要引入下面的依赖到你的SBT或者Maven项目中

Seconds(1)批处理按间隔1秒进行处理

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.2</version>
</dependency>

为了从Kafka, Flume和Kinesis这些不在Spark核心API中提供的源获取数据,我们需要添加相关的模块 spark-streaming-xyz_2.10 到依赖中。例如,一些通用的组件如下表所示:

Source Artifact
Kafka spark-streaming-kafka_2.10
Flume spark-streaming-flume_2.10
Kinesis spark-streaming-kinesis-asl_2.10
Twitter spark-streaming-twitter_2.10
ZeroMQ spark-streaming-zeromq_2.10
MQTT spark-streaming-mqtt_2.10

为了初始化Spark Streaming程序,一个StreamingContext对象必需被创建,它是Spark Streaming所有流操作的主要入口。一个StreamingContext 对象可以用SparkConf对象创建。

当一个上下文(context)定义之后,你必须按照以下几步进行操作

  • 定义输入源;
  • 准备好流计算指令;
  • 利用 streamingContext.start() 方法接收和处理数据;
  • 处理过程将一直持续,直到 streamingContext.stop() 方法被调用。

几点需要注意的地方:

  • 一旦一个context已经启动,就不能有新的流算子建立或者是添加到context中;
  • 一旦一个context已经停止,它就不能再重新启动;
  • 在JVM中,同一时间只能有一个StreamingContext处于活跃状态在StreamingContext上调用 stop() 方法,也会关闭SparkContext对象。如果只想仅关闭StreamingContext对象,设置 stop() 的可选参数为false;
  • 一个SparkContext对象可以重复利用去创建多个StreamingContext对象,前提条件是前初始化StreamingContext面的StreamingContext在后面StreamingContext创建之前关闭(不关闭SparkContext)

(4) Checkpointing

一个流应用程序必须全天候运行,所有必须能够解决应用程序逻辑无关的故障(如系统错误,JVM崩溃等)。为了使这成为可能,Spark Streaming需要checkpoint足够的信息到容错存储系统中, 以使系统从故障中恢复。

4. Kafka基本概念

object SparkStreamKafka {def main(args: Array[String]) {val sparkConf = new SparkConf().setMaster("local[2]").setAppName("kafka-spark-demo")val streamingContext = new StreamingContext(sparkConf, Duration(5000))val topics = Set("kafka-spark-demo") //设置kafka的topicval kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean))val stream = KafkaUtils.createDirectStream[String, String](streamingContext,PreferConsistent,Subscribe[String, String](topics, kafkaParams))stream.map(record => (record.key, record.value)).print() // 打印streamingContext.start() // 真正启动程序streamingContext.awaitTermination() //阻塞等待}}

(1) Kafka基本概念

kafka 是一个实时、容错、可扩展的分布式发布-订阅消息系统,提供发布-订阅解决方案,主要用于处理活跃的流式数据。https://www.cnblogs.com/wjcx-sqh/p/6723539.html

性能特点

  • 为发布-订阅提供高吞吐量(核心目标):生产:25万消息(50 MB)/s,消费:55万消息(110 MB)/s

  • 支持 online在线应用(消息)和 offline离线应用(数据文件,日志)

  • 持久化:存取代价为O(1)的磁盘数据结构、充分利用磁盘的顺序读写性能,性能稳定、防止数据丢失,同时消息持久化到磁盘,支持批量消费

  • 支持 Hadoop 并行数据加载

  • Zero-Copy 技术:减少IO操作步骤,提高发送性能

  • DelayQueue 机制

  • 消息分组压缩:高效信息传输,减少网络带宽消耗

  • 高峰流量缓冲问题

相关术语

  • Broker:Kafka 集群由一个或多个 Kafka 实例组成,该实例称为代理或 Server 或 Broker

  • Topic:每条发布到 Kafka 集群的消息都有一个类别,该类别称为 Topic(话题),Kafka 根据 Topic 对消息归类存储,Topic 是特定类型的消息流,消息是字节的有效负载(Payload),每条消息由一个key,一个value和时间戳构成

  • Partition:并行处理单元,每个 Topic 划分成一个或多个 Partition(提高 Parallelism),每个 Partition 是一个有序、不可变、可持续追加的消息队列,在存储层面是 AppendLog 文件,该文件由多个大小相等的 Segment 组成,每个 Segment 中存储多条消息,消息 ID 由其逻辑位置决定

  • Producer:生产者,发布(推送)消息到 Kafka Broker

  • Consumer:消费者,从 Kafka Broker 读取(拉取)消息的客户端,可以订阅一个或多个 Topic

  • Consumer Group: 每个 Consumer 属于一个特定的 Consumer Group

注意:Broker 没有主从之分,所有 Brokers 是对等的,Brokers 的元数据信息由 ZooKeeper 维护并被所有的 Consumers 共享。

此外,Partition 只是物理上的概念,物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然会保存于一个或多个 Broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处。任何发布到 Partition 的消息都会被直接追加到 log 文件尾部,每条消息在文件中的位置称为 offset(偏移量 或 消息的ID,long 型数字,有序唯一标识),offset 唯一标记一条消息,偏移量是消费者保存和控制的唯一元数据。但是,Kafka 几乎不允许对消息进行“随机读写”(不可变性)。

Kafka 是显式分布式架构,其 5 个重要的组件:Broker,Topic,Producer,Consumer,Zookeeper

ZooKeeper 是一个高性能快速、高可用、容错、分布式的协调服务,可以构建可靠的、分布式的分层数据结构。ZooKeeper 用于管理和协调 Kafka Broker,Producer 和 Consumer 实现 Kafka 注册的接口,Brokers 承担数据信息中间缓存和分发的作用,客户端和服务器端基于 TCP 协议通信。

kafka参数基本设定:

//其中inputBrokers,一般设定为多个IP的访问端口
var kafkaParams = Map[String, Object]("bootstrap.servers" -> inputBrokers,"auto.offset.reset" -> offsetRest,"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","group.id" ->groupId,"enable.auto.commit" -> (false: java.lang.Boolean))

·bootstrap.servers

在启动consumer时配置的broker地址的。不需要将cluster中所有的broker都配置上,因为启动后会自动的发现cluster所有的broker。 它配置的格式是:host1:port1;host2:port2…

·auto.offset.reset(默认值是latest)

这个配置项,是告诉Kafka Broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值(如果一个record被删除,就肯定不存在了)时,该如何处理。它有4种处理方式:

1) earliest:自动重置到最早的offset

2) latest:看上去重置到最晚的offset

3) none:如果边更早的offset也没有的话,就抛出异常给consumer,告诉consumer在整个consumer group中都没有发现有这样的offset

4) 如果不是上述3种,只抛出异常给consumer

·key.descrializer,·value.descrializer

Message record 的key, value的反序列化类

·group.id

用于表示该consumer想要加入到哪个group中,默认值是 “”

·enable.auto.commit

Consumer 在commit offset时有两种模式:自动提交,手动提交。手动提交在前面已经说过。自动提交:是Kafka Consumer会在后台周期性的去commit。默认值是true。

·security.protocol

https://blog.csdn.net/jsjsjs1789/article/details/53161985

http://orchome.com/270

权限认证,通信协议的支持情况

SSL Kerberos
PLAINTEXT NO NO
SSL YES NO
SASL_PLAINTEXT NO YES
SASL_SSL YES YES

·sasl.mechanism

在server.properties中启用1个或多个SASL机制

a. 主题(topic)

Kafka 组消息抽象归纳为一个主题( Topic ),也就是说,一个主题就是对消息的分类。生产者将消息发送到特定主题,消费者订阅主题或主题的某些分区进行消费。

b. 消息(message)

消息是 Kafka 通信的基本单位,由 个固定长度的消息头和 个可变长度的消息体构成。在老版本中,每 条消息称为 Message :在由 Java 重新实现的客户端中,每 条消息称为 Record

c. 分区和副本(partition and replica)

kafka把每个主题分成一个或多个分区( Partition个分区由 系列有序、不可变的消息组成,是一个有序队列。每个分区又有一个至多个副本( Replica ),分区的副本分布在集群的不同代理上,以提高可用性。

  • 分区

    Kafka将一个主题在逻辑上分成一个或多个分区,每个分区在物理存储上对应一个目录,目录名为topicName−{topicName}-topicName−{partitionId},其中topicName是主题的名字,{topicName}是主题的名字,topicName是主题的名字,{partitionId}是分区编号,每个主题的分区都有唯一编号,分区编号从0依次递增。分区目录下存储的是该分区的日志段,包括日志数据文件和两个索引文件。每个分区又对应一个或多个副本。需要注意的是,分区数可以大于节点数,但副本数不能大于节点数,因为副本需要分布在不同的节点上,这样才能达到备份的目的。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FG24bnq2-1585537039403)(.\png\kafka-action主题3个分区在代理上的分布情况.png)]

  • 副本

    一个分区可以有一个或多个副本,副本根据是否接受读写请求,又分为 Leader 副本和Follower 副本,一个分区有 Leader 副本,有一个或多个 Follower 副本 Leader 副本处理分区的所有读写请求并维护自身及follower 副本的状态信息 ,如 LEO、HW 等,follower 副本作为消费者从Leader 副本拉取消息进行同步。当 Leader 失效时 通过分区 Leader 选举器从副本列表中选出一个副本作为新的 Leader。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UUsZZ3f5-1585537039403)(.\png\分区多副本分布.png)]

d. 偏移量(offset)

每条消息在日志文件中的位置都会对应一个按序递增的偏移量。由于 Kafka 几乎不允许对消息进行随机读写,因此 Kafka 并没有提供额外索引机制到存储偏移 ,也就是说并不会给偏移量再提供索引。消费者可以通过控制消息偏移量来对消息进行消费,如消费者可以指定消费的起始偏移量。为了保证消息被顺序消费 消费者己消费的消息对应的偏移量也需要保存。旧版消费者将消费偏移量保存到 ZooKeeper 当中,而新版消费者是将消费偏移量保存到 Kafka 内部 个主题当中当然,消费者也可以自己在外部系统保存消费偏移 ,而无需保存到 Kafka中。

e. 代理(broker)

Kafka 集群就是由一个或多个 Kafka实例构成,我们将每一个 Kafka 实例称为代理(Broker),通常也称代理为 Kafka 服务器(KafkaServer) 。每一个代理都有唯 的标识 id ,这个 id 个非负整数。在一个 Kafka集群中,每增加一个代理就需要为这个代理配置 个与该集群中其他代理不同的 id, id 值可以选择任意非负整数即可,只要保证它在整个 Kafka 集群中唯 ,这个 id 就是代理的名字,也就是在启动代理时配置的 broker.id 对应的值,因此在本书中有时我们也称为 brokerId。

f. 生产者(producer)

生产者(Producer )负责将消息发送给代理,也就是向 Kafka 代理发送消息的客户端

g. 消费者和消费组(consumer)

消费者( omsumer )以拉取( pull )方式拉取数据,它是消费的客户端。在 Kafka 中每一个消费者都属于一 个特定消费组( ConsumerGroup ),我们可以为每个消费者指定一个消费组,以groupld 代表消费组名称,通过 group.id 配置设置。同时,每个消费者有一个全局唯一的id,通过配置项client.id指定,如果客户端没有指定消费者的id,kafka会自动为该消费者生成一个全局的唯一的id,格式为groupId−{groupId}-groupId−{hostName}-timestamp−{timestamp}-timestamp−{UUID前8位字符}。同一个主题的一条消息只能被同一个消费组下某一个消费者消费,但是不同消费组的消费者可同时消费该消息。

h. ZooKeeper

Kafka 利用 ZooKeeper 保存相应元数据信息, Kafka 元数据信息包括如代理节点信息、 Kafka集群信息、旧版消费者信息及其消费偏移量信息、主题信息、分区状态信息、分区副本分配方案信息、动态配置信息等。 Kafka 在启动或运行过程当中会在 ZooKeeper 上创建相应节点来保存元数据信息, Kafka 通过监昕机制在这些节点注册相应监听器来监昕节点元数据的变化,从而由 ZooKeeper 负责管理维护 Kafka 集群,同时通过 ZooKeeper 我们能够很方便地对 Kafka集群进行水平扩展及数据迁移。

ZooKeeper是一个分布式应用程序协调服务框架,分布式应用程序可以基于ZooKeeper来实现同步废物、配置维护、命名服务等,ZooKeeper能提供基于类似于文件系统的目录节点树方式的数据存储,通过监控各节点数据状态的变化,达到基于数据的集群管理。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-92zeB5Bk-1585537039404)(.\png\Kafka的集群结构.bmp)]

(2) kafka核心组件

a. 延迟操作组件(DelayedOperation)

Kafka 将一些不立即执行而要等待满足一定条件之后才触发完成的操作称为延迟操作,井将这类操作定义为一个抽象类 DelayedOperation, DelayedOperation 是一个基于事件启动有失效时间的 TimerTask。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-f3Yt332b-1585537039404)(.\png\DelayedOperation方法调用流程.png)]

b. DelayedProduce

DelayedProduce 是协助 ReplicaManager 完成相应延迟操作的,而ReplicaManager 的主要功能是负责将生产者发送的消息写入 Leader 本、管理 Follower 副本Leader 副本之间的同步以及副本角色之间的转换 DelayedProduce 显然是与生产者发送消息相关的延迟操作,因此只可能在消息写入 Leader 副本时需要 DelayedProduce 的协助。DelayedProduce 的作用就是协助副本管理器在 acks 为- 的场景时,延迟回调 responseCallback 向生产者做出响应。

(3) 控制器

在启动 Kafka 集群时,每一个代理都会实例化并启动一个 KafkaController ,并将该代理的brokerId 注册到 ZooKeeper 的相应节点当中。Kafka 集群中各代理会根据选举机制选出其中一个代理作为 Leader, 即 Leader 控制器。控制器负责主题的创建与删除、分区和副本的管理以及代理故障转移处理等。

相关术语

  • controller_epoch:用于记录控制器发生变更次数,初始值为0,控制器发生变更,每次选出一个新的控制器则加1。该字段对应 ZooKeeper controller_epoch 节点,通过登录 ZooKeeper客户端执行 get/controller epoch 命令,可以查看该字段对应的值。
  • zkVersion:作用类似数据库的乐观锁,用于更新ZooKeeper路径下相应元数据信息。
  • leader_epoch:分区Leader更新次数。
  • 己分配副本(assigned replica):每个分区的所有副本集合被称作己分配副本,简写为AR。

主题管理

由于分区、副本是主题的固有属性,因此在讲解控制器对主题管理时将同时讲解控制器对分区副本创建及删除的管理操作。控制器对分区、副本的管理在逻辑上体 在分区状态 以及副本状态机ZooKeeper的/brokers topic 节点及其子节点注册的 一系列监昕器上。

  • 创建主题

    当创建一个主题时会在 ZooKeeper /brokers topics 目录下 个与主题同名的节点该节点下会记录该主题的分区副本分配方案。

    当创建一个主题时,该主题及分区副本分配信息写入/brokes/topics路径下后就会触发TopicChangeListener监听器的handleChildChange()方法进行处理,在ControllerContext实例化时创建了一个ReentrantLock锁对象,handleChildChange()方法是在获取该重入锁的条件下进行处理的。

  • 删除主题

    客户端执行删除主题操作时仅是在 ZooKeeper admin/delete topics 径下创建一个与待删除主题同名的节点,返回该主题被标记为删除,保证本步操作成功执行的前提是配置项
    delete.topic.enable 值被设置为 true。

    在控制器实例化时创建了一个分区状态机,而分区状态 注册了一个监听 ZooKeeper
    admin/delete topics 子节点变化的监听器,即 DeleteTopicsListener 监听器。当客户端执行删除主题操作将待 除主题写入/admin/delete_topics 路径下时,将会触发该监昕器。在该监听器的handleChildChange()方法中执行实际删除主题操作。

    分区管理

    Kafka 控制器对分区的管理包括对分区创建及删除的管理,分区 Leader 选举的管理,分区
    自动平衡、分区副本重分配的管理等。

(4) 协调器

Kafka 提供了消费者协调器(ConsumerCoordinator)、组协调器 (GroupCoordinator)和任务管理协调器(WorkCoordinator) 种协调器(coordinator)消费者协调器和组协调器,这两种协调器与消费者密切相关。为了解决消费者依赖ZooKeeper所带来的问题,kafka在服务端引入了组协调器(GroupCoordinator),每个kafkaServer启动时都会创建一个GroupCoordinator实例,用于管理部分消费组和消费组下每个消费者的消费偏移量。同时在客户端引入了消费者协调器(ConsumerCoordinator),每个 KafkaConsumer 实例化时会实例化 一个ConsumerCoordinator 对象,消费者协调器负责同一个消费组下各消费者与服务端组协调器之间的通信。

a. 消费者协调器(ConsumerCoordinator)

消费者协调器是KafakConsumer的一个成员变量,KafakConsumer通过消费者协调器与服务端的组协调器进行通信。消费者协调器负责处理更新消费者缓存的 Metadata 请求,负责向组协调器发起加入消费组的请求,负责对本消费者加入消费组前、后相应的处理,负责请求离开消费组(如当消费者取消订阅时),还负责向组协调器发送提交消费偏移量的请求。总之,消费者协调器负责消费者与组协调器通信。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MukKSGtP-1585537039404)(.\png\消费者协调器底层实现所依赖的组件的类图.png)]

b. 组协调器(GroupCoordinator)

组协调器( GroupCoordinator )负责对其管理的组员提交的相关请求进行处理,这里的组员即消费者。它负责管理与消费者之间建立连接,并从与之连接的消费者之中选出一个消费者作为 Leader 消费者, Leader 费者负责消费者分区的分配,在SyncGroupRequest 请求时发送给组协调器,组协调器会在请求处理后返回响应时下发给其管理的所有消费者。同时,组协调器还管理与之连接的消费者的消费偏移量的提交,将每个消费者消费偏移量保存到 Kafka 的内部户主题当中,并通过心跳检测来检测消费者与自己的连接状态。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3Zw31mFg-1585537039405)(.\png\消费者加入消费组的过程.png)]

c. 消费偏移量管理

新版的 KafkaConsumer 将消费偏移量保存到 Kafka 一个内部主题中,当消费者正常运行或者进行平衡操作时都要向组协调器提交当前的消费偏移量。组协调器负责消费组的管理及消费偏移量的管理,但客户端可以仅选择让组协调器管理消费偏移量,例如,当客户端通过 assign()方法订阅指定的分区时,就不用 Kafka 负责分区的分配。当组协调器收到 OffsetCommitRequest请求时,会进行相应的检查判断 若满足偏移量处理的条件时,就会调用 GroupCoordinator.doCommitOffsets()方法进行处理。这里所说的偏移量处理的条件有两种情况:一种是该消费组
的成员提交的消费偏移量,另一种是仅选择让组协调器负责消费偏移量的管理的消费者提交的请求。若不满足偏移量提交条件就会调用回调函数返回相应的错误码。

d. 网络通信服务

在KafkaSever启动时,初始化并启动了一个SocketSever服务,用于接受客户端的连接,处理客户端请求,发送相应等,同时创建一个KafkaRequestHandlerPool用户管理KafkaRequestHandler。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lgEitFAM-1585537039405)(.\png\Kafk网络层线程模型.png)]

一个Acceptor线程负责接受客户端所有的连接,N个Processor线程,每个Processor有多个Selector,负责从每个连个中读取请求;M个Handle线程处理请求,并将产生的请求返回给Processor线程。

相关术语

  • Acceptor

    Acceptor的主要职责是监昕并接受客户端(统指请求发起方) 的请求,建立和客户端的数据传输通道 ServerSocketChannel ,然后为客户端指定一个 Processor。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fa54vpAl-1585537039406)(.\png\Acceptor在Kafka网络层连接中的地位.png)]

  • Processor

    Processor 也是一个线程类,继承 AbstractServerThread 类,主要用于从客户端读取请求数据和将相应的响应结果返回给客户端。 Processor 定义了一个 ConcurrentLinkedQueue[SocketChannel]类型的 newConnections 队列,该队列用来保存新连接的交由本 Processor处理的 SocketChanneI ;定义了一个Map[String, RequestChannel.Response 类型的 inflightResponses 集合,用来记录还未发送的响应;定义了一个管理网络连接的KSelector 类型的selector 字段。同时,Processor构造方法还接受一个由调用者传入的RequestChannel 对象, RequestChannel是Processor与Handler线程之间交换数据的队列 ,用于暂存通信的Request和Response。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EUUTVjNN-1585537039406)(.\png\Processor线程run()]方法执行逻辑流程.png)

  • Processor

    Processor 也是一个线程类,继承 AbstractServerThread 类,主要用于从客户端读取请求数据和将相应的响应结果返回给客户端。 Processor 定义了一个 ConcurrentLinkedQueue[SocketChannel]类型的 newConnections 队列,该队列用来保存新连接的交由本 Processor处理的 SocketChanneI ;定义了一个Map[String, RequestChannel.Response 类型的 inflightResponses 集合,用来记录还未发送的响应;定义了一个管理网络连接的KSelector 类型的selector 字段。同时,Processor构造方法还接受一个由调用者传入的RequestChannel 对象, RequestChannel是Processor与Handler线程之间交换数据的队列 ,用于暂存通信的Request和Response。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-N3dvb9mX-1585537039406)(.\png\RequestChannel缓冲处理逻辑.png)]

5. Kafka日志管理

日志管理(LogManager)是 Kafka 用来管理所有日志的,也称为日志管理子系统(Log
Management Subsystem)。它负责管理 日志的创建与、日志检索、日志加载和恢复、检查点及日志文件刷写磁盘以及日志清理等。

import org.apache.log4j.LoggerLogger.getLogger("org").setLevel(Level.ERROR)

(1) Kafka日志结构

注意通过 Kafka自带的用于主题管理操作的脚 kafka-topics.sh 来修改某个主题区数,但只能增加一个主题的分区数而不能减少其分区数。

在存储 结构上分区的每个副本在逻辑上对应一个Log对象,每个Log又划分为多个LogSegment,每个LogSegment包括一个日志文件和两个索引文件,其中两个索引文件分别为偏移量索引文件和时间戳索引文件。Kafka 将日志文件封装为一个 FileMessageSet 对象,将两个索引文件封装为 Offsetlndex和Timelndex 对象。 Log和LogSegment 都是逻辑上的概念, Log 是对副本在代理上存储文件的逻辑抽象, LogSegrnnent 是对副本存储文件下每个日志片段的抽象,日志文件和索引文件才与磁盘上的物理存储相对应。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QMK4IlTM-1585537039407)(.\png\Kafka日志存储结构中的映射关系.png)]

(2) 日志加载及恢复

在日志管理器初始化时,会调用 loadLogs()方法加载和恢复 log.dir 配置项指定目录下的分区文件,为每个分区文件创建一个Log 对象。

Log 类在逻辑层面上可以理解为与分区对应,也就是说 Log 类封装了对一个分区的基本操作,从实现层面而言,也是对日志段( LogSegment )操作和管理的封装。在实例化 Log 对象时,Log 会完成该分区目录下所有日志段的恢复操作,并将日志段加载到 ConcurrentSkipListMap型的 segments 集合中。 ConcurrentSkipListMap 具有跳跃表的功能,适用于高并发访问,多个线程可以安全地并发进行插入、删除、更新和访问操作,该集合的特性为通过偏移量快速查找日志提供了保证。

Log恢复和加载日志段由Log.loadSegments()方法实现。

(3) 日志清理

Kafka分成多个日志段文件而不是一个文件便于清理操作,可以通过日志段的更新事件或是日志段的大小控制进行日志清理。Kafka 提供了日志 删除(delete)和日志压缩(compact)两种清理日志的策略,通过参数cleanup.policy来指定日志清理的策略。日志清理粒度可以控制到主题级别,我们可以通过参数cleanup.policy 为每个主题指定不同的清理策略。当然也可以在代理启动时通过配置项log.cleanup policy 指定日志清理策略,这样该代理上的所有分区日志清理默认使用该配置设置的策略,主题级别的策略设置会覆盖代理级别的配置。

  • 日志删除

    日志管理启动时会启动一个后台定时任务线程用于定时删除日志段文件。该删除线程每隔${log.retention .check .interval.ms}毫秒检查一次是否该进行日志删除,默认是每5分钟执行一次。基于日志保留时长的配置有 log.retention .hours、log.retention minutes 、log.retention.ms。

    日志删除线程调用日志管理器的cleanupLogs()方法进行日志删除操作,该方法再调用Log.deleteOldSegments()方法查找并删除该待删除的日志段文件。该方法逻辑代码:

    def deleteOdlSegments():Int={//保证清理策略是deleteif(!config.delete) return 0//查找基于保留时长及日志段大小的待删除的旧日志段文件deleteRetentionMsBreachedSegments()+deleteRetentionSizeBreachedSegments()
    }
    

    删除时会通过 Log.deleteRetenionMsBreachedSegments()方法查找保留时长超过预设值的待删除的日志段,以及通过 Log.deleteRetentionSizeBreachedSegments()方法查找待删除的文件,以保证磁盘上的日志大小不超过 retention.bytes。若需要通过日志段大小来删除日志,需要保证{retention. bytes}。若需要通过日志段大小来删除日志,需要保证retention.bytes。若需要通过日志段大小来删除日志,需要保证 {retention.bytes}的值大于0。

  • 日志压缩

    这种策略是一种更细粒度的清理策略,它基于消息的Key,通过压缩每个Key对应的消息只保留最后一个版本的数据,该Key对应的其他版本在压缩时会被清楚,类似数据库的更新操作。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pUn72Le9-1585537039407)(.\png\日志压缩过程.png)]

压缩步骤:

通过CleanerManager.grabFilthiestCompactedLog()方法查找满足压缩条件的Log,即可清理比例大于预设阈值并且Log没有处于LogCleaningInProgress状态。其中可清理比例是指dirty段的字节总数与日志段总字节数之比, 只有当Log可清理比例不小于${min.cleanable.dirty ratio}时,即代理级别的配置log.cleaner.min.cleanable.ratio默认为 0.5,才有可能成为被压缩的对象。通过Cleaner对象的clean()方法执行真正的压缩逻辑。根据日志的最晚更新时间与${delete.retention.ms}计算需要删除的日志时间戳,记为deleteHorizonMs,日志段的最晚时间与该时间戳比较作为日志段是否保留的判断条件之一。将Log从0到endOffset的消息以LogSegment为单位进行分组,每组LogSegment字节大小不超过log.config.segmentSize,每组索引大小不能超过log.confing.maxIndexSize。分组之后通过Cleaner.cleanSegments()方法进行压缩。压缩操作的实质是将满足保留条件的消息复制到以“ .cleaned ”为后缀的数据文件中。对两个索引文件进行处理,去掉多余的索引项,同时将压缩后的日志段数据刷到磁盘。更新压缩后的日志段的最后修改时间,然后调用 Log.replaceSegments()方法进行处理,将文件后缀由“ .cleaned ”修改为“ .swap ”,并将压缩后的日志段加入到 segments 集合中,然后将分组中的所有 LogSegment 从Log的segments 集合中删除,并执行对这些日志段的删除操作。最后将“ .swap ”后缀去掉。

注意:

日志清理与日志删除区分开,日志删除是删除整个日志段,而日志清理是将相同key的日志进行合并,只保留该key最后一个值,将合并后的数据构成新的日志段,同时删除原来的日志段。

(4) 副本管理器

只要保证至少有一个代理存活就不会影响整个集群的工作,从而大大提高了 Kafka 集群的可靠性和稳定性。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-62J6kUug-1585537039407)(.\png\过期副本检查基本流程.png)]

(5) 追加消息

当生产者发送消息(ProduceRequest)或是消费者提交偏移量到内部主题时,由副本管理器的appendMessages()将消息追加到相应分区的Leader副本中。方法定义如下:

appendMessages(timeout:Long,requiredAcks:Short,internalTopicsAllowed:Boolean,messagesPerPartition:Map[TopicPartition,MessageSet],responseCallback:Map[TopicPartition,PartitionResponse])

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-poUM9wls-1585537039407)(.\png\副本管理器追加消息到Leader副本的时序图.png)]

(6) 拉取消息

副本管理器除了负责将消息写入 Leader 副本外,同时还负责处理 KafkaApis的FetchRequest
请求,通过 ReplicaManager.fetchMessages()方法从分区 Leader 副本获取消息,其实是由 KafkaApis在handleF etchRequest()方法中调用ReplicaManager.fetchMessages()方法 。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qhfIEYR5-1585537039407)(.\png\副本管理器对FecthRequest处理的基本流程.png)]

(7) 副本同步过程

ReplicaManager 初始化时会创建一个 ReplicaFetcherManager 对象 Follower副本与 Leader 副本之间的数据同步就是由 ReplicaFecherManager完成的。ReplicaFetchManager继承 AbstractFetcherManager类,该类定义了一个 fetcherThreadMap 用于保存对每个代理的拉取请求的 Fetcher 线程。同时还提供了 个由子类来实现的抽象方createFetcherThread(),用于创建拉取线程,以及对 fetcherThreadMap 管理相关的方法,主要包括以下方法:

  • addFetcherForPartitions()方法:用于为分区添加Fetcher线程,其实就是将分区添加到ReplicaFetcherThread线程中,一个ReplicaFetcherThread可以对应多个分区,也就是说多个分区公用一个Fetcher线程,由该Fetcher线程负责这些分区的数据拉取操作。fetcherThreadMap的Key是一个 BrokerAndFetcherld 对象,该对象包括两个属性 BrokerEndPoint和Fetcher线程的 id, BrokerEndPoint 封装了连接代理的 host port 信息,Value为一个AbstractFetcherThread对象,在添加分区到 Fetcher 线程时, 若fetcherThreadMap中还没有与该分区代理连接的Fetcher 线程,则创建之,否则直接将分区添加到对应的 Fetcher 线程中。

  • removeFetcherForPartitions()方法:用于 fetcherThreadMap 中找到 Fetcher该分区线程,从Fetcher 中移除该分区, 也就移除了该分区同步数据的线程,在关闭副本时就需要调用该方法,移除相应的 Fetcher 线程。

  • shutdownIdleFetcherThreads()方法:当一个Fetcher线程不再包含任何分区时,该Fetcher线程就会被关闭

6. Kafka核心流程分析

(1) kafka启动流程

KafkaServer启动的工作是由KafkaServer.startup()来完成的,在Kafka.startup()方法中会完成相应组价的初始化并启动这些组件。

(2) 创建主题流程分析

服务端创建主题,在控制器中已经介绍,以下是客户端创建主题。

客户端通过调用TopicCommand.createTopic(zkUtils:ZkUtils,opt:TopicCommandOptions)方法创建主题。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Iq4ZJurR-1585537039408)(.\png\KafkaServer初始化流程.png)]

(3) 生产者

在对生产者和消费者实现原理讲解时也是重点对 Java 重新实现的新版生产者和消费者的执行流程进行讲解。

  • 生产者重要配置说明

    Kafka 为生产者提供3种消息确认机制(acks),用于配置代理接收到消息后向生产者发送确认信号,以便生产者根据 acks 进行相应处理,该机制通过属性request.required.acks 设置,取值可为0 、-1 、1 中之一 ,默认取1。

    当acks=0时,生产者不用等待代理返回确认信息,而是连续发送信息。优点:加快了消息投递的速度;缺点:无法保证消息是否已被代理接受,有可能存在丢失数据的风险。

    当acks=1时,生产者需要等待Leader副本已成功将消息写入日志文件中。优点:降低数据丢失的可能性。

    当acks=-1时,Leader 副本和所有 ISR 列表中的副本都完成数据存储时才会向生产者发送确认信息,这种策略保证只要 Leader 副本和 Follower 副本中至少有一个节点存活,数据就不会丢失。优点:保证了数据不丢失;缺点:影响了生产者发送消息的速度以及吞吐量。

  • KafkaProducer实现原理

    KafkaProducer在实例化时首先会加载和解析生产者相关的配置信息并封装成 ProducerConfig对象,然后根据配置项主要完成以下对象或数据结构的实例。

    • 从配置项中解析出clientId,客户端指定该配置项的值以便追踪程序运行情况,在同一个进程内,当有多个KafkaProducer时,若没有配置cient.id,则clientId以前缀“producer-”后加一个从1递增的整数。
    • 根据配置项创建和注册用于 Kafka metrics 指标收集的相关对象,用于对 Kafka 集群相关指标的追踪。
    • 实例化分区器。分区器用于为消息指定分区,客户端可以通过实现 Partitioner 接口自定义消息分配分区的规则。
    • 实例化消息Key和Value进行序列化操作的Serializer。
    • 根据配置实例化一组拦截器(ProducerInterceptor),用户可以指定多个拦截器。多个拦截器会被顺序调用执行。
    • 实例化用于消息发送相关元数据信息的 MetaData 对象。MetaData是被客户线程共享的,因此它必须线程安全的。
    • 实例化用于存储消息的 RecordAccumulator RecordAccumulator 的作用类似一个队列,这里称为消息累加器。
    • 根据指定的安全协议${security.protocol}创建一个 Channe!Builder, Kafka 目前支持PLAINTEXT,SSL, SASL_PLAINTEXT,SASL_SSL和TRACE5种协议。然后创建NetworkClient实例,这个对象的底层是通过维持一个Socket连接来进行TCP通信的,用于生产者与各个代理进行Socket通信。由 NetworkClient对象构造一个用于数据发迭的 Sender 实例sender 线程,最后通过 sender 创建一个 KafkaThread 线程,启动该线程,该线程是一个守护线程,在后台不断轮询,将消息发送给代理。
  • send过程分析

    在KafkaProducer 实例化后,调用 KafkaProducer.send() 方法进行消息发送。send操作没有发起网络请求,只是将消息发送到消息缓冲区,而网络请求是由KafkaProducer实例化时创建的Sender线程来完成的。后台线程Sender不断循环,把消息发送给Kafka集群。一个完整的KafkaProducer发送消息过程:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2SzW4cQR-1585537039408)(.\png\KafkaProducer生产消息主体流程.png)]

(4) 消费者

SimpleConsumer和ZooKeeperConsumerConnector ,我们习惯称 SimpleConsumer 为低级消费者,称 ZooKeeperConsumerConnector 高级消费者。通过Java 语言重新实现的消费者 KafkaConsumer 我们称为新版消费者。

  • KafkaConsumer初始化

    KafkaProducer 是线程安全的,然而 KafkaConsumer 是非线程安全的。KafkaConsumer定义了一个acquier()方法用来检测每个方法的调用是否只有一个线程在操作,在KafkaConsumer底层实现时我们可以看到每个方法的第一步就是检测当前方法是否有其他线程正在执行,若有其他线程正在操作即发生并发操作,则抛出ConcurrentModificationException异常。acqurie()方法和release()成对出现与锁的 lock和unlock 用法类似。

    KafkaConsumer 实现了Consumer接口,Consumer定义了对外提供的API,主要包括订阅消息的subscribe()方法和assign()方法,分别用来指定订阅主题和订阅主题的某些分区;poll()方法,用于拉取消息;seek()方法、seekToBeginning()方法和seekToEnd()方法,用来指定消费起始位置;commitSync()方法和 commitAsync()方法,分别用来以同步和异步方式提交消费偏移量;

    KafkaConsumer初始化与KafkaProducer实例化相类似,只不过实例化的组件不同。KafkaConsumer实例化就是从ConsumerConfig中提取相应的消费者级别的配置实例化相应的组件。

    属性名 默认值 描述
    group.id / 消费组 id ,新版本消费者必须由客户端指定
    client.id / KafkaConsumer 对应的客户端 id ,客户端可以不指定, Kafka 会自动生成一个 clientld 字符串
    key.deserializer / 消息的 Key 反序列化类,需要实现 org.apache.ka fka.common.serialization.Deserializer 接口
    value.deserializer / 消息的 Value 反序列化类,需要实现 org.apache.ka f ka.common.serialization.Deserializer 接口
    enable.auto.commit true 是否开启自动提交消费偏移量
    max.poll.records 500 一次拉取消息的最大数量
    max.poll.interval.ms 300000 配置指定拉取消息线程最长空闲时间
    send.buffer.bytes 128KB
  • 消费订阅

    KafkaConsumer 提供了两种订阅消息 方法,一种是通过 KafkaConsumer.subscribe()方法指定消息对应的主题,支持以正则表达式方式指定主题,另一种是通过 KafkaConsumer.assign ()方法指定需要消费的分区。第一种订阅方式由同一个消费组的Leader 消费者根据各消费者都支持的分区分配策略为消费者分配分区。同时在订阅主题时可以指定一个ConsumerRebalanceListener,在消费者发生平衡操作时回调处理。第二种订阅方式客户端直接指定了消费者与分区的对应关系。

  • 消费消息

    KafkaConsumer 提供了一个poll()方法用于从服务端拉取消息,该方法通过Fetcher类来完成消息的拉取及更新消费偏移量,因此对KafkaConsumer消费消息的讲解,首先必须讲解Fetecher拉取消息的过程。

    Fetcher主要功能是负责构造拉取消息的 FetchRequest 请求,然后通过 ConsumerNetworkClient 发送 FetchRequest 请求,最后对返回的结果进行处理并更新缓存中记录的消费位置。

    在查找到所有"可拉取消息"的分区集合之后,迭代集合中的每个分区,查找该分区的Leader副本所在的节。如果Leader节点不存在,则设置metadata更新标识为true,触发Kafka元数据信息的更新操作,由于分区Leader副本对应的节点不存在,因此本次拉取消息将忽略该分区。若Leader副本对应的节点存在,同时unsent队列中不包括将要发往该Leader节点的请求,并且inFlightRequests 也不包括发往该节点的请求,则构造与该分区对应的 FetchRequest.PartitionData 对象,并将该对象保存到 **fetchable 集合中, fetchable是一个 Map<Node,LikedHashMap<TopicPartition,FetchRequest.PartitionData>>**类型的集合,这样就按分区 Leader 节点进行了分组,最后再遍历 fetchable 中的每个元素,根据每个元素的值构造 FetchRequest 最终将 fetchable转换为Map<Node,FetchRequest>类型的requests集合。

  • 消费偏移量提交

    新版消费者将消费偏移量保存到Kafka一个内部主题"_consumer_offsets"中,消费偏移量如同普通消息一样追加到该主题相应的分区当中。Kafka内部主题配置了"compact"策略,这样不仅保证了该主题总保留个分区将消费的最新偏移量,而且控制了该主题的日志容量。通过该消费者对应的消费者(${group.id})与该主题分区总数取模的方式来确定消费偏移量提交的分区。

    Kafka 提供了两种提交消费偏移量的方式:KafkaConsumer 自动提交和客户端调用KafkaConsumer 相应 API 提交,后者提交偏移量的方式通常也称为手动提交。

    手动提交

    由客户端调用 API 提交消费偏移量需要在实例化 KafkaConsumer 时设置enable auto.commit配置项为也false。Kafka 提供了同步提交 commitSync()方法和异步提交 commitAsync()方法供客户端提交消费偏移量,这两种方法分别调用的是 ConsumerCoordinator的commitOffsetsSync()方法和commitOffsetsAsync()方法。两种提交消费偏移量方法的区别在于:

    使用同步提交时,KafkaConsumer在提交请求响应结果返回前会一直被阻塞,在成功提交后才会进行下 次拉取消息操作;

    使用异步提交时,KafkaConsumer不会阻塞,这样当提交发生异常时就有可能发生重复消费的问题,但异步方式会提高消费吞吐量。

    自动提交

    KafkaConsumer 自动提交消费偏移量,在 KafkaConsumer 实例化时需设置 enable.auto.commit为
    true,同时可以通过配置项 auto.commit.interval ms 来设置提交操作的时间间隔。

7. Kafka API编程实战

Kafka提供4大核心API:Producer API,Consumer API,Streams API,Connect API。

Maven管理工程,pom.xml添加依赖库

         <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>1.0.0</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.0</version></dependency>

(1) 主题管理

由于主题的元数据信息是注册在 ZooKeeper 相应节点之中,所以对主题的操作实质是对
ZooKeeper 中记录主题元数据信息相关路径的操作。Kafka 将对 ZooKeeper 的相关操作封装成一个ZkUtils,并封装了一个 AdminUtils 类调用 ZkClient 类的相关方法以实现对 Kafka 元数据的操作,包括对主题、代 、消费者等相关元数据的操作 。

a. 创建主题

首先实例化ZkUtils对象,然后调用AdminUtils.CreateTopic()方法创建主题,API创建主题和命令行创建主题基本相似,需要指定主题的分区数以及副本数,同时可以设置主题
级别的配置。由于采用Kafka 自动副本分配策略时支持指定代理机架信息,因此通过API
创建主题时,可以指定机架感知类型,这里采用默认类型。

    //连接zkprivate static final String ZK_CONNECT="server-1:2181,server-2:2181,server-3:2181";//session 过期时间private static final int SESSION_TIMEOUT=30000;//连接超时时间private static final int CONNECT_TIMEOUT=30000;
/*** * @param topic 主题名称* @param partition 主题的分区数* @param repilca 主题的副本数* @param properties 配置信息*/
public static void createTopic(String topic, int partition, int repilca, Properties properties) {ZkUtils zkUtils = null;try {//实例化ZkUtilszkUtils = ZkUtils.apply(ZK_CONNECT, SESSION_TIMEOUT, CONNECT_TIMEOUT,JaasUtils.isZkSecurityEnabled());if (!AdminUtils.topicExists(zkUtils, topic)) {//主题不存在则创建AdminUtils.createTopic(zkUtils, topic, partition, repilca, properties,AdminUtils.createTopic$default$6());} else {//TODO 进行相应处理,如打印日志等}} catch (Exception e) {e.printStackTrace();} finally {zkUtils.close();}}

其中ZK_CONNECT、SESSION_TIMEOUT 和CONNECT_TIMEOUT字段是自定义的常量,
分别指连接ZooKeeper 集群的地址、与ZooKeeper 连接Session 过期时间以及连接ZooKeeper的超时时间。

创建主题的方法返回类型是void ,在客户端创建主题时井不能真正保证创建主题成功,客户端创建主题仅是在Zoo Keeper 相应路径创建节点井写入主题元数据信息,客户端创建主题若没发生异常则表示在ZooKeeper 写入主题元数据信息成功。

b. 修改主题级别配置

配置修改是覆盖操作,但是每次修改时如果没有包括前一次相应的配置,在本次修改之后,不包括在本次修改的配置将恢复到默认值。

建议在每次修改前,先查询主题当前的配置,然后在此基础上进行修改。

 public static void rnodifyTopicConfig(String topic, Properties properties) {ZkUtils zkUtils = null;try {zkUtils = ZkUtils.apply(ZK_ CONNECT, SESSION_TIMEOUT, CONNECT_TIMEOUT,JaasUtils.isZkSecurityEnabled());//添加新修改的配置Properties curProp = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic);curProp.putAll(properties);AdminUtils.changeTopicConfig(zkUtils, topic, curProp);} catch (Exception e) {e.printStackTrace();} finally {zkUtils.close();}}

其中待修改的配置项以键值对形式保存在Java.util.Properties 对象中。删除对配置项所进行的修改,只需查询出当前配置项,然后从Proper世es 中移出相应配置井调用AdminUtiJs.changeTopicConfig()方法修改配置进行覆盖操作。

c. 增加分区

Kafka 提供了一个AdminUtils.addPartitions()方法为一个主题增加分区,在增加分区时可以指定分区副本分配方案,也可以不指定。若不指定分配方案,则Kafka 采用默认分区副本分配策略自动分配。

假设一个主题名为“partition-api-foo”的主题,该主题有一个分区、两个副本,当前分配方案为{”version”: 1 ,”partitions ”:{”0”:[3,1 ]}}。现在增加一个分区,指定新增加的分区两个副本分配在brokerld 为2 和3 的两个节点上,由于当前己有一个分区,新增加的分区编号自然为l 。正确的分配方案格式为“3:1 2:3 ”,其中“3:1 ”表示分区0 的两个副本对应的brokerld ,根据副本与分区对应规则,“ 2 :3 ”表示分区1 的两个副本对应的brokderld 。在分配时首先根据当前己有的分区数n 从分配方案中剔除前n 组副本分配信息,从第n+l 组开始依次为新增分区的副本分配方案。

AdminUtils.addPrtitions(zkUtils,"partiton-api-foo",2,"3:1,2:1",true,Admin.addPrtitions$default$6())

参数配置:

(1) 第3 个参数是指定分区总数。例如,某个主题当前己有一个分区,若希望再为该主题增加两个分区,此时该参数应传3 而不是2 。

(2) 第4 个参数是指定副本分配方案。与命令行数据格式不同,不同分区的副本用逗号分
隔,同一个分区的多个副本之间以冒号分隔。同时需要注意的是,副本分配方案要包括己有分区的副本分配信息,根据分配顺序从左到右依次与分区对应,分区编号递增

d. 分区副本重分配

Kafka 并没有提供直接增加副本的API ,但提供了修改分区副本分配方案的方法,AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK()。通过该方法可以实现分区副本重分配,同时也可以通过该接口为某个主题增加分区和副本。

ZkUtils zkUtils=null;
try{//1.实例化ZkUtilszkUtils = ZkUtils.apply(ZK_ CONNECT, SESSION_TIMEOUT, CONNECT_TIMEOUT,JaasUtils.isZkSecurityEnabled());//2.获取代理元数据信息BrokerMetadata信息Seq<BrokerMetadata> brokerMeta=AdminUtils.getBrokerMetadatas(zkUtils,AdminUtils.getBrokerMetadatas$default$2(),AdminUtils.getBrokerMetadatas$default$3());//3.生成分区副本分配方案:2个分区,3个副本Map<Object,Seq<Object>> replicaAssign = AdrninUtils.assignReplicasToBrokers
(brokerMeta,2,3,AdrninUtils.assignReplicasToBrokers$default$4(),
AdrninUtils.assignReplicasToBrokers$default$5());//4.修改分区副本分配方案,主题名是"partition-api-foo"AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils,"partition-api-foo",replicaAssign,null,true)
}catch(Exception e){e.printStackTrace();
}finally{//5.释放与ZooKeeper的连接zkUtils.close();
}

e. 删除主题

删除指定的主题

AdminUtils.deleteTopic(zkUtils,topic)

(2) 生产者API应用

a. 单线程生产者

实现一个简单的Kafka 生产者一般步骤如下:

  • 创建Properties 对象,设置生产者级别配置。以下3 个配置是必须指定的:

    • bootstrap.servers:配置连接Kafka代理列表,不必包含Kafka集群所有的代理地址,当连接上一个代理后,会从集群元数据信息中获取其他存活的代理信息。但为了保证能够成功连接上Kafka集群,在多代理集群的情况下建议至少配置两个代理。
    • key.serializer:配置用于序列化消息Key的类。
    • value.serializer:配置用于序列化消息实际数据的类。
  • 根据Properties对象实例化一个KafkaProducer对象。

  • 实例化ProducerRecord对象,每条消息对应一个ProducerRecord对象。

  • 调用KafkaProducer发送消息的方法将ProducerRecord发送到Kafka相应节点。

    Kafka提出了两个发送消息的方法,即send(ProducerRecord<String,String>record())方法和send(ProducerRecord<String,String>record(),Callback callback)方法,带有会掉函数的send()方法实现org.apache kafka.clients.producer. Callback 接口。如果消息发送发生异常,Callback接口的onCompletion会捕获到相应异常。

    KafkaProducer默认是异步发送消息,会将消息缓存到消息缓冲区中,当消息在消息缓冲区中累计到一定数量后作为一个RecordBatch在发送。

    ProducerRecord 含义:发送给Kafka Broker的key/value值对,默认的内部数据结构

    – Topic(主题)

    –PartitionID(可选),不选可为NULL

    –key

    –value

  • 关闭KafkaProducer,释放连接的资源。

以下是生产者将模拟股票行情信息发送到Kafka集群中。为了简化程序,通过一组随机数模拟股票行情信息,将股票信息封装为一个JavaBean,该JavaBean对象类名为StockQuotationInfo.java,同时覆盖其toString() 方法,这样便于调用该对象的toString方法得到的字符串作为消息内容分,除此之外,也省略了相应字段的get和set方法。

 public class StockQuotationInfo implements Serializable{private static final long serivalVersionUID=1L;//股票代码private String stockCode;//股票名称private String stockName;//交易时间private long tradeTime;//昨日收盘价private float preClosePrice;//开盘价private float openPrice;//当前价,收盘时即为当日收盘价private float currentPrice;//今日最高价private float highPrice;//今日最低价private float lowPrice;//...省略各属性的get和set方法@Overridepublic String toString(){return this.stockCode+"|"+stockName+"|"+tradeTime+"|"+preClosePrice+"|"+openPrice+"|"+currentPrice+"|"+highPrice+"|"+lowPrice;}}

下面是行情推送的生产者类QuotationProducer.java,在一个静态代码块中创建一个KafkaProducer,同时定义一个构造Properties对象的initConfig()方法和一个产生股票行情信息的createQuotationInfo()方法。

 public class QuotationProducer {private static final Logger logger = LoggerFactory.getLogger(QuotationProducer.class);//设置实例生产消息的总数private static final int MSG_SIZE = 100;//主题名称private static final String TOPIC = "stock_quotation";//Kafka集群private static final String BROKER_LIST = "server-1:9092,server-2:9092,server-3:9092";private static KafkaProducer<String, String> producer = null;static {//1.构造用于实例化KafkaProducer的Properties信息Properties configs = initConfig();//2.初始化一个KafkaProducerproducer = new KafkaProducer<String, String>(configs);}/*** 初始化Kafka配置** @return*/private static Properties initConfig() {Properties properties = new Properties();//Kafka broker列表properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);//设置序列化类properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return properties;}private static StockQuotationInfo createQuotationInfo() {StockQuotationInfo quotationInfo = new StockQuotationInfo();//随机产生1到10之间的整数,然后与600100相加组成股票代码Random r = new Random();Integer stockCode = 600100 + r.nextInt(10);//随机产生一个0到1之间的浮点数float random = (float) Math.random();//设置涨跌规则if (random / 2 < 0.5) {random = -random;}//设置保存两位有效数字DecimalFormat decimalFormat = new DecimalFormat(".00");//设置最新价在11元浮动quotationInfo.setCurrentPrice(Float.valueOf(decimalFormat.format(11 + random)));//设置昨日收盘价为固定值quotationInfo.setPreClosePrice(11.80f);//设置开盘价quotationInfo.setOpenPrice(11.50f);//设置最低价,并不考虑10%限制,以及当前价是否已是最低价quotationInfo.setLowPrice(10.50f);//设置最高价,并不考虑10%限制,以及当前价是否已是最高价quotationInfo.setHighPrice(12.50f);quotationInfo.setStockCode(stockCode.toString());quotationInfo.setTradeTime(System.currentTimeMillis());quotationInfo.setStockName("股票-"+stockCode);return quotationInfo;}}

然后在main()方法中调用KafkaProducer对象发送消息,每推送10条股票行情信息让线程休眠2s。

 public static void main(String[] args){ProducerRecord<String,String> record=null;StockQuotationInfo quotationInfo=null;try {int num=0;for(int i=0;i<MSG_SIZE;++i){quotationInfo=createQuotationInfo();record=new ProducerRecord<>(TOPIC,null,quotationInfo.getTradeTime(),quotationInfo.getStockCode(),quotationInfo.toString());//异步发送消息producer.send(record);if(num++ % 10 ==0){Thread.sleep(2000L);//休眠2s}}}catch (InterruptedException e){e.printStackTrace();}finally {producer.close();}}

b. 多线程生产者

为了提升Kafka 发送消息的吞吐量,在数据量比较大同时对消息顺序也没有严格要求的情
况下,可以来用多线程的方式。实现多线程生产者一般有两种方式:

  • 只实例化一个KafkaProducer对象运行多个线程共享生产者发送消息;
  • 实例化多个KafkaProducer 对象;

由于KafkaProducer是线程安全, 经验证多个线程共享一个实例比每个线程各自实例化一个KafkaProducer 对象在性能上要好很多。

以下采用实例化一个KafkaProducer 对象,然后启动多个线程共享该KafkaProducer 实例的方式来介绍Kafka 生产者多线程的实现方式。

以下是在单线程生产者实现方式上进行修改。首先定义一个线程类KafkaProducerThread,该线程类持有一个KafkaProducer的引用,在线程内部调用外部传入的KafkaProducer对象发送消息。除此之外,如果我们希望在消息发送完成后获取消息的一些信息,例如获取消息偏移量及消息被发送到哪个分区,那么我们可以在发送消息时,指定回调C allBack , 只需对QuotationProducer. Java类中发送消息这块代码稍微进行修改。

public class KafkaProducerThread implements Runnable {private static final Logger logger= LoggerFactory.getLogger(KafkaProducerThread.class);private static KafkaProducer<String,String> producer=null;private ProducerRecord<String,String> record=null;public KafkaProducerThread(KafkaProducer<String,String> producer,ProducerRecord<String,String> record){this.producer=producer;this.record=record;}@Overridepublic void run(){producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(null!=e){logger.error("Send message occurs exception.",e);}if(null!=recordMetadata){logger.info(String.format("offset:%s,partition:%s",recordMetadata.offset(),recordMetadata.partition()));}}});}
}

然后创建一个固定线程数量的线程池,这些线程共享同一个KafkaProducer实例:

//创建一个固定线程数量的线程池
ExecutorService executorService= Executors.newFixedThreadPool(THREADS_NUMS);
//线程共享同一个KafkaProducer
executorService.submit(new KafkaProducerThread(producer,record));

所以main函数可以如下写:

 public static void main(String[] args){ProducerRecord<String,String> record=null;StockQuotationInfo quotationInfo=null;ExecutorService executorService= Executors.newFixedThreadPool(THREADS_NUMS);long current=System.currentTimeMillis();try {for(int i=0;i<MSG_SIZE;++i){quotationInfo=QuotationProducer.createQuotationInfo();record=new ProducerRecord<String,String>(TOPIC,null,quotationInfo.getTradeTime(),quotationInfo.getStockCode(),quotationInfo.toString());executorService.submit(new KafkaProducerThread(producer,record));}}catch (Exception e){logger.error("Send message occurs exception ",e);}finally {producer.close();executorService.shutdown();}}

(3) 消费者API应用

旧版消费者:SimpleConsumer和ZookeeperConsumerConnector。其中低级消费API(Low-Lever)SimpleConsumer和高级(High-Lever)API–ZookeeperConsumerConnector。

比较:

低级API 提供对消息更灵活的控制处理,但实现起来也更为复杂,调用者需要自己管理己消费的偏移量以及消费者平衡等。

高级API 提供了一种简单、方便的对外接口,屏蔽了底层实现细节,消费者无需管理己消费
的偏移量, Kafka 会将每个分区己消费的最后偏移量保存在ZooKeeper 的/consumers/$ {group.id} /offsets/$ { topicName} /$ {partitionld}节点中。

新版消费者:通过java语言对消费者进行重新实现,即KafkaConsumer。与旧版消费者的最大区别是不再强依赖于ZooKeeper。消费者提交的消费偏移量也不再保存到ZooKeeper当中,而是保存在Kafka内部主题"_consumer_offsets"之中,该主题默认有50个分区,每个分区有3个副本,分区数有配置项offsets.topic.num.partition 设置,通过{group.id}的hashcode值与{group.i d }的hashcode 值与{group.id}的hashcode值与 {offsets. topic.num.partition}取模的方式来确定某个消费组己消费的偏移量保存到该主题的哪个分区中。

由于工程使用1.0.0版本的Kafka,所以这里使用新版消费者kafkaConsumer,就不再阐述旧版消费者的高级API。

a. 旧版消费者低级API应用

虽然低级消费者API应用起来较为复杂,但是允许客户端对消息进行灵活的控制。以下几种常见应用场景通过低级API来实现规则更为方便。

  • 支持消息重复消费。
  • 添加事务管理机制,保证消息被处理且被处理一次。
  • 只消费指定分区或者指定分区的某些片段。

应用低级API编程实现一般步骤包括以下几步:

  • 获取指定主题相应分区对应的元数据信息。

  • 由于副本机制的引入, Leader 代理节点负责读写操作,因此需要找出指定分区的Leader副本节点,创建一个SimpleConsumer ,建立与Leader 副本的连接。

  • 构造消费者请求。

  • 获取数据并处理。

  • 对偏移量进行处理。

  • 当代理发送变化时进行相应处理,保证消息被正常消费。

    下面的例子是通过Java语言编写,只是简单的实现拉取到的消息打印出来。首先创建一个KafkaSimpleConsumer类。

public class KafkaSimpleConsumer {private static final Logger logger = LoggerFactory.getLogger(KafkaSimpleConsumer.class);/*** 指定Kafka 集群代理列表, 列表无需指定所有的代理地址,* 只要保证能连上Kafka 集群即可, 一般建议多个节点时至少写两个节点的地址*/private static final String BROKER_LIST = "172.117.12.61,172.117.12.62";//连接超时时间设置为1分钟private static final int TIME_OUT = 60 * 1000;//设置读取消息缓冲区大小private static final int BUFFER_SIZE = 1024 * 1024;//设置每次获取消息的条数private static final int FETCH_SIZE=100000;//设置broker的端口private static final int  PORT=9092;//设置容忍发生错误时重试的最大次数private static final int MAX_ERROR_NUM=3;}
下面是定义一个获取指定主题相应分区元数据信息的方法PartitionMetadata fetchPartitionMetadata(List< String> brokerList, int port, String topic, int partitionId), 该方法返回分区元数据信息PartitionMetadata 对象。获取分区元数据信息逻辑如下:
  • 实例化一个SimpleConsumer,该消费者作为获取数据信息的执行者。
  • 构造获取主题元数据信息的请求TopicMetadataRequest。
  • 通过Consumer.send()正式与代理建立连接,连接上代理后发送TopicMetadataRequest请求。
  • 从步骤3返回响应结果中获取主题元数据信息TopicMetadata 列表, 每个主题的每
    个分区的元数据信息对应一个TopicMetadata 对象,遍历主题元数据信息列表,获取当前分区对应的元数据信息PartitionMeatadata 。
private PartitionMetadata fetchPartitionMetadata(List<String> brokerList,int port, String topic,int partitionId) {SimpleConsumer consumer = null;TopicMetadataRequest metadataRequest = null;TopicMetadataResponse metadataResponse = null;List<TopicMetadata> topicMetadata = null;try {for (String host : brokerList) {//1.构造一个消费者用于获取元数据信息的执行者consumer=new SimpleConsumer(host,port,TIME_OUT,BUFFER_SIZE,"fetch-etadata");//2.改造请求主题的元数据的requestmetadataRequest=new TopicMetadataRequest(Arrays.asList(topic));//3.发送获取主题元数据的请求try {metadataResponse=consumer.send(metadataRequest);}catch (Exception e){//有可能与代理连接失败logger.error("Send topicMetadataRequest occurs exception.",e);continue;}//4.获取主题元数据列表topicMetadata=metadataResponse.topicsMetadata();//5.主题元数据列表中提取指定分区的元数据信息for(TopicMetadata metadata:topicMetadata){for(PartitionMetadata item:metadata.partitionsMetadata()){if(item.partitionId()!=partitionId){continue;}else {return item;}}}}} catch (Exception e) {logger.error("Fetch PartitionMetadata occurs exception", e);} finally {if (null != consumer) {consumer.close();}}return null;}

以上程序为了防止只进行一次连接请求而得不到元数据信息,所以程序中在实现时通过轮询多个代理节点,若与某个节点创阿金连接时发生异常,则继续尝试与下一个代理节点创建连接,直到请求成功或者轮询完成所配置的代理节点。

除此之外,还需要实现对消费偏移量的管理,在每次拉取消息时需要指定起始偏移量。更多时候我们可能关注消息的起始偏移量或者消息的最大偏移量。为此定义一个获取消息偏移量的方法long getLastOffset(SimpleConsumer consume巳String topic, int partition, Jong beginTime, String clientName),该方法逻辑较简单,只需要构造一个OffsetRequest 请求,在构造OffsetRequest 请求参数PartitionOffsetRequestlnfo 对象时,通过将时间设置为kafka.api.OffsetRequest.EarliestTime(),则表示获取消息的起始偏移量; 若时间设置为kafka.api. OffsetRequest.LatestTime(),则表示获取消息最大偏移量。然后通过当前的消费者发送获取偏移量的请求,从响应中得到相应的偏移量。
 /**** @param consumer 消费者对象* @param topic 主题名字* @param partition 分区个数* @param beginTime 起始时间* @param clientName* @return*/private long getLastOffset(SimpleConsumer consumer,String topic,int partition,long beginTime,String clientName){TopicAndPartition topicAndPartition=new TopicAndPartition(topic,partition);Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap=new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();//设置获取消息起始offsetrequestInfoMap.put(topicAndPartition,new PartitionOffsetRequestInfo(beginTime,1));//构造获取offset请求OffsetRequest request=new OffsetRequest(requestInfoMap,kafka.api.OffsetRequest.CurrentVersion(),clientName);OffsetResponse response=consumer.getOffsetsBefore(request);if(response.hasError()){logger.error("Fetch last offset occurs exception:"+response.errorCode(topic,partition));return -1;}long[] offsets=response.offsets(topic,partition);if(null==offsets||offsets.length==0){logger.error("Fetch last offset occurs error,offsets is null");return -1;}return offsets[0];}
定义一个consume(List< String> brokerList, int port, String topic, int partitionld) 方法, 该方法调用fetchPartitionMetadata()方法及getLastOffset()方法实现消费者拉取消息的功能。
  /*** @param brokerList  访问代理的地址列表* @param port        访问端口* @param topic       主题名字* @param partitionId 分区编号*/public void consume(List<String> brokerList, int port, String topic, int partitionId) {SimpleConsumer consumer = null;try {//1.首先获取指定分区的元数据信息PartitionMetadata metadata = fetchPartitionMetadata(brokerList, port, topic, partitionId);if (metadata == null) {logger.error("Can't find metadata info");return;}if (metadata.leader() == null) {logger.error("Can't find the partition:" + partitionId + "'s leader");return;}String leadBroker = metadata.leader().host();String clientId = "client-" + topic + "-" + partitionId;//2.创建一个消息作为消费消息的真正执行者consumer = new SimpleConsumer(leadBroker, port, TIME_OUT, BUFFER_SIZE, clientId);//设置时间为kafka.api.OffsetRequest.EarliestTime()从最新消息起始处开始long lastOffset = getLastOffset(consumer, topic, partitionId, kafka.api.OffsetRequest.EarliestTime(), clientId);int errorNum = 0;FetchRequest fetchRequest = null;FetchResponse fetchResponse = null;while (lastOffset > -1) {//当在循环过程中出错时将起始实例化的consumner关闭并设置为nullif (consumer == null) {consumer = new SimpleConsumer(leadBroker, port, TIME_OUT, BUFFER_SIZE, clientId);}//3.构造获取消息的requestfetchRequest = new FetchRequestBuilder().clientId(clientId).addFetch(topic, partitionId, lastOffset, FETCH_SIZE).build();//4.获取相应并处理fetchResponse = consumer.fetch(fetchRequest);if (fetchResponse.hasError()) {errorNum++;if (errorNum > MAX_ERROR_NUM) {//达到发生粗偶的最大次数时退出循环break;}//获取错误码short errorCode = fetchResponse.errorCode(topic, partitionId);//offset已失效,因为在获取lastOffset时设置为从最早开始时间,若是这种错误码//我们再将时间设置为从LatestTime()开始查找if (ErrorMapping.OffsetOutOfRangeCode() == errorCode) {lastOffset = getLastOffset(consumer, topic, partitionId, kafka.api.OffsetRequest.LatestTime(), clientId);continue;} else if (ErrorMapping.OffsetsLoadInProgressCode() == errorCode) {Thread.sleep(30000);//若是这种异常则让线程休眠30scontinue;} else {//这里只是简单地关闭当前分区Leader信息实例化的Consumer//并没有对代理失效时进行相应处理consumer.close();consumer = null;continue;}} else {errorNum = 0;//错误次数清零long fetchNum = 0;for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {long  currentOffset=messageAndOffset.offset();if(currentOffset<lastOffset){logger.error("Fetch an old offset:"+currentOffset+"expect the offset is greater than"+lastOffset);continue;}lastOffset=messageAndOffset.nextOffset();ByteBuffer payload=messageAndOffset.message().payload();byte[] bytes=new byte[payload.limit()];payload.get(bytes);//简单打印出消息及消息offsetlogger.info("message:"+(new String(bytes,"UTF-8"))+",offset:"+messageAndOffset.offset());fetchNum++;}if(fetchNum==0){try {Thread.sleep(1000);}catch (Exception e){e.printStackTrace();}}}}} catch (Exception e) {logger.error("Consume message occurs exception",e);}finally {if(consumer!=null){consumer.close();}}}

最后,在main方法中调用该消费者。例如,下面指定该消费者从主题名为"stock-quotation-partition",分区编号为5的分区中拉取消息。

public static void main(String[] args){KafkaSimpleConsumer consumer=new KafkaSimpleConsumer();consumer.consume(Arrays.asList(StringUtils.split(BROKER_LIST,",")),PORT,"stock-quotation-partition",5);}
}

b. 新版消费者API应用

1. 创建消费者

实例化一个KafkaConsumer对象与实例化KafkaProducer对象的步骤相同,KafkaConsumer构造方法接受一个Java.util.Properties类型的参数,用于客户端指定消费者相关的配置属性。

通常实例化一个KafkaConsumer 对象客户端需要:

  • 指定连接Kafka 节点bootstrap.servers 属性
  • 消息Key 反序列化类的key.deserializer 属性
  • 消息Value 反序化类的value.deserializer 属性
  • 是否自动提交消费偏移量的enable. auto . commit 属性,
  • 同时由于每个消费者都属于一个特定的消费组,一般通过group.id 参数指定该消费者所属的消费组。
  • 在没有指定消费偏移量的提交方式时,默认是每隔1秒自动提交偏移量。可以通过auto.commit.interval.ms参数设置偏移量提交的时间间隔。
    val groupPrefix=properties.get("kafka.groupPrefix")val appName=properties.get("app.name")val groupId=groupPrefix+appNameval inputBrokers=properties.get("kafka.input.brokers")val props:Properties=new Properties()props.put("bootstrap.servers",inputBrokers)props.put("group.id",groupId)props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")if(inputBrokers.endsWith("9093")){props.put("security.protocol","SASL_PLAINTEXT")props.put("sasl.mechanism","PLAIN")
val username=properties.get("kafka.username")
val password=properties.get("kafka.password")
val jaasTemplate:String="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{0}\" password=\"{1}\";"     props.put("sasl.jaas.config",MessageFormat.format(jaasTemplate,username,password))}
KafkaConsumer<String,String> consumer=new KafkaConsumer<>(props)
2. 订阅主题

在实例化一个消费者之后,那么需要为该消费者订阅主题。一个消费者可以同时订阅多个
主题,通常我们可以以集合的形式指定多个主题,或者以正则表达式形式订阅特定模式的主题。

Kafka定义了3种订阅主题方法:

  • subscribe(Collection topics)方法,以集合形式指定消费者订阅的主题,通常我们用ArrayList 。
  • subscribe(Collection topics, ConsumerRebalanceListener listener)方法,订阅主题时指定一个监昕器,用于在消费者发生平衡操作时回调进行相应的业务处理。
  • subscribe(Pattem pattern, ConsumerRebalanceListener listener)方法,以正则表达式形式指定匹配特定模式的主题。
 consumer.subscribe(util.Arrays.asList("stock-quation"), new ConsumerRebalanceListener {override def onPartitionsRevoked(collection: util.Collection[TopicPartition]): Unit = {consumer.commitAsync() //提交偏移量}override def onPartitionsAssigned(collection: util.Collection[TopicPartition]): Unit = {var committedOffset: Long = -1for(topicPartition<-collection){//获取该分区已取消的偏移量committedOffset=consumer.committed(topicPartition).offset()//重置偏移量到上一次提交的偏移量下一个位置处开始消费consumer.seek(topicPartition,committedOffset+1)}}})

该监听器为C onsumerRebalanceListener 接口,当消费者发生平衡操作时,可以在该接口的相应方法中完成必要的应用程序逻辑处理,如提交消费偏移量操作。该接口定义了两个回调方法: 一个是在消费者平衡操作开始之前、消费者停止拉取消息之后被调用的onPartitionsRevoked(Collection partitions)方法,在该方法中我们可以提交偏移量操作以避免数据重复消费: 另一个是在平衡之后、消费者开始拉取消息之前被调用的onPartitionsAssigned (Collection partitions)方法, 一般我们在该方法中保证各消费者回滚到正确的偏移量,即重置各消费者消费偏移量。

在订阅主题之后,就可以通过Kafka提供的poll(long timeout)方法轮询拉取消息

3. 订阅指定分区

Kafka 消费者可以通过调用KafkaConsumer.subscribe()方法订阅主题,也可以直接订阅某些主题的特定分区。Kafka 消息者API 提供了一个assign(Collection partitions)方法用来订阅指定的分区。

consumer.assign(Arrays.asList(new TopicPartition("stock-quotation",0),new TopicPartition("stock-quotation",2)));
4. 消费偏移量管理

Kafka 消费者API 提供了两个方法用于查询消费偏移量的操作, 一个是commit时(TopicPartition partition)方法,该方法返回一个OffsetAndMetadata 对象, 通过OffsetAndMetadata 对象可以获取指定分区己提交的偏移量:另一个是返回下一次拉取位置的position(TopicPartition partition)
方法。

同时,Kafka 消费者API 还提供了重置消费偏移量的方法。seek(TopicPartition partition, long offset )方法用于将消费起始位置重置到指定的偏移量位置, 还有另外两个重置消费偏移量的方法,即seekToBeginning()方法和seekToEnd() 方法。

  • seekToBeginning()是从消息起始位置开始消费,对应偏移量重置策略auto.offset.reset=earliest。
  • seekToEnd()方法指定从最新消息对应的位置开始消费,也就是说要等待新的消息写入后才进行拉取,对应偏移量策略auto.offset.reset=latest。

Kafka 消费者消费位移确认有自动提交与手动提交两种策略。

a. 自动提交
 private void setConumserOffsetAuto(){Properties props=new Properties();String inputBrokers="99.12.86.173:9093,99.12.86.174:9093,99.12.86.175:9093,99.12.86.176:9093,99.12.86.177:9093";String groupId="GLA04_1";props.put("bootstrap.servers",inputBrokers);props.put("group.id",groupId);props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("enable.auto.commit",true);//显示设置偏移量自动提交props.put("auto.commit.interval.ms",1000);//设置偏移量提交时间间隔KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);//创建消费者consumer.subscribe(Arrays.asList("stock-quotation"));//订阅主题try {while(true){//长轮询拉取消息ConsumerRecords<String,String> records=consumer.poll(1000);for(ConsumerRecord<String,String> record:records){System.out.printf("partition=%d,offset=%d,key=%s value=%s%n",record.partition(),record.offset(),record.key(),record.value());}}}catch (Exception e){e.printStackTrace();}finally {consumer.close();}}
自动提交偏移量,客户端只关注业务处理,在程序中没有任何关于提交偏移量的操作,更不像SimpleConsumer 在每次poll 之前都需要知道拉取消息的位置。
b. 手动提交
手动提交策略提供了一种对偏移量更加灵活控制的管理方式,在有些场景我们可能对消费偏移量有更精确的管理,以保证消息不被重复消费以及消息不被丢失。假设我们对拉取到的消息需要写入数据库处理,或者其他网络访问请求,抑或更复杂的业务处理,在这种场景下我们认为所有的业务处理完成才认为消息被成功消费,显然在这种场景下我们必须手动控制偏移量的提交。
  • 异步提交(commitAsync):消费者线程不会被阻塞,可能在提交偏移量操作的结果还未返
    回时就开始进行下一次的拉取操作,在提交失败时也不会尝试提交。
  • 同步提交(commitSync):于同步模式下提交失败时一直尝试提交,直到遇到无法重试的情况下才会结束,同时同步方式下消费者线程在拉取消息时会被阻塞,直到偏移量提交操作成功或者在提交过程中发生错误。
 private void setCommunserOffsetNoAuto(){Properties props=new Properties();String inputBrokers="99.12.86.173:9093,99.12.86.174:9093,99.12.86.175:9093,99.12.86.176:9093,99.12.86.177:9093";String groupId="GLA04_1";props.put("bootstrap.servers",inputBrokers);props.put("group.id",groupId);props.put("fetch.max.bytes",1024);//设置一次fetch请求取得的数据最大值为1KB,默认是5MBprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("enable.auto.commit",false);//设置手动提交偏移量props.put("auto.commit.interval.ms",1000);//设置偏移量提交时间间隔KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);//创建消费者consumer.subscribe(Arrays.asList("stock-quotation"));//订阅主题try {int minCommintSize=10;//最少处理10条消息后才进行提交int icount=0;while(true){//等待拉取消息ConsumerRecords<String,String> records=consumer.poll(1000);for(ConsumerRecord<String,String> record:records){for(ConsumerRecord<String,String> record:records){System.out.printf("partition=%d,offset=%d,key=%s value=%s%n",record.partition(),record.offset(),record.key(),record.value());icount++;}}//在业务逻辑处理成功后提交偏移量if(icount>=minCommintSize){consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(null==e){//表示偏移量成功提交System.out.printf("Commint Success");}else {//表示提交偏移量发生了异常,根据业务进行相关处理System.out.println("发生了异常");}}});icount=0;}}}catch (Exception e){e.printStackTrace();}finally{consumer.close();}}
实现手动提交前需要在创建消费者时关闭自动提交,即设置enable.auto.commit=false 。然后在业务处理成功后调用commitAsync()或commitSync()方法手动提交偏移量。由于同步提交会阻塞线程直到提交消费偏移量执行结果返回,而异步提交并不会等消费偏移量提交成功后再继续下一次拉取消息的操作,因此异步提交还提供了一个偏移量提交回调的方法commitAsync(OffsetCommitCallback callback)。当提交偏移量完成后会回调OffsetCommitCallack 接口的onComplete()方法,这样客户端根据回调结果执行不同的逻辑处理。
c. 以时间戳查询消息
Kafka 消费者API 提供了一个offsetsFor imes(Map <TopicPartition , Long> timestampsToSearch) 方法, 该方法入参为一个Map 对象, Key 为待查询的分区, Value 为待查询的时间戳, 该方法会返回时间戳大于等于待查询时间的第一条消息对应的偏移量和时间戳。需要注意的是, 若待查询的分区不存在,则该方法会被一直阻塞。假设我们希望从某个时间段开始消费,就可以用offsetsForTimes()方法定位到离这个时间最近的第一条消息的偏移量,在查到偏移量之后调用seek(TopicPartition partition,long offset)方法将消费偏移量重置到所查询的偏移量位置,然后调用poll()方法长轮询拉取消息。
private void findTopicMessageByTime(){Properties props = new Properties();String inputBrokers = "99.12.86.173:9093,99.12.86.174:9093,99.12.86.175:9093,99.12.86.176:9093,99.12.86.177:9093";String groupId = "GLA04_1";props.put("bootstrap.servers", inputBrokers);props.put("group.id", groupId);props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("enable.auto.commit", true);//显示设置偏移量自动提交props.put("auto.commit.interval.ms", 1000);//设置偏移量提交时间间隔KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);//创建消费者consumer.assign(Arrays.asList(new TopicPartition("stock-quotation",0)));//订阅主题第0分区try {Map<TopicPartition,Long> timestampsToserch=new HashMap<TopicPartition, Long>();//构造待查询的分区TopicPartition partition=new TopicPartition("stock-quotation",0);//设置查询12小时之前消息的偏移量timestampsToserch.put(partition,(System.currentTimeMillis()-12*3600*1000));//会返回时间大于等于查找时间的第一个偏移量Map<TopicPartition,OffsetAndTimestamp> offsetMap=consumer.offsetsForTimes(timestampsToserch);OffsetAndTimestamp offsetAndTimestamp=null;//这里依然用for轮询,当然由于本例是查询的一个分区,因此也可以用if处理for(Map.Entry<TopicPartition,OffsetAndTimestamp> entry:offsetMap.entrySet()){//若查询时间大于时间戳索引文件中大于最大记录索引时间,//此时value为空,即待查询时间点之后没有新消息生成offsetAndTimestamp=entry.getValue();if(null!=offsetAndTimestamp){//重置消费起始偏移量consumer.seek(partition,entry.getValue().offset());}}while(true){//等待拉取消息ConsumerRecords<String,String> records=consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {System.out.printf("partition=%d,offset=%d,key=%s value=%s%n", record.partition(), record.offset(), record.key(), record.value());}}}catch (Exception e){e.printStackTrace();}finally {consumer.close();}}
d. 消费速度控制

Kafka 提供pause(Collection< TopicPartition> pa此itions)和resume(Collectionpartitions )方法,分别用来暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据操作。

f. 多线程实现

KafkaConsumer 为非线程安全的,多线程需要处理好线程同步。多线程的实现方式有多种,
我们在这里介绍一种常见的实现方式,即每个线程各自实例化一个KafkaConsumer 对象。,多个消费者线程消费一个主题,而
不是多个线程消费同一个分区,若多个线程消费同一个分区时需要考虑偏移量提交处理的问
题,相对而言实现较复杂,在实际应用中一般也不推荐。事实上,一般我们是将分区作为消费者线程的最小划分单位。

public class KafkaConsumerThread extends Thread {//每个线程拥有私有的KafkaConsumer实例private KafkaConsumer<String,String> consumer;public KafkaConsumerThread(Map<String,Object>consumerConfig,String topic){Properties props=new Properties();props.putAll(consumerConfig);this.consumer=new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topic));}@Overridepublic void run(){try {while(true){ConsumerRecords<String,String> records=consumer.poll(1000);for(ConsumerRecord<String,String> record:records){//简单打印出消息内容System.out.printf("threadId=%s,partition=%d,offset=%d,key=%s,value=%s\n",Thread.currentThread().getId(),record.partition(),record.offset(),record.key(),record.value());}}}catch (Exception e){e.printStackTrace();}finally {consumer.close();}}
}
可以根据自身的主题分区数来确定创建多个消费者线程,这里创建6个线程。
public class KafkaConsumerExecutor {public static void main(String[] args) {Map<String, Object> config = new HashMap<String, Object>();config.put("bootstarp.servers","localhost:9092");config.put("group.id","test");//6个线程术语同一个消费组config.put("enable.auto.commit",true);//显示设置偏移量自动提交config.put("auto.commit.interval.ms",1000);//设置偏移量提交时间间隔config.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");config.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");for(int i=0;i<6;++i){new KafkaConsumerThread(config,"stock-quotation").start();}}}
运行该main()方法,从控制台输出信息可以看到每个分区由固定的线程消费。

8. Kafka与Spark整合应用

Spark 是一个快速、通用的计算引擎,起源于美国加州大学伯克利分校RAD 实验室的一个研究项目,现在已是Apache 的一个顶级项目。Spark包括以下组件: