上一篇介绍了概述和网络层模型实现《Kafka 源码分析之网络层(一)》,本编主要介绍在Processor中使用的nio selector的又一封装,负责具体数据的接收和发送。

PS:丰富的一线技术、多元化的表现形式,尽在“HULK一线技术杂谈”,点关注哦!

对nio的封装:Selector类

  • 所在文件: clients/src/main/java/org/apache/kafka/commmon/network/Selector.java

  • 源码中的注释:

A nioSelector interface for doing non-blocking multi-connection network I/O. This class works with NetworkSend} and NetworkReceive to transmit size-delimited network requests and responses.

重要函数解析:

(1) register(String id, SocketChannel socketChannel): 注册这个socketChannel到一个nio selector, 将其读事件添加到selector的监听队列; 这个socketChannel通常是服务器接收到的客户端的连接:

  1. SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);

同时创建KafkaChannel, 负责实际的数据接收和发送:

  1. KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);

  2. key.attach(channel);

  3. this.channels.put(id, channel);

上面的id即为我们在上篇介绍的非常重要的ConnectionId。

(2) connect: 使用nio的SocketChannel连接到给定的地址,并且注册到nio selector,同时也创建了KafkaChannel,负责实际的数据接收和发送;

  1. SocketChannel socketChannel = SocketChannel.open();

  2. socketChannel.configureBlocking(false);

  3. Socket socket = socketChannel.socket();

  4. socket.setKeepAlive(true);

  5. socketChannel.connect(address);

  6. SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);

  7. KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);

  8. key.attach(channel);

  9. this.channels.put(id, channel);

(3) poll: 核心函数:

Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing disconnections, initiating new sends, or making progress on in-progress sends or receives.

处理作为客户端的主动连接事件:

  1. if (key.isConnectable()) {

  2.        channel.finishConnect();

  3.         this.connected.add(channel.id());

  4.         this.sensors.connectionCreated.record();

  5. }

处理连接建立或接收后的ssl握手或sasl签权操作:

  1. if (channel.isConnected() && !channel.ready())

  2.      channel.prepare();

处理触发的读事件:

  1. if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {

  2.          NetworkReceive networkReceive;

  3.          while ((networkReceive = channel.read()) != null)

  4.                     addToStagedReceives(channel, networkReceive);

  5. }

使用一个while循环力求每次读事件触发时都读尽可能多的数据; channel.read()里会作拆包处理(后面会讲到),返回非null表示当前返回的NetworkReceive里包含了完整的应用层协议数据;

处理触发的写事件:

  1. if (channel.ready() && key.isWritable()) {

  2.         Send send = channel.write();

  3.          if (send != null) {

  4.                 this.completedSends.add(send);

  5.                  this.sensors.recordBytesSent(channel.id(), send.size());

  6.          }

  7. }

需要发送数据通过调用Selector::send方法,设置封装了写数据的NetworkSend,再将这个NetworkSend通过KafkaChannel::setSend接口设置到KafkaChannel,同时将写事件添加到selector的监听队列中,等待写事件被触发后,通过KafkaChannel::write将数据发送出去;

addToCompletedReceives()

将当前接收到的完整的的request到添加到completedReceives中,上一篇中介绍的SocketServer会作completedReceives中取出这些request作处理;

封装对单个连接的读写操作:KafkaChannel类

  • 所在文件: clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java

  • 包括transportLayer和authenticator, 完成ssh握手,sasl签权,数据的接收和发送;

传输层:TransportLayer类

  • 所在文件 clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java

  • 两个子类: PlanintextTransportLayer和SslTransportLayer

  • PlanintextTransportLayer的实现主要是通过NetworkReceive和NetworkSend;

  • SslTransportLayer的实现主要是通过SocketChannel,ByteBuffers和SSLEngine实际了加密数据的接收和发送(看到ssl就头大啊,这部分先忽略~~~);

Kafka协议的包结构:

  • 前4个字节固定, 值是后面的实际数据的长度;

  • NetworkReceive: 接收时先接收4个字节, 获取到长度,然后再接收实际的数据;

  • NetworkSend: 发送时实际数据前先加上4个字节的数据长度再发送;

上图:

希望这两篇网络层相关的源码分析文章能够帮助大家更好的理解和使用kafka,后续我们会带来更多的源码分析文章,敬请期待~,大家有想了解的技术,也可以留言给我们哦。

扫描下方二维码了解更多内容

Kafka 源码分析之网络层(二)相关推荐

  1. Kafka 源码分析之网络层(一)

    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据.小编会给大家带来几期 Kafka 相关的源码分析文章.这一系列文章是基于kafka 0.9.1版本,今天 ...

  2. idea 线程内存_Java线程池系列之-Java线程池底层源码分析系列(二)

    课程简介: 课程目标:通过本课程学习,深入理解Java线程池,提升自身技术能力与价值. 适用人群:具有Java多线程基础的人群,希望深入理解线程池底层原理的人群. 课程概述:多线程的异步执行方式,虽然 ...

  3. Kafka源码分析10:副本状态机ReplicaStateMachine详解 (图解+秒懂+史上最全)

    文章很长,建议收藏起来,慢慢读! Java 高并发 发烧友社群:疯狂创客圈 奉上以下珍贵的学习资源: 免费赠送 经典图书:<Java高并发核心编程(卷1)> 面试必备 + 大厂必备 +涨薪 ...

  4. kafka源码分析-consumer的分区策略

    kafka源码分析-consumer的分区策略 1.AbstractPartitionAssignor 2.RangeAssignor 3.RoundRobinAssignor 4.StickyAss ...

  5. kafka源码分析之一server启动分析

    0. 关键概念 关键概念 Concepts Function Topic 用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Partition 是Kafka中横向扩展和一 ...

  6. apache kafka源码分析-Producer分析---转载

    原文地址:http://www.aboutyun.com/thread-9938-1-1.html 问题导读 1.Kafka提供了Producer类作为java producer的api,此类有几种发 ...

  7. PDF阅读器系列之--MuPDF源码分析过程(二)

    博客找回来了,在那更新 http://blog.csdn.net/sky_pjf 前 时间好快,又一周过了,发现自己太忙了,博客都没去管-- 序 *MuPDF开源框架现在一直都在维护,我一般都会隔一周 ...

  8. 【转】ABP源码分析四十二:ZERO的身份认证

    ABP Zero模块通过自定义实现Asp.Net Identity完成身份认证功能, 对Asp.Net Identity做了较大幅度的扩展.同时重写了ABP核心模块中的permission功能,以实现 ...

  9. 【转】ABP源码分析三十二:ABP.SignalR

    Realtime Realtime是ABP底层模块提供的功能,用于管理在线用户.它是使用SignalR实现给在线用户发送通知的功能的前提 IOnlineClient/OnlineClient: 封装在 ...

最新文章

  1. Eclipse的编码
  2. SAP S/4HANA Central Procurement – 采购的未来
  3. Linux系统端口聚合技术bonding
  4. 腾讯Android自动化测试实战3.3.2 ListView列表遍历
  5. 操作系统,,,也考完了【流坑】
  6. 读书:《带人的技术》
  7. 社保必须交满15年才能享受吗?
  8. Python异常处理 -跳过异常继续执行
  9. 强化学习组队学习task04—— DQN 算法及 Actor-Critic 算法
  10. active mq topic消费后删除_天天在用消息队列,却不知道为啥要用 MQ ,这就尴尬了...
  11. php 工资 2018,2018年PHP程序员的进阶之路
  12. 精彩PuTTY 中文教程(解决乱码、X窗口、自动登陆等问题)
  13. 小鬼授权系统源码全解密源码 附授权代码
  14. 终于有人把流量运营讲明白了
  15. HDU5391米勒拉宾
  16. Eureka自我保护机制、健康检查的作用
  17. Android登录界面用SharedPreferences实现记住密码功能
  18. A Benchmark and Simulator for UAV Tracking(论文翻译)
  19. 象棋联机java代码_java实现简单网络象棋游戏
  20. 【FPGA】多功能ALU

热门文章

  1. Sqoop2开启Kerberos安全模式
  2. vue-cli4.3 npm run dev启动报错
  3. Springboot环境下mybatis配置多数据源配置
  4. ios 扫码枪外设 键盘模式_多平台连接:雷柏XK100蓝牙键盘评测
  5. python求列表的平均值的用法_python如何求列表平均值?_后端开发
  6. matplotlib绘图_手把手教你使用Matplotlib绘图实战
  7. C语言sizeof和strlen的含义,用法和区别
  8. android 自定义 打包文件类型,Android Studio配置打包生成自定义文件名
  9. const定义常量_go语言基本语法——常量constant
  10. 养老不用愁,这种机器人可以让老年人自主地进行日常生活