主题的管理包括创建主题、 查看主题信息、修改主题和删除主题等操作。可以通过 Kafka 提供的 kafka-topics.sh 脚本来执行这些操作,这个脚本位于 $KAFKA_HOME/bin/目录下,其核 心代码仅 有一行,具体如下 :

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

可以看到其实质上是调用了 kafka.admin.TopicCommand 类来执行主题管理的操作 。

主题的管理并非只有使用 kafka-topics.sh 脚本这一种方式,我们还可以通过

KafkaAdminClient 的方式实现(这种方式实质上是通过发送 CreateTopicsRequest 、DeleteTopicsRequest 等请求来实现的,对于 XXXRequest 系列的细节在深入服务器协议涉及中会有详细的介绍),甚至我们还可以通过直接操纵日志文件和 ZooKeeper节点来实现。下面按照创建主题、 查看主题信息 、修改主题、 删除主题 的顺序来介绍其中的操作细节。

1、创建主题

如果 broker端配置参数 auto.create.topics .enable 设置为 true(默认值就是 true) , 那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为 num.partitions (默认值为 1)、副本因子为 default.replication.factor (默认值为 1)的主题。除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会按照配置参数num.partitions 和 default.replication.factor的值来创建一个相应的主题。很多时候,这种自动创建主题的行为都是非预期的。除非有特殊应用需求,否则不建议将 auto.create.topics.enable 参数设置为 true,这个参数会增加主题的管理与维护的难度。

更加推荐也更加通用的方式是通过 kafka-topics.sh 脚本来创建主题。下面通过创建一个主题 topic-create来回顾一下这种创建主题的方式,示例如下:

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-create --partitions 4 --replication-factor 2

上面的示例中创建了一个分区数为 4、 副本因子为 2 的主题。示例中的环境是一个包含 3个 broker 节点的集群,每个节点的名称和 brokerId 的对照关系如下:

node1 brokerId=0
node2 brokerId=1
node3 brokerId=2

在执行完脚本之后, Kafka 会在 log.dir 或 log.dirs 参数所配置的目录下创建相应的主题分区,默认情况下这个目录为/tmp/kafka-logs/。我们来查看一下 node1节点中创建的主题分区,参考如下:

可以看到 node1节点中创建了 2 文件夹 topic-create-0 和 topic-create-1 , 对应主题 topic-create 的 2 个分区编号为 0 和 1的分区,命名方式可以概括为 <topic>-<partition>。严谨地说,其实<topic>-<partition>这类文件夹对应的不是分区,分区同主题一样是一个逻辑的概念而没有物理上的存在。并且这里我们也只是看到了 2 个分区,而我们创建的是 4 个分区,其余 2 个分区被分配到了node2和node3节点中, 参考如下:

三个broker节点一共创建了 8个文件夹,这个数字8实质上是分区数4与副本因子2的乘积 。每个副本 (或者更确切地说应该是日志,副本与日志一一对应)才真正对应了一个命名形式如<topic>-<partition>的 文件夹。

主题、分区、副本和 Log (日志)的关系如图 4-1 所示,主题和分区都是提供给上层用户 的抽象, 而在副本层面或更加确切地说是Log层面才有实际物理上的存在。同一个分区中的多个副本必须分布在不同的 broker 中,这样才能提供有效的数据冗余。对于示例中的分区数为 4、 副本因子为 2、 broker数为 3 的情况下, 按照 2、 3、 3 的分区副本个数分配给各个 broker是最优 的选择 。再比如在分区数为 3、副本因子为 3,并且 broker 数同样为 3 的情况下,分配 3、 3、 3 的分区副本个数给各个 broker是最优的选择,也就是每个 broker 中都拥有所有分区的一个副本。

我们不仅可以通过日志文件的根目录来查看集群中各个 broker 的分区副本的分配情况,还可以通过 ZooKeeper 客户端来获取。当创建一个主题时会在 ZooKeeper 的/brokers/topics/目录下创建一个同名的实节点,该节点中记录了该主题的分区副本分配方案。示例如下 :

示例数据中的 ”2”:[1,2]表示分区 2 分配了 2 个副本,分别在 brokerld 为 1和 2 的 broker节点中 。

kafka-topics.sh 脚本中的 zookeeper、 partations、 replication-factor 和 topic 这 4个参数分别代表 ZooKeeper连接地址、分区数、副本因 子和主题名称。另一个 create 参数表示的是创建主题的指令类型,在 kafka-topics.sh 脚本中 对应的还有 list、describe、 alter 和 delete 这 4 个同级别的指令类型 ,每个类型所需要的参数也不尽相同。

还可以通过 describe 指令类型来查看分区副本的分配细节,示例如下:

示例中的 Topic 和 Partation 分别表示主题名称和分区号 。 PartitionCount 表示主 题中分区的个数 , ReplicationFactor 表示副本因子 , 而 Configs 表示创建或修改主题时指定的参数配置 。Leader 表示分区的 leader 副本所对应的 brokerld, Isr 表示分区的 ISR 集合, Replicas表示分区的所有的副本分配情况,即 AR集合,其中的数字都表示的是brokerld。

使用 kafka-topics.sh 脚本创建主题的指令格式归纳如下 :

kafka-topics.sh --zookeeper <String:hosts> -create --topic [String:topic] --partitions <Integer: #of partations> -replication-factor <Integer:replication factor>

到目前为止,创建主题时的分区副本都是按照既定的内部逻辑来进行分配的。 kafka-topics.sh 脚本中还提供了一个replica-assignment 参数来手动指定分区副本的分配方案。 replica-assignment 参数的用法归纳如下 :

--replica-assignment <String:broker_id_for_part1_replica1:broker_id_for_part1_replica2,broker_id_for_part2_replica1:broker_id_for_part2_replica2 , ...>

这种方式根据分区号的数值大小按照从小到大的顺序进行排列, 分区与分区之间用逗号“,” 隔开,分区内多个副本用冒号“:”隔开。并且在使用 replica-assignment 参数创建主题时不需要原本必备 的 partitions 和 replication-factor 这两个参数。

我们可以通过 replica-assignment 参数来创建一个与主题 topic-create相同的分配方案 的主题 topic-create-same 和不同的分配方案的主题 topic-create-same, 示例如下:

注意同一个分区内的副本不能有重复,比如指定了 0:0,1:1这种,就会报出 AdminCommand­FailedException 异常,示例如下:

如果分区之间所指定的副本数不同,比如 0:1 , 0 ,1:1 这种就会报出 AdminOperationException 异常, 示例如下:

当然,类似 0:1 , , 0:1, 1:0。这种企图跳过一个分区的行为也是不被允许的, 示例如下:

在创建主题时我们还可以通过 config参数来设置所要创建主题的相关参数, 通过这个参数可以覆盖原本的默认配置。在创建主题时可以同时设置多个参数,具体的用法归纳如下 :

--config <String:name1=value1> --config <String:name2=value2>

下面的示例使用 了 config 参数来创建一个主题 topic-config:

示例中设置了 cleanup.policy 参数为 compact,以及 max.message.bytes 参数为 10000,这两个参数都是主题端的配置,我们再次通过 describe 指令来查看所创建的主题信息:

可以看到 Configs 一栏 中包含了创建时所设置的参数。我们还可以通过 ZooKeeper 客户端查看所设置的参数,对应的 ZooKeeper 节点为/config/topics/[topic] , 示例如下 :

创建主题时对于主题名称的命名方式也很有讲究 。首先是不能与已经存在的主题同名,如果创建了同名的主题就会报错。我们尝试创建一个己经存在的主题 topic-create, 示例如下 :

通过上面的示例可以看出,在发生命名冲突时会报出 TopicExistsException 的异常信息。在 kafka-topics.sh 脚本中还提供了一个if-not-exists 参数, 如果在创建主题时带上了这个参 数,那么在发生命名冲突时将不做任何处理(既不创建主题,也不报错)。如果没有发生命名冲突,那么和不带 if-not-exists 参数的行为一样正常创建主题。我们再次尝试创建一个已经存在的主题 topic-create,示例如下 :

通过上面的示例可以看出,在添加 if-not-exists 参数之后,并没有像第一次创建主题 时的那样出现“Created topic ”topic-create”.”的提示信息。通过 describe 指令查看主题中的分区数和副本因子数, 还是同第一次创建时的一样分别为 4 和 2, 也并没有被覆盖,如此便证 实了 if-not-exists 参数可以在发生命名冲突时不做任何处理。在实际应用中,如果不想在 创建主题的时候跳出 TopicExistsException 的异常信息,不妨试一下这个参数。

kafka-topics.sh 脚本在创建主题时还会检测是否包含“.”或“_”字符。为什么要检测这两个字符呢? 因为在 Kafka 的内部做埋点时会根据主题的名称来命名 metrics 的名称,并且会将点号“.”改成下画线 “_"。假设遇到一个名称为“topic.1_2'’的主题,还有一个名称为“topic_1.2” 的主题,那么最后的 metrics 的名称都会为“topic_1_2”,这样就发生了名称冲突。举例如下, 首先创建一个以“topic.1_2”为名称的主题,提示 WARNING 警告, 之后再创建“topic.1_2” 时发生 InvalidTopicException 异常。

注意要点:主题的命名同样不推荐(虽然可以这样做)使用双下画线“__”开头,因为以双下画线开头的主题一般看作 Kafka的内部主题,比如 __consumer_offsets和 __transaction_state。 主题的名称必须由大小写字母、数字、点号 “.”、连接线“-”、下画线“_”组成,不能为空, 不能只有点号 “.”, 也不能只有双点号“..” ,且长度不能超过 249。

Kafka 从 0.10.x 版本开始支持指定 broker 的机架信息(机架的名称)。如果指定了机架信息,则在分区副本分配时会尽可能地让分区副本分配到不同的机架上。指定机架信息是通过 broker端参数 broker.rack来配置的,比如配置当前 broker所在的机架为“RACK1”:

broker.rack=RACK1

如果一个集群中有部分 broker指定了机架信息,并且其余的 broker没有指定机架信息,那 么在执行 kafka-topics.sh 脚本创建主题时会报出的 AdminOperationException 的异常,示例如下 :

此时若要成功创建主题, 要么将集群中的所有 broker都加上机架信息或都去掉机架信息,要么使用 disable-rack-aware参数来忽略机架信息,示例如下:

如果集群中 的所有 broker都有机架信息,那么也可以使用 disable-rack-aware 参数来忽略机架信息对分区副本的分配影响,有关分区副本的分配细节会在 4.1.2 节中做详细介绍。

本节开头就提及了 kafka-topics.sh 脚本实质上是调用了 kafka.admin.TopicCommand 类,通过向 TopicCommand 类中传入一些关键参数来实现主题的管理。我 们也可以直接调用 TopicCommand 类中的 main()函数来直接管理主题,比如这里创建一个分区数为 1、副本因子为 1 的主题 topic-create-api, 如代码清单 4-1 所示。

2、分区副本的分配

上节中多处提及了分区副本的分配,读者对此或许有点迷惑,在生产者和消费者中也都有分区分配的概念。生产者的分区分配是指为每条消息指定其所要发往的分区,消费者中的分区分配是指为消费者指定其可以消费消息的分区,而这里的分区分配是指为集群制定创建主题时的分区副本分配方案,即在哪个 broker中创建哪些分区的副本。

在创建主题时,如果使用了 replica-assignment参数,那么就按照指定的方案来进行分区副本的创建;如果没有使用replica-assignment参数,那么就需要按照内部的逻辑来计算分配方案了。使用kafka-topics.sh脚本创建主题时的内部分配逻辑按照机架信息划分成两种策略:未指定机架信息和指定机架信息。如果集群中所有的 broker节点都没有配置broker.rack参数,或者使用disable-rack-aware参数来创建主题,那么采用的就是未指定机架信息的分配策略,否则采用的就是指定机架信息的分配策略。

首先看一下未指定机架信息的分配策略,具体的实现涉及代码的逻辑细节,未指定机架信息的分配策略比较容易理解,这里通过源码来逐一进行分析。所对应的具体实现为kafka.admin.AdminUtils.scala文件中的 assignReplicasToBrokersRackUnaware()方法,该方法的内容如下:

该方法参数列表中的 fixedStartIndex和 startPartitionld值是从上游的方法中调用传下来的,都是-1,分别表示第一个副本分配的位置和起始分区编号。 assignReplicasUnaware()方法的核心是遍历每个分区 partition,然后从 brokerArray( brokerId的列表)中选取
replicationFactor个 brokerId分配给这个 partition。

该方法首先创建一个可变的Map用来存放该方法将要返回的结果,即分区 partition和分配副本的映射关系。由于 fixedStartIndex为-1,所以 startIndex是一个随机数,用来计算一个起始分配的 brokerId,同时又因为 startPartitionId为-1,所以 currentPartitionld的值为0,可见默认情况下创建主题时总是从编号为0的分区依次轮询进行分配。

nextReplicaShift表示下一次副本分配相对于前一次分配的位移量,从字面上理解有点绕口。举个例子:假设集群中有3个 broker节点,对应于代码中的 brokerArray,创建的某个主题中有3个副本和6个分区,那么首先从 partitionEd( partition的编号)为0的分区开始进行分配,假设第一次计算(由 rand.nextInt(brokerArray.length)随机产生)得到的 nextReplicaShift值为1,第一次随机产生的 startIndex值为2,那么 partitionId为0的第一个副本的位置(这里指的是brokerArray的数组下标) firstReplicalndex=(currentPartitionId+ startIndex)%brokerArray.length=(0+2)%3=2,第二个副本的位置为 replicaIndex(firstReplicalndex, nextReplicaShift,j, brokerArray.length)= replicalndex(2, nextReplicaShift+1,0,3)=?,这里引入了一个新的方法 replicalndex(),不
过这个方法很简单,具体如下:

继续计算 replicalndex(2, nextReplicaShift+1,0,3)= replicalndex(2,2,0,3)=(2+(1+(2+0)%(3-1))%3=0。继续计算下一个副本的位置 replicalndex(2,2,1,3)=(2+(1+(2+1)%(3-1))%3=1。所以 partitionId为0的副本分配位置列表为[2,0,1],如果 brokerArray正好是从0开始编号的,也正好是顺序不间断的,即 brokerArray为[0,1,2],那么当前 partitionId为0的副本分配策略为[2,0,1]。如果 brokerId不是从0开始的,也不是顺序的(有可能之前集群的其中几个 broker下线了),最终的 brokerArray为[2,5,8],那么 partitionId为0的分区的副本分配策略为[8,2,5]。为了便于说明问题,可以简单假设 brokerArray就是[0,1,2]。

同样计算下一个分区,即 partitionId为1的副本分配策略。此时 nextReplicaShift的值还是2,没有满足自增的条件。这个分区的 firstReplicalndex=(1+2)%3=0。第二个副本的位置replicalndex(0,2,0,3)=(0+(1+(2+0)%(3-1))%3=1,第三个副本的位置 replicalndex(0,2,1,3)=2,最终 partitionId为2的分区分配策略为[0,1,2]。

依次类推,更多的分配细节可以参考下面的示例, topic-test2的分区分配策略和上面陈述的一致:

我们无法预先获知 startIndex和 nextReplicaShift的值,因为都是随机产生的。 startIndex和nextReplicaShift的值可以通过最终的分区分配方案来反推,比如上面的 topic-test,第一个分区(即 partitionId=0的分区)的第一个副本为2,那么可由2=(0+startIndex)%3推断出 startIndex为2。之所以 startIndex选择随机产生,是因为这样可以在多个主题的情况下尽可能地均匀分布分区副本,如果这里固定为一个特定值,那么每次的第一个副本都是在这个 broker上,进而导致少数几个 broker所分配到的分区副本过多而其余 broker分配到的分区副本过少,最终导致负载不均衡。尤其是某些主题的副本数和分区数都比较少,甚至都为1的情况下,所有的副本都落到了那个指定的 broker上。与此同时,在分配时位移量 nextReplicaShift也可以更好地使分区副本分配得更加均匀。

相比较而言,指定机架信息的分配策略比未指定机架信息的分配策略要稍微复杂一些,但主体思想并没相差很多,只是将机架信息作为附加的参考项。假设目前有3个机架rack1、rack2和rack3,Kafka集群中的9个 broker点都部署在这3个机架之上,机架与 broker节点的对照关系如下:

如果不考虑机架信息,那么对照 assignReplicasToBrokersRackUnaware()方法里的 brokerArray变量的值为[0,1,2,3,4,5,6,7,8]。指定基架信息的 assignReplicasTo BrokersRackAware方法里的 brokerArray的值在这里就会被转换为[0,3,6,1,4,7,2,5,8],显而易见,这是轮询各个机架而产生的结果,如此新的brokerArray(确切地说是 arrangedBrokerList)中包含了简单的机架分配信息。之后的步骤也和 assignReplicasToBrokersRackUnaware()方法类似,同样包含 startIndex、currentPartiionld、nextReplicaShift的概念,循环为每一个分区分配副本。分配副本时,除了处理第一个副本,其余的也调用 replicalndex()方法来获得一个 broker,但这里和
assignReplicasToBrokersRackUnaware()不同的是,这里不是简单地将这个 broker添加到当前分区的副本列表之中,还要经过一层筛选,满足以下任意一个条件的 broker不能被添加到当前分区的副本列表之中:

  • 如果此 broker所在的机架中已经存在一个 broker拥有该分区的副本,并且还有其他的机架中没有任何一个 broker拥有该分区的副本。
  • 如果此 broker中已经拥有该分区的副本,并且还有其他 broker中没有该分区的副本。

当创建一个主题时,无论通过 kafka-topics.sh脚本,还是通过其他方式(KafkaAdminClient)创建主题时,实质上是在 ZooKeeper中的/brokers/topics节点下创建与该主题对应的子节点并写入分区副本分配方案,并且在/config/topics/节点下创建与该主题对应的子节点并写入主题相关的配置信息、(这个步骤可以省略不执行)。而 Kafka创建主题的实质性动作是交由控制器异步去完成的,有关控制器的更多细节可以参考《深入服务器》的相关内容。

知道了 kafka-topics.sh脚本的实质之后,我们可以直接使用 ZooKeeper的客户端在/brokers/topics节点下创建相应的主题节点并写入预先设定好的分配方案,这样就可以创建一个新的主题了。这种创建主题的方式还可以绕过一些原本使用 kafka-topics.sh脚本创建主题时的一些限制,比如分区的序号可以不用从0开始连续累加了。首先我们通过 ZooKeeper客户端创建一个除了与主题 topic-create名称不同其余都相同的主题 topic-create-zk,示例如下:

通过查看主题 topic-create-zk的分配情况,可以看到与主题 topic-create的信息没有什么差别。

我们再创建一个另类的主题,分配情况和主题 topic-create一样,唯独分区号已经与主题topic-create-special大相径庭,示例如下:

可以看到分区号为10、21、33和40,而通过单纯地使用 kafka-topics.sh脚本是无法实现的。

3、查看主题

kafka-topics.sh脚本有5种指令类型: create、list、describe、alter和 delete。其中list和 describe指令可以用来方便地查看主题信息,在前面的内容中我们已经接触过了 describe指令的用法,本节会对其做更细致的讲述。

通过list指令可以查看当前所有可用的主题,示例如下:

前面的章节我们都是通过describe指令来查看单个主题信息的,如果不使用--topic指定主题,则会展示出所有主题的详细信息。--topic还支持指定多个主题,示例如下:

在使用 describe指令查看主题信息时还可以额外指定 topics-with-overrides、under-replicated-partitions和unavailable-partitions这三个参数来增加一些附加功能。

增加 topics-with-overrides参数可以找出所有包含覆盖配置的主题,它只会列出包含了与集群不一样配置的主题。注意使用 topics-with-overrides参数时只显示原本只使用 describe指令的第一行信息,参考示例如下:

under-replicated-partitions和 navailable-partitions参数都可以找出有问题的分区。通过 under-replicated-partitions参数可以找出所有包含失效副本的分区。包含失效副本的分区可能正在进行同步操作,也有可能同步发生异常,此时分区的ISR集合小于AR集合。对于通过该参数査询到的分区要重点监控,因为这很可能意味着集群中的某个 broker已经失效或同步效率降低等。有关失效副本的更多细节可以参阅《副本剖析》。

举个例子,参照主题 topic-create的环境,我们将集群中的node2节点下线,之后再通过这个参数来查看 topIc-create的信息,参考如下:

我们再将node2节点恢复,执行同样的命令,可以看到没有任何信息显示:

通过 unavailable-partitions参数可以查看主题中没有 leader副本的分区,这些分区已经处于离线状态,对于外界的生产者和消费者来说处于不可用的状态。

举个例子,参考主题 topic-create的环境,我们将集群中的node2和node3节点下线,之后再通过这个参数来查看 topIc-create的信息,参考如下:

我们再将node2和node3恢复,执行同样的命令,可以看到没有任何信息:

4、修改主题

当一个主题被创建之后,依然允许我们对其做一定的修改,比如修改分区个数、修改配置等,这个修改的功能就是由kafka-topics.sh脚本中的 alter指令提供的。

我们首先来看如何增加主题的分区数。以前面的主题 topic-config为例,当前分区数为1修改为3,示例如下:

注意上面提示的告警信息:当主题中的消息包含key时(即key不为null),根据key计算分区的行为就会受到影响。当 topic-config的分区数为1时,不管消息的key为何值,消息都会发往这一个分区;当分区数增加到3时,就会根据消息的key来计算分区号,原本发往分
区0的消息现在有可能会发往分区1或分区2。如此还会影响既定消息的顺序,所以在增加分区数时一定要三思而后行。对于基于key计算的主题而言,建议在一开始就设置好分区数量,避免以后对其进行调整。

目前Kafka只支持增加分区数而不支持减少分区数。比如我们再将主题 topic-config的分区数修改为1,就会报出 InvalidPartitionException的异常,示例如下

为什么不支持减少分区?

按照Kafka现有的代码逻辑,此功能完全可以实现,不过也会使代码的复杂度急剧増大。实现此功能需要考虑的因素很多,比如删除的分区中的消息该如何处理?如果随着分区一起消失则消息的可靠性得不到保障;如果需要保留则又需要考虑如何保留。直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于 Spark、Flink这类需要消息时间戳(事件时间)的组件将会受到影响;如果分散插入现有的分区,那么在消息量很大的时候,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何得到保障?与此同时,顺序性问题、事务性问题,以及分区和副本的状态机切换问题都是不得不面对的。反观这个功能的收益点却是很低的,如果真的需要实现此类功能,则完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去即可。

在创建主题时有一个if-not-exists参数来忽略一些异常,在这里也有对应的参数,如果所要修改的主题不存在,可以通过if-exists参数来忽略异常。下面修改一个不存在的主题 topic-unknown的分区,会报出错误信息“Topic topic-unknown does not exist”,示例如下:

除了修改分区数,我们还可以使用 kafka-topics.sh脚本的ater指令来变更主题的配置。在创建主题的时候我们可以通过 config参数来设置所要创建主题的相关参数,通过这个参数可以覆盖原本的默认配置。在创建完主题之后,我们还可以通过alter指令配合 config参数增加或修改一些配置以覆盖它们配置原有的值。

下面的示例中演示了将主题 topic-config的max.message.bytes配置值从1000修改为2000,示例如下:

我们再次覆盖主题 topic-config的另一个配置 segment.bytes(看上去相当于增加动作),示例如下:

我们可以通过 delete-config参数来删除之前覆盖的配置,使其恢复原有的默认值。下面的示例将主题 topic-config中所有修改过的3个配置都删除:

注意到在变更(增、删、改)配置的操作执行之后都会提示一段告警信息,指明了使用kafka-topics.sh脚本的 alter指令来变更主题配置的功能已经过时(deprecated),将在未来的版本中删除,并且推荐使用 kafka-configs.sh脚本来实现相关功能。

5、配置管理

kafka-configs.sh脚本是专门用来对配置进行操作的,这里的操作是指在运行状态下修改原有的配置,如此可以达到动态变更的目的。 kafka-configs.sh脚本包含变更配置 alter和查看配置 describe这两种指令类型。同使用 kafka- topics.sh脚本变更配置的原则一样,增、删、改的行为都可以看作变更操作,不过 kafka-configs.sh脚本不仅可以支持操作主题相关的配置,还可以支持操作 broker、用户和客户端这3个类型的配置。

kafka-configs.sh脚本使用 entity-type参数来指定操作配置的类型,并且使用 entity-name参数来指定操作配置的名称。比如查看主题 topic-config的配置可以按如下方式执行:

bin/kafka-configs.sh --zookeeper localhost:2181/kafka --describe --entity-type topics --entity-name topic-config

--describe指定了查看配置的指令动作,--entity-type指定了查看配置的实体类型,--entity-name指定了查看配置的实体名称。 entity-type只可以配置4个值: topics、brokers、clients和 users,entity-type与 entity-name的对应关系如表4-1所示。

使用alter指令变更配置时,需要配合add-config和 delete-config这两个参数起使用。add-config参数用来实现配置的增、改,即覆盖原有的配置; delete-config参数用来实现配置的删,即删除被覆盖的配置以恢复默认值。下面的示例演示了add-config参数的用法,覆盖了主题 topic-config的两个配置cleanup.policy和max.message.bytes(示例执行之前主题 topic-config无任何被覆盖的配置):

上面示例中还使用了两种方式来查看主题 topic-config中配置信息,注意比较这两者之间的差别。
使用 delete-config参数删除配置时,同add-config参数一样支持多个配置的操作,多个配置之间用逗号“,”分隔,下面的示例中演示了如何删除上面刚刚增加的主题配置:

使用 kafka-configs.sh脚本来变更(alter)配置时,会在 ZooKeeper中创建一个命名形式为/config/<entity-type>/<entity-name>的节点,并将变更的配置写入这个节点,比如对于主题 topic-config而言,对应的节点名称为/config/topics/topic-config,节点中的数据内容为:

可以推导出节点内容的数据格式为:

其中 property-name代表属性名, property-value代表属性值。增加配置实际上是往节点内容中添加属性的键值对,修改配置是在节点内容中修改相应属性的属性值,删除配置是删除相应的属性键值对。
变更配置时还会在 ZooKeeper中的/config/changes/节点下创建一个以“config_change_”为前缀的持久顺序节点(PERSISTENT_SEQUENTIAL),节点命名形式可以归纳为/config/changes/config_change_<segNO>。比如示例中的主题 topic-config与此对应的节点名称和节点内容如下:

segNo是一个单调递增的10位数字的字符串,不足位则用0补齐。
查看(describe)配置时,就是从/config/<entity-type>/<entity-name>节点中获取相应的数据内容。如果使用kafka-configs.sh脚本查看配置信息时没有指定entity-name参数的值,则会查看 entity-type所对应的所有配置信息。示例如下

6、主题端参数

与主题相关的所有配置参数在 broker层面都有对应参数,比如主题端参数 cleanup.policy对应 broker层面的log.cleanup.policy。如果没有修改过主题的任何配置参数那么就会使用 broker端的对应参数作为其默认值。可以在创建主题时覆盖相应参数的默认值,
也可以在创建完主题之后变更相应参数的默认值。比如在创建主题的时候没有指定cleanup.policy参数的值,那么就使用log.cleanup.policy参数所配置的值作为cleanup.policy的值。

与主题相关的参数也有很多,由于篇幅限制,在前面的配置变更的示例中难以一一列出所有的参数,但是从配置变更的角度而言,其操作方式都是一样的。为了便于读者查阅,表4-2列出了主题端参数与 broker端参数的对照关系。

7、删除主题

如果确定不再使用一个主题,那么最好的方式是将其删除,这样可以释放一些资源,比如磁盘、文件句柄等。kafka-topics.sh脚本中的 delete指令就可以用来删除主题,比如删除一个主题 topic-delete:


可以看到在执行完删除命令之后会有相关的提示信息,这个提示信息和 broker端配置参数delete.topic.enable有关。必须将 delete.topic.enable参数配置为true才能够删除主题,这个参数的默认值就是true,如果配置为 false,那么删除主题的操作将会被忽略。在实际生产环境中,建议将这个参数的值设置为true如果要删除的主题是Kafka的内部主题,那么删除时就会报错。截至Kafka2.0.0, Kafka的内部一共包含2个主题,分别为__consumer_offsets和__transaction_state。下面的示例中尝试删除内部主题 __consumer_offsets:

尝试删除一个不存在的主题也会报错。比如下面的示例中尝试删除一个不存在的主题topic-unknown:

这里同alter指令一样,也可以通过if-exists参数来忽略异常,参考如下:

使用 kafka-topics.sh脚本删除主题的行为本质上只是在 ZooKeeper中的/admin/delete_topics路径下创建一个与待删除主题同名的节点,以此标记该主题为待删除的状态。与创建主题相同的是,真正删除主题的动作也是由 Kafka的控制器负责完成的。
了解这一原理之后,我们可以直接通过 ZooKeeper的客户端来删除主题。下面示例中使用ZooKeeper客户端 zkCli.sh来删除主题 topic-delete:

我们还可以通过手动的方式来删除主题。主题中的元数据存储在 ZooKeeper中的/brokers/topics和/config/topics路径下,主题中的消息数据存储在log.dir或log.dirs配置的路径下,我们只需要手动删除这些地方的内容即可。下面的示例中演示了如何删除主题topic-delete,总共分3个步骤,第一步和第二步的顺序可以互换。
第一步,删除 ZooKeeper中的节点/config/topics/topic-delete
[zk: localhost:2181/kafka (CONNECTED) 7] rmr /config/topics/topic-delete
第二步,删除 ZooKeeper中的节点/brokers/topics/topic-delete及其子节点。
[zk: localhost:2181/kafka (CoNNECTED) 8] delete /brokers/topics/topic-delete
第三步,删除集群中所有与主题 topic-delete有关的文件。

注意,删除主题是一个不可逆的操作。一旦删除之后,与其相关的所有消息数据会被全部删除,所以在执行这一操作的时候也要三思而后行。介绍到这里,基本上 kafka-topics.sh脚本的使用也就讲完了,为了方便读者查阅,表4-3中列出了所有 kafka-topics.sh脚本中的参数。读者也可以通过执行无任何参数的 kafka-topics.sh脚本,或者执行 kafka-topics.sh -help来查看帮助信息。

Kafka主题(Topic)的管理相关推荐

  1. kafka java api 删除_使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)...

    使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题 ...

  2. 使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...

    原文:http://blog.csdn.net/changong28/article/details/39325079 使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指 ...

  3. kafka专题:kafka的Topic主题、Partition分区、消费组偏移量offset等基本概念详解

    文章目录 1. kafka集群整体架构 2. kafka相关元素的基本概念 2.1 主题Topic和分区Partition 2.2 kafka消息存储在哪里? 2.3 分区副本 2.4 消费组和偏移量 ...

  4. Kafka的灵魂伴侣Logi-KafkaManger(2)之kafka针对Topic粒度的配额管理(限流)

    推荐一款非常好用的kafka管理平台,kafka的灵魂伴侣 滴滴开源Logi-KafkaManager 一站式Kafka监控与管控平台 技术交流 有想进滴滴LogI开源用户群的加我个人微信: jjdl ...

  5. kafka中Topic、Partition、Groups、Brokers概念辨析

    kafka消息队列有两种消费模式,分别是点对点模式和订阅/发布模式.具体比较可以参考Kafka基础–消息队列与消费模式. 下图是一个点对点的Kafka结构示意图,其中有以下几个部分: producer ...

  6. Kafka中topic的Partition,Kafka为什么这么快,Consumer的负载均衡及consumerGroup的概念(来自学习笔记)

    1.1. Kafka中topic的Partition  在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic ...

  7. kafka整合ldap权限管理

    1.先登录kafka账号 kinit -kt /home/zhc/kafka.keytab kafka 2.查询kafka所有的角色 kafka-sentry -lr 3.若没有角色,创建角色 kaf ...

  8. 使用redis kv数据库维护kafka主题分区的offset

    目的 将kafka的offset保存到外部的redis数据库中,再次读取的时候也从外部的redis数据库读取 主要步骤 1 从kafka获取要读取的消息的开始offset 2 通过offset读取数据 ...

  9. RocketMQ (六) 主题-Topic

    目录 1.什么是Topic? 2.模型关系 3.内部属性 3.1 主题名称 3.2 队列列表 3.3 消息类型 4.行为约束 5.使用建议 1.什么是Topic? Topic是 RocketMQ 中消 ...

最新文章

  1. iOS开发如何实现消息推送机制
  2. UI控件Telerik UI for WinForms发布R1 2019|附下载
  3. 中国人写的编译器-值得看看
  4. c语言转义字符%,c语言转义字符
  5. Dubbo 学习总结(12)—— 十年再出发,Dubbo 3.0 Preview 即将在 3 月发布
  6. PD连接远程mysql_PowerDesigner连接远程Oracle数据库 | 学步园
  7. 无法找到脚本文件 C:/Windows/explorer.exe:574323188.vbs
  8. 理解之软件需求说明书
  9. 在iOS上使用AirPrint实现无线打印功能
  10. 支持Android 11安卓Flash播放器终极版源码方案2022(2:网页中嵌入)
  11. 波士顿房价预测实验报告
  12. 使用Pytorch搭建U-Net网络并基于DRIVE数据集训练(语义分割)学习笔记
  13. 大屏可视化色彩设计基本知识
  14. 群体智能与进化计算_液态大脑与固态大脑——圣塔菲最新群体智能文集
  15. 执行cmd命令提示不是内部或外部命令
  16. git: Couldn‘t find remote ref
  17. FPGA设计开发(基础课题):分频器设计
  18. google adwords express使用心得
  19. mysql 全局不重复_如何批量生成MySQL不重复手机号大表实例代码
  20. ThingsBoard添加高德地图

热门文章

  1. 关于attach和detach的疑问
  2. 分类模型评价指标KS与IV的比较
  3. 记录自己折腾不止的人生,留住时光的一抹轨迹
  4. 网神综合能力过硬 服务水利系统信息化
  5. 标题 穿越雷区 java_标题:穿越雷区
  6. 你是我无法斑驳的阳光
  7. 如何改变讨好型人格?
  8. CentOS 7 查看系统时间、更新系统时间 、修改系统时间
  9. 一个专科生的Java学习历程
  10. [词性] 二十三、情态动词 2 [ have to ] [ ought to ] [ dare ] [ be able to ] [ needn‘t ] [ had better ]