凌云时刻 · 技术

导读:这一章节主要对和Listener相关的四个配置项做以详细解释。

作者 | 计缘

来源 | 凌云时刻(微信号:linuxpk)

概述

listenersadvertised.listenerslistener.security.protocol.mapinter.broker.listener.name这四个配置项可能是大家最容易混淆和最不容易理解的。

在解释这些配置项之前,我们先来明确几个概念。

  • 部署Broker的阿里云ECS称为Host Machine。

  • 在阿里云ECS里启动的Producer或者Consumer,比如使用Kafka CLI启动的称为Internal Client。

  • 在大家的IDEA中使用Java编写的,或者第三方的Producer/Consumer,称为External Client。

  • Host Machine具有外网IP和内网IP。

  • Internal Client可以同时和Host Machine的外网IP及内网IP通信。

  • External Client只能和Host Machine的外网IP通信。

  • 多个阿里云ECS之间可以同时通过外网IP及内网IP通信。

    • 既在这个特定的场景下,Host Machine之间可以同时通过外网IP及内网IP通信。

    • 再换句话说就是不同Host Machine上的Broker之间可以同时通过外网IP及内网IP通信。

如上图所示,是一个很常见的Kafka集群场景,涵盖了上述的概念。图中那些通信虚线箭头就是靠Kafka的Listener建立的,并且是通过Kafka中不同的Listener建立的,这些Listener分为Internal Listener和External Listener。如下图所示:

那么这些Listener的创建以及内外部如何通信都是由上面那四个配置项决定的。

listener.security.protocol.map

先来看listener.security.protocol.map配置项,在上一章节中介绍过,它是配置监听者的安全协议的,比如PLAINTEXTSSLSASL_PLAINTEXTSASL_SSL。因为它是以Key/Value的形式配置的,所以往往我们也使用该参数给Listener命名:

listener.security.protocol.map=EXTERNAL_LISTENER_CLIENTS:SSL,INTERNAL_LISTENER_CLIENTS:PLAINTEXT,INTERNAL_LISTENER_BROKER:PLAINTEXT

使用Key作为Listener的名称。就如上图所示,Internal Producer、External Producer、Internal Consumer、External Consumer和Broker通信以及Broker之间互相通信时都很有可能使用不同的Listener。这些不同的Listener有监听内网IP的,有监听外网IP的,还有不同安全协议的,所以使用Key来表示更加直观。当然这只是一种非官方的用法,Key本质上还是代表了安全协议,如果只有一个安全协议,多个Listener的话,那么这些Listener所谓的名称肯定都是相同的。

listeners

listeners就是主要用来定义Kafka Broker的Listener的配置项。

listeners=EXTERNAL_LISTENER_CLIENTS://阿里云ECS外网IP:9092,INTERNAL_LISTENER_CLIENTS://阿里云ECS内网IP:9093,INTERNAL_LISTENER_BROKER://阿里云ECS内网IP:9094

上面的配置表示,这个Broker定义了三个Listener,一个External Listener,用于External Producer和External Consumer连接使用。也许因为业务场景的关系,Internal Producer和Broker之间使用不同的安全协议进行连接,所以定义了两个不同协议的Internal Listener,分别用于Internal Producer和Broker之间连接使用。

通过之前的章节,我们知道Kafka是由Zookeeper进行管理的,由Zookeeper负责Leader选举,Broker Rebalance等工作。所以External Producer和External Consumer其实是通过Zookeeper中提供的信息和Broker通信交互的。所以listeners中配置的信息都会发布到Zookeeper中,但是这样就会把Broker的所有Listener信息都暴露给了外部Clients,在安全上是存在隐患的,我们希望只把给外部Clients使用的Listener暴露出去,此时就需要用到下面这个配置项了。

advertised.listeners

advertised.listeners参数的作用就是将Broker的Listener信息发布到Zookeeper中,供Clients(Producer/Consumer)使用。如果配置了advertised.listeners,那么就不会将listeners配置的信息发布到Zookeeper中去了:

advertised.listeners=EXTERNAL_LISTENER_CLIENTS://阿里云ECS外网IP:9092

这里在Zookeeper中发布了供External Clients(Producer/Consumer)使用的ListenerEXTERNAL_LISTENER_CLIENTS。所以advertised.listeners配置项实现了只把给外部Clients使用的Listener暴露出去的需求。

inter.broker.listener.name

这个配置项从名称就可以看出它的作用了,就是指定一个listener.security.protocol.map配置项中配置的Key,或者说指定一个或一类Listener的名称,将它作为Internal Listener。这个Listener专门用于Kafka集群中Broker之间的通信

inter.broker.listener.name=INTERNAL_LISTENER_BROKER

listener 和 advertised.listeners 的关系

先来看看KafkaConfig.scalaSocketServer.scala源码中的这几行代码片段:

// KafkaConfig.scala
...val ListenersProp = "listeners"...def dataPlaneListeners: Seq[EndPoint] = {Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match {case Some(controlPlaneListenerName) => listeners.filterNot(_.listenerName.value() == controlPlaneListenerName)case None => listeners}}...def listeners: Seq[EndPoint] = {Option(getString(KafkaConfig.ListenersProp)).map { listenerProp =>CoreUtils.listenerListToEndPoints(listenerProp, listenerSecurityProtocolMap)}.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap))}// SocketServer.scaladef startup(startupProcessors: Boolean = true) {this.synchronized {connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides)createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)if (startupProcessors) {startControlPlaneProcessor()startDataPlaneProcessors()}}...private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,endpoints: Seq[EndPoint]): Unit = synchronized {endpoints.foreach { endpoint =>val dataPlaneAcceptor = createAcceptor(endpoint)addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()dataPlaneAcceptor.awaitStartup()dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)info(s"Created data-plane acceptor and processors for endpoint : $endpoint")}}

startup()方法是Kafka Broker创建启动Socket连接的入口,既用来创建Acceptor线程的入口,该线程负责处理Socket连接。 createDataPlaneAcceptorsAndProcessors()方法的第二个参数config.dataPlaneListeners可以看到取的就是listeners配置项的内容。

/**
* Create a server socket to listen for connections on.
*/
private def openServerSocket(host: String, port: Int): ServerSocketChannel = {val socketAddress =if (host == null || host.trim.isEmpty)new InetSocketAddress(port)elsenew InetSocketAddress(host, port)val serverChannel = ServerSocketChannel.open()serverChannel.configureBlocking(false)if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)serverChannel.socket().setReceiveBufferSize(recvBufferSize)try {serverChannel.socket.bind(socketAddress)info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort))} catch {case e: SocketException =>throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostString, port, e.getMessage), e)}serverChannel
}

跟到里面,可以看到如果没有配置listeners,那么会使用网卡地址创建Socket连接,对于阿里云ECS,就是内网IP。

再来看看KafkaServer.scala源码中的这几行代码片段:

...val brokerInfo = createBrokerInfo
val brokerEpoch = zkClient.registerBroker(brokerInfo)...private[server] def createBrokerInfo: BrokerInfo = {val endPoints = config.advertisedListeners.map(e => s"${e.host}:${e.port}")zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker =>val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints)require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" +s" advertised listeners are already registered by broker ${broker.id}")}val listeners = config.advertisedListeners.map { endpoint =>if (endpoint.port == 0)endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))elseendpoint}val updatedEndpoints = listeners.map(endpoint =>if (endpoint.host == null || endpoint.host.trim.isEmpty)endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName)elseendpoint)val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toIntBrokerInfo(Broker(config.brokerId, updatedEndpoints, config.rack), config.interBrokerProtocolVersion, jmxPort)}

从上面的代码可以看到,advertised.listeners主要用于向Zookeeper注册Broker的连接信息,但是不参与创建Socket连接。

所以从这几处源码内容可以得出结论,Kafka Broker真正建立通信连接使用的是listeners配置项里的内容,而advertised.listeners只用于向Zookeeper注册Broker的连接信息,既向Client暴露Broker对外的连接信息(Endpoint)。

另外在KafkaConfig.scala源码中还有有这么几行代码:

val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
val listenerNames = listeners.map(_.listenerName).toSetrequire(advertisedListenerNames.contains(interBrokerListenerName),s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " +s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
require(advertisedListenerNames.subsetOf(listenerNames),s"${KafkaConfig.AdvertisedListenersProp} listener names must be equal to or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " +s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +s"are ${listenerNames.map(_.value).mkString(",")}"

从上面的代码片段可以得出两个结论:

  • advertised.listeners配置项中配置的Listener名称或者说安全协议必须在listeners中存在。因为真正创建连接的是listeners中的信息。

  • inter.broker.listener.name配置项中配置的Listener名称或者说安全协议必须在advertised.listeners中存在。因为Broker之间也是要通过advertised.listeners配置项获取Internal Listener信息的。

小结

这一章节主要大家详细解释了Broker几个比较容易混淆和不好理解的配置项,解释了什么是内外部Listener,如何暴露Listener等。这些配置在我们搭建Kafka集群时至关重要。希望能给小伙伴们带来帮助。

END

往期精彩文章回顾

Kafka从上手到实践 - Kafka集群:配置Broker

Kafka从上手到实践:搭建Zookeeper集群

Kafka从上手到实践-Zookeeper CLI:CRUD zNode

Kafka从上手到实践 - 初步认知:Zookeeper

Kafka从上手到实践:Kafka Java Consumer

Kafka从上手到实践:Kafka Java Producer

Kafka CLI:Reseting Offset & Config CLI

Kafka CLI:Consumer CLI & Producer CLI

Kafka CLI:Topic CLI & Producer CLI

Kafka从上手到实践 - 实践真知:搭建单机Kafka

Kafka从上手到实践 - 庖丁解牛:Consumer

Kafka从上手到实践 - 庖丁解牛:Producer

Kafka从上手到实践 - 庖丁解牛:Partition

Kafka从上手到实践 - 庖丁解牛:Topic & Broker

Kafka从上手到实践 - 初步认知:MQ系统

长按扫描二维码关注凌云时刻

每日收获前沿技术与科技洞见

Kafka从上手到实践 - Kafka集群:Kafka Listeners | 凌云时刻相关推荐

  1. Kafka 详解(二)------集群搭建

    这里通过 VMware ,我们安装了三台虚拟机,用来搭建 kafka集群,虚拟机网络地址如下: hostname                      ipaddress             ...

  2. kafka tool 2.1连接kerberos的kafka(cdh6.3.2)集群。

    kafka tool 2.1连接kerberos的kafka(cdh6.3.2)集群. kafka tools 下载连接:https://www.kafkatool.com/download.html ...

  3. Kafka基于Zookeeper搭建高可用集群实战

    Kafka基于Zookeeper搭建高可用集群实战 1 前言 1.1 高可用的由来 为何需要Replication? 在Kafka在0.8以前的版本中,是没有Replication的,一旦某一个Bro ...

  4. ELK集群+Kafka集群+FileBeat——命运多舛的安装采坑之路

    欢迎大家关注我的公众号,添加我为好友! 开始的时候感觉日志监控是比较NB的技术,感觉很神奇,那么多日志,为什么一下子就能够找到自己想要的?后来初步了解到了ELK(ElasticSearch + Log ...

  5. kubernetes 入门实践-搭建集群

    ㅤㅤㅤ ㅤㅤㅤ ㅤㅤㅤㅤㅤㅤㅤㅤㅤㅤㅤㅤㅤㅤㅤ(一个人的真正伟大之处就在于他能够认识到自己的渺小 -- 保罗) ㅤㅤㅤ ㅤㅤㅤ ㅤㅤㅤㅤㅤㅤㅤㅤㅤ 上一篇:kubernetes 入门实践-核心概念 下 ...

  6. Kafka:你必须要知道集群内部工作原理的一些事!

    前言 上篇文章讲到了消息在 Partition 上的存储形式,本来准备接着来聊聊生产中的一些使用方式,想了想还有些很重要的工作组件原理没有讲清楚,比如一个 Topic 由 N 个 Partition ...

  7. Zookeeper集群 + Kafka集群 + KafkaOffsetMonitor 监控

    一.Zookeeper ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件.它是一个为分布式应用提供一 ...

  8. php连接kafka集群,Kafka集群环境配置

    Kafka集群环境配置 1 环境准备 1.1 集群规划 Node02 Node03 Node04 zk zk zk kafka kafka kafka 1.2 jar包下载 安装包:kafka_2.1 ...

  9. 搭建 ELK 集群 kafka filebeat kibana logstash zookeeper elasticsearch

    文章目录 一.前置准备 1. 部署总览 2. 技术选型 3. 全局配置 4. 软件下载 5. 操作声明 二.jdk安装 2.1. 上传 2.2. 解压 2.3. 设置环境变量 三.zk集群 3.1. ...

  10. kafka基础之介绍和分布式集群搭建

    一 kafka介绍 现在各个电商平台,社交平台等诸多系统可以不断生产出各种细信息,那么我们应该如何收集它,如何分析它,以及输入实时的做到以上需求.Kafka应运而生. Kafka是一个分布式的的流式系 ...

最新文章

  1. [javascript] 看知乎学习js闭包
  2. python3 报错 ‘builtin_function_or_method‘ object has no attribute 解决方法
  3. python—多线程之线程之间共享数据(Queue)
  4. 抚摸斯蒂芬·金 (图)
  5. 【音视频安卓开发 (五)】Android中获取音视频原始数据的方法
  6. python输出格式控制_Python3.x那些事儿:[50]多种多样的输出格式
  7. 5月第四周.COM域名增7.3万居首 域名.XXX净减22个
  8. (Windows) CodeBlocks 下载
  9. java 随机生成人员姓名
  10. APP上架各大应用市场对比
  11. 【毕业设计_课程设计】基于python的微信公众平台机器人的设计与实现
  12. 为了陪妹子打王者,没有天赋的我写了一个AI机器人替我操作
  13. 【服务器数据恢复】华为OceanStor服务器热备盘同步数据失败的数据恢复案例
  14. Windows 10驱动签名_win 10驱动数字签名_驱动签名注意事项
  15. Spark深入解析(三):Spark基础解析之Spark环境搭建(不同模式)
  16. 大数据应用统一集成平台CDAP简介
  17. 为什么华为a1路由器网速变慢_凭什么网速就比别人快?华为路由 A1 畅享版体验...
  18. [转载]將 DVD-9 影片備份成 DVD-5 的教學資料介紹與整理
  19. php 仿 js encodeURI
  20. DX9绘图-------VB6编程学习DX9游戏编程DirectX9编程2D小游戏源码冷风引擎CoolWind2D游戏引擎(8)

热门文章

  1. [C++基础]031_如何正确获取用户的输入
  2. spring实战笔记6---springMVC的请求过程
  3. DEL: Restore Boxes after VirtualBox Upgrade
  4. 第一个Net+Mysql的例子,比想象的简单很多
  5. 动态定义控件时事件触发的总结
  6. ORB_SLAM2之Pangolin的安装与问题处理
  7. ubuntu 程序卡主解决方案
  8. unity Scene窗口的任意比例放大和缩小
  9. 每天1万步就叫健康吗?
  10. C++从屏幕输入数字以空格分割,存入整型数组