Kafka序列化器,分区器,拦截器,消息累加器

  • 目录
    • 概 述
  • 小结
  • 参考资料和推荐阅读

LD is tigger forever,CG are not brothers forever, throw the pot and shine forever.
Modesty is not false, solid is not naive, treacherous but not deceitful, stay with good people, and stay away from poor people.
talk is cheap, show others the code and KPI, Keep progress,make a better result.
Survive during the day and develop at night。

目录

概 述

整个Kafka生产者客户端由两条线程协调运行。这两条线程分别为主线程和sender线程(发送线程)。主线程的作用就是:由KafkaProducer创建消息,然后通过可能的拦截器,序列化器,分区器的作用之后缓存到消息累加器。send线程的作用就是:负责将消息累加器中的消息发送到kafka中。

拦截器
  拦截器是在Kafka0.10.0.0版本中就已经引入的一个功能,Kafka一共有两种拦截器。生产者拦截器和消费者拦截器。

生产者拦截器
生产者拦截器可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息,修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求。当然生产者拦截器的使用也很方便,主要是自定义实现org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口中包含3个方法:
//producer会在消息序列化器,分区器之前调用拦截器的onSend()方法来对消息进行定制化操作。
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
//producer会在消息被应答之前或者发送失败时调用生产者的 onAcknowledgement()方法
//不过该方法运行在Producerde I/O线程中,所以方法中的实现代码逻辑越简单越好,否则会影响消息的发送速度
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
//用于在关闭拦截器时执行一些资源的清理工作
public void close();

KafkaProducer 在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作。一般来说最好不要修改消息 ProducerRecord 的 topic、key 和 pritition 等信息,如果修改需要保证对其有准确判断,否则会出现与预想不一致的偏差。比如修改 key 不仅会影响分区的计算还会影响 broker 端日志压缩(Log Compaction)功能。

KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement() 方法,优先于用户设定的 Callback 之前执行。这个方法运行在 Producer的I/O线程中,所以这个方法的实现逻辑约简单越好,否则会影响消息的发送。

close() 方法主要用于在关闭拦截器时执行一些资源的清理工作。在这3个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。
Kafka中不仅可以指定一个拦截器还可以指定多个拦截器形成一个拦截器链。拦截器链会根据配置时配置的拦截器顺序来执行(配置的时候,各个拦截器之间使用逗号隔开)。如果拦截器链中的某个拦截器的执行需要依赖上一个拦截器的输出,那么就有可能产生“副作用”。如果第一个拦截器因为异常执行失败,那么第二个也就不能继续执行。在拦截链中,如果某个拦截器执行失败,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。

序列化器

生产者需要用序列化器(Serializer)将key和value序列化成字节数组才可以将消息传入Kafka。消费者需要用反序列化器(Deserializer)把从Kafka中收到的字节数组转化成相应的对象。除了字符串类型的序列化器,还有 ByteArray、ByteBuffer、Bytes、Double、Integer、Long 这几种类型,它们都实现了 org.apache.kafka.common.serialization.Serializer 接口,此接口有3个方法:

// 用来配置当前类
public void configure(Map<String, ?> configs, boolean isKey)

// 用来执行序列化操作
public byte[] serializer(String topic, T data)

//用来关闭当前的序列化器
public void close()

如果Kafka客户端提供的几种序列化器都无法满足应用需求,可以选择如 Avro、JSON等通用的序列化工具实现,或者使用自定义类型的序列化器来实现。如何使用自定义的序列化器呢?只需要将 KafkaProducer 的 value.serializer参数设置为类的权限定名字即可。

分区器
消息通过send()方法发送broker的过程中,有可能会经过拦截器,序列化器,之后,就会需要确定消息要发往的分区。如果ProducerRecord中指定了partition字段,那么就不需要分区器的作用。因为partition代表的就是索要发往的分区号。
  Kafka提供的默认分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner。它实现了org.apache.kafka.clients.producer.Partitioner接口,这个接口定义了2个方法:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

public void close();
partition() 方法用来计算分区号,返回值为int类型。方法中的参数分别表示主题、键、序列化后的键、值、序列化后的值以及集群的元数据信息,通过这些信息可以实现分区器。close()方法在关闭分区器的时候回收一些资源。Pratitioner接口还有一个父接口:org.apache.kafka.common.Configurable,该接口中只有一个方法:

Configurable 接口中的 configure() 方法主要是用来获取配置信息及初始化数据。
  在默认分区器 DefaultPartitioner 的实现中,close()是空方法,而 partition() 方法中定义了主要的分区分配逻辑。如果 key 不为 null,那么默认分区器会对 key 进行哈希(采用 MurmurHash2 算法,具备高运算性能及低碰撞率)根据最终得到的哈希值,与分区的数量取模运算得到分区编号来匹配分区,相同key得到的哈希值是一样的,所以当key一致,分区数量不变的情况下,会将消息写入同一个分区(注意:在不改变主题分区数量的情况下,key 与分区之间的映射可以保持不变。不过,一旦主题增加了分区,那么就难以保证key与分区的映射关系)。如果,key 是 null,那么消息会以轮询的方式写入分区。(注意:如果 key 不为null,那么计算得到的分区号会是所有分区中的一个。如果 key 为 null 并且有可用的分区的时候,那么计算得到的分区号仅为可用分区中的任意一个。)
  除了使用 Kafka 提供的默认分区器进行分配,还可以使用自定义的分区器,只需要和 DefaultPartitioner 一样 实现 Partitioner 接口即可。实现自定义的 DemoPartitioner 类之后,需要通过配置参数partitioner.class来显式指定这个分区器。
  RecordAccumulator也叫消息累加器,主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗来提升性能。 是在客户端开辟出的一块内存区域。
this.accumulator = new RecordAccumulator(logContext,
config.getInt(“batch.size”),
this.totalMemorySize,
this.compressionType,
config.getLong(“linger.ms”),
retryBackoffMs,
this.metrics,
this.time,
this.apiVersions,
this.transactionManager);

参数buffer.memory,默认为33554432B,及32MB. 指定RecordAccumulator缓存的大小。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足。这个时候producer的send方法要么被阻塞,要么抛出异常。这个取决于参数max.block.ms的配置,此参数默认值为60000,及60秒。
 RecordAccumulator的内部为每个分区都维护了一个双端队列。队列中的具体内容就是ProducerBatch(消息批次).即Deque< ProducerBatch >。通俗来讲,ProducerBatch为一个消息批次,可以将较小的ProducerRecord拼凑成一个较大的ProducerBatch.来减少网络请求次数提高吞吐量。当然ProducerRecord即为我们的消息。
  每次有消息要写入到累加器中的时候,会先去寻找对应的双端队列,从队列的尾部获取一个ProducerBatch, 事先会判断消息的大小,如果大小在被写入的Producerbatch批次范围内,则写入。如果没找到ProducerBatch或者消息大小大于被写入的批次范围无法写入的时候,就会新建一个ProducerBatch,根据参数batch.size来创建batch大小。如果消息大于batch.size的大小,就以评估的大小创建ProducerBatch.
  消息写入缓存时,追加到双端队列的尾部Sender线程读取消息时,从双端队列的头部读取。注意ProducerBatch中可以包含很多个ProducerRecord(消息)。
  在RecordAccumulator的内部还有一个BufferPool,主要来实现ButeBuffer的复用。不过BufferPool只针对特定大小的ByteBuffer进行管理。超过大小是不会进入BufferPool的。可以通过参数batch.size来指定。默认为16384B.
  当每条消息流入消息累加器,会先寻找每个分区中的ProducerBatch双端队列。如果找不到,会判断消息的大小,如果小于batch.size。则用batch.size的大小来创建ProducerBatch。
————————————————
版权声明:本文为CSDN博主「K. Bob」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/ThreeAspects/article/details/108130254

小结

kafka 相关序列化

参考资料和推荐阅读

1.链接: 参考资料.

Kafka序列化器,分区器,拦截器,消息累加器相关推荐

  1. kafka 自定义Interceptor(通过拦截器对消息进行定制化处理)

    文章目录 1. 说明 1.1 configure(configs) 1.2 onSend(ProducerRecord) 1.3 onAcknowledgement(RecordMetadata, E ...

  2. WebServices中使用cxf开发日志拦截器以及自定义拦截器

    首先下载一个cxf实例,里面包含cxf的jar包.我下的是apache-cxf-2.5.9 1.为什么要设置拦截器? 为了在webservice请求过程中,能动态操作请求和响应数据, CXF设计了拦截 ...

  3. 使用struts2中默认的拦截器以及自定义拦截器

    转自:http://blog.sina.com.cn/s/blog_82f01d350101echs.html 如何使用struts2拦截器,或者自定义拦截器.特别注意,在使用拦截器的时候,在Acti ...

  4. struts2 拦截器_Struts2令牌拦截器示例

    struts2 拦截器 Struts 2 token interceptor can be used to handle multiple form submission problem. While ...

  5. flume拦截器及自定义拦截器

    拦截器做什么呢? 时间拦截器 以时间拦截器为例.会在Event的header中添加一个属性进去,属性的key叫做timestamp, value是当前的毫秒值. 问题是写到header然后呢?有啥用呢 ...

  6. springboot中配置了拦截器后,拦截器无效的解决方案之一

    springboot中配置了拦截器后,拦截器无效的解决方案之一 参考文章: (1)springboot中配置了拦截器后,拦截器无效的解决方案之一 (2)https://www.cnblogs.com/ ...

  7. 从struts2拦截器到自定义拦截器

    http://www.cnblogs.com/withyou/p/3170440.html 拦截器可谓struts2的核心了,最基本的bean的注入就是通过默认的拦截器实现的,一般在struts2.x ...

  8. Struts2内置拦截器和自定义拦截器

    内置拦截器 Struts2中内置类许多的拦截器,它们提供了许多Struts2的核心功能和可选的高级特性.这些内置的拦截器在struts-default.xml中配置.只有配置了拦截器,拦截器才可以正常 ...

  9. java spring 拦截器_Spring MVC拦截器(Interceptor)的配置及使用

    在开发一个网站时可能有这样的需求:某些页面只希望几个特定的用户浏览.对于这样的访问权限控制,应该如何实现呢?拦截器就可以实现上述需求.在 Struts 2 框架中,拦截器是其重要的组成部分,Sprin ...

  10. 拦截器,利用拦截器进行登陆权限控制

    拦截器,登录权限控制demo 1. 拦截器demo 2. 登录权限控制 地址: https://github.com/sevenyoungairye/spring-mvc-interceptor 1. ...

最新文章

  1. dfasdfasdfas
  2. .net程序员的盲点(八):泛型
  3. 科大星云诗社动态20210311
  4. java 映射类_将数据库类型映射到具体的Java类
  5. 深入理解r2dbc-mysql
  6. Vue中computed分析
  7. 思岚科技受邀2018高交会 携多项“黑科技”亮相
  8. matlab 离散傅里叶变换_Matlab中的傅里叶变换
  9. 计算机软件国民经济行业代码,国民经济行业类别及代码.DOC
  10. win10 Security Center服务无法禁用,启动类型灰色不可改解决方法
  11. [论文笔记] EMNLP2019: A Lexicon-Based Graph Neural Network for Chinese NER
  12. 学校计算机硬件管理制度,学校规章制度之计算机硬件管理制度.doc
  13. 光流与Lucas-Kanade 光流法
  14. 多测师肖sir_高级金牌讲师_项目数据
  15. SQL SERVER 读取数据库中所有表名
  16. 基础为零?如何将 C++ 编译成 WebAssembly
  17. Oracle--使用同义词
  18. 重磅!GitHub突然宣布,对全球人免费开放全部核心功能
  19. 健身气功----八段锦
  20. ARM-translation table walk

热门文章

  1. 上网行为安全之终端识别和管理技术
  2. linux vi如何输入井号,Linux Vi命令用法详解
  3. python 安装Cython
  4. JavaScript数组扁平化
  5. matlab如何求有约束最优化最大值,6.4.2有约束最优化问题的求解-东北大学数学系.ppt...
  6. 前端页面如何获取高德地图
  7. mac 下安装Microsoft Remote Desktop远程桌面客户端
  8. C++ primer (5th) 随想与学习笔记 6 优先级晦涩难通 新标准更清晰
  9. 简单创意的思维导图怎么画
  10. vue3+tsx 踩坑