目录

目录 1

1. 前言 2

2. Broker默认端口号 2

3. 安装Kafka 2

4. 启动Kafka 2

5. 创建Topic 2

6. 列出所有Topic 3

7. 删除Topic 3

8. 查看Topic 3

9. 增加topic的partition数 4

10. 生产消息 4

11. 消费消息 4

12. 查看有哪些消费者Group 4

13. 查看新消费者详情 5

14. 查看Group详情 5

15. 删除Group 5

16. 设置consumer group的offset 5

17. RdKafka自带示例 6

18. 平衡leader 6

19. 自带压测工具 6

20. 查看topic指定分区offset的最大值或最小值 6

21. 查看__consumer_offsets 6

22. 获取指定consumer group的位移信息 7

23. 20) 查看kafka的zookeeper 7

24. 如何增加__consumer_offsets的副本数? 9

25. 问题 11

附1:进程监控工具process_monitor.sh 12

附2:批量操作工具 12

附2.1:批量执行命令工具:mooon_ssh 12

附2.2:批量上传文件工具:mooon_upload 13

附2.3:使用示例 13

附3:批量设置broker.id和listeners工具 15

附4:批量设置hostname工具 16

附5:Kafka监控工具kafka-manager 16

附6:kafka的安装 16

附7:__consumer_offsets 17

1. 前言

本文内容主要来自两个方面:一是网上的分享,二是自研的随手记。日记月累,收录kafka各种命令,会持续更新。

在0.9.0.0之后的Kafka,出现了几个新变动,一个是在Server端增加了GroupCoordinator这个角色,另一个较大的变动是将topic的offset 信息由之前存储在zookeeper上改为存储到一个特殊的topic(__consumer_offsets)中。

Kafka的瓶颈容易发生在网卡,而不是CPU、内存和磁盘,所以应当考虑log的压缩。

相关网址:

1) Kafka官网:Apache Kafka

2) 下载地址:Apache Kafka

3) 客户端库:Clients - Apache Kafka - Apache Software Foundation

4) librdkafka库:https://github.com/edenhill/librdkafka

5) confluent-kafka-go:https://github.com/confluentinc/confluent-kafka-go

2. Broker默认端口号

9092,建议安装时,在zookeeper中指定kafka的根目录,比如“/kafka”,而不是直接使用“/”,这样多套kafka也可共享同一个zookeeper集群。

3. 安装Kafka

相比Hadoop、HBase、Spark,甚至Redis等,Kafka的安装到跑起来比较简单,参考官方的介绍即可:Apache Kafka。

Kafka依赖Zookeeper,本身自带了Zookeeper,不过建议另外安装Zookeeper。

4. 启动Kafka

kafka-server-start.sh config/server.properties

后台常驻方式,请带上参数“-daemon”,如:

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

5. 创建Topic

参数--topic指定Topic名,--partitions指定分区数,--replication-factor指定备份数:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

注意,如果配置文件server.properties指定了kafka在zookeeper上的目录,则参数也要指定,否则会报无可用的brokers,如:

kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic test

6. 列出所有Topic

kafka-topics.sh --list --zookeeper localhost:2181

注意,如果配置文件server.properties指定了kafka在zookeeper上的目录,则参数也要指定,否则会报无可用的brokers,如:

kafka-topics.sh --list --zookeeper localhost:2181/kafka

输出示例:

__consumer_offsets

my-replicated-topic

test

7. 删除Topic

1) kafka-topics.sh --zookeeper localhost:2181 --topic test --delete

2) kafka-topics.sh --zookeeper localhost:2181/kafka --topic test --delete

3) kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic test

8. 查看Topic

kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

注意,如果配置文件server.properties指定了kafka在zookeeper上的目录,则参数也要指定,否则会报无可用的brokers,如:

kafka-topics.sh --describe --zookeeper localhost:2181/kafka --topic test

输出示例:

Topic:test PartitionCount:3 ReplicationFactor:2 Configs:

Topic: test Partition: 0 Leader: 140 Replicas: 140,214 Isr: 140,214

Topic: test Partition: 1 Leader: 214 Replicas: 214,215 Isr: 214,215

Topic: test Partition: 2 Leader: 215 Replicas: 215,138 Isr: 215,138

9. 增加topic的partition数

kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 5

10. 生产消息

kafka-console-producer.sh --broker-list localhost:9092 --topic test

11. 消费消息

1) 从头开始

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2) 从尾部开始

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest

3) 指定分区

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 1

4) 取指定个数

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 1 --max-messages 1

5) 新消费者(ver>=0.9)

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties

12. 查看有哪些消费者Group

1) 分ZooKeeper方式(老)

kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181/kafka --list

2) API方式(新)

访问腾讯云上的 Kafka 只能此方法,因为腾讯云上的 Kafka 没有暴露 Zookeeper 。

kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --list

输出示例:

test

console-consumer-37602

console-consumer-75637

console-consumer-59893

13. 查看新消费者详情

仅支持offset存储在zookeeper上的:

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test

14. 查看Group详情

kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test --describe

输出示例:

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID

test  1         87             87             0   -           -    -

15. 删除Group

老版本的ZooKeeper方式可以删除Group,新版本则自动删除,当执行:

kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test --delete

输出如下提示:

Option '[delete]' is only valid with '[zookeeper]'.

Note that there's no need to delete group metadata for the new consumer

as the group is deleted when the last committed offset for that group expires.

16. 设置consumer group的offset

执行zkCli.sh进入zookeeper命令行界面,假设需将group为testgroup的topic的offset设置为2018,则:set /consumers/testgroup/offsets/test/0 2018

如果kakfa在zookeeper中的根目录不是“/”,而是“/kafka”,则:

set /kafka/consumers/testgroup/offsets/test/0 2018

另外,还可以使用kafka自带工具kafka-run-class.sh kafka.tools.UpdateOffsetsInZK修改,命令用法:

kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic

从用法提示可以看出,只能修改为earliest或latest,没有直接修改zookeeper灵活。

17. RdKafka自带示例

rdkafka_consumer_example -b 127.0.0.1:9092 -g test test

rdkafka_consumer_example -e -b 127.0.0.1:9092 -g test test

18. 平衡leader

kafka-preferred-replica-election.sh --zookeeper localhost:2181/chroot

19. 自带压测工具

kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092

20. 查看topic指定分区offset的最大值或最小值

time为-1时表示最大值,为-2时表示最小值:

kafka-run-class.sh kafka.tools.GetOffsetShell --topic hive-mdatabase-hostsltable --time -1 --broker-list 127.0.0.1:9092 --partitions 0

21. 查看__consumer_offsets

需consumer.properties中设置exclude.internal.topics=false:

1) 0.11.0.0之前版本

kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

2) 0.11.0.0之后版本(含)

kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

22. 获取指定consumer group的位移信息

需consumer.properties中设置exclude.internal.topics=false:

1) 0.11.0.0版本之前:

kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9091,localhost:9092,localhost:9093 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

2) 0.11.0.0版本以后(含):

kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9091,localhost:9092,localhost:9093 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

23. 20) 查看kafka的zookeeper

1) 查看Kakfa在zookeeper的根目录

[zk: localhost:2181(CONNECTED) 0] ls /kafka

[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, config]

2) 查看brokers

[zk: localhost:2181(CONNECTED) 1] ls /kafka/brokers

[ids, topics, seqid]

3) 查看有哪些brokers(214和215等为server.properties中配置的broker.id值):

[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids

[214, 215, 138, 139]

4) 查看broker 214,下列数据显示该broker没有设置JMX_PORT:

[zk: localhost:2181(CONNECTED) 4] get /kafka/brokers/ids/214

{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://test-204:9092"],"jmx_port":-1,"host":"test-204","timestamp":"1498467464861","port":9092,"version":4}

cZxid = 0x200002400

ctime = Mon Jun 26 16:57:44 CST 2017

mZxid = 0x200002400

mtime = Mon Jun 26 16:57:44 CST 2017

pZxid = 0x200002400

cversion = 0

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x45b9d9e841f0136

dataLength = 190

numChildren = 0

5) 查看controller,下列数据显示broker 214为controller:

[zk: localhost:2181(CONNECTED) 9] get /kafka/controller

{"version":1,"brokerid":214,"timestamp":"1498467946988"}

cZxid = 0x200002438

ctime = Mon Jun 26 17:05:46 CST 2017

mZxid = 0x200002438

mtime = Mon Jun 26 17:05:46 CST 2017

pZxid = 0x200002438

cversion = 0

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x45b9d9e841f0136

dataLength = 56

numChildren = 0

6) 查看kafka集群的id:

[zk: localhost:2181(CONNECTED) 13] get /kafka/cluster/id

{"version":"1","id":"OCAEJy4qSf29bhwOfO7kNQ"}

cZxid = 0x2000023e7

ctime = Mon Jun 26 16:57:28 CST 2017

mZxid = 0x2000023e7

mtime = Mon Jun 26 16:57:28 CST 2017

pZxid = 0x2000023e7

cversion = 0

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 45

numChildren = 0

7) 查看有哪些topics:

[zk: localhost:2181(CONNECTED) 16] ls /kafka/brokers/topics

[test, my-replicated-topic, test1, test2, test3, test123, __consumer_offsets, info]

8) 查看topic下有哪些partitions:

[zk: localhost:2181(CONNECTED) 19] ls /kafka/brokers/topics/__consumer_offsets/partitions

[44, 45, 46, 47, 48, 49, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43]

9) 查看“partition 0”的状态:

[zk: localhost:2181(CONNECTED) 22] get /kafka/brokers/topics/__consumer_offsets/partitions/0/state

{"controller_epoch":2,"leader":215,"version":1,"leader_epoch":1,"isr":[215,214]}

cZxid = 0x2000024c6

ctime = Mon Jun 26 18:02:07 CST 2017

mZxid = 0x200bc4fc3

mtime = Mon Aug 27 18:58:10 CST 2018

pZxid = 0x2000024c6

cversion = 0

dataVersion = 1

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 80

numChildren = 0

24. 如何增加__consumer_offsets的副本数?

可使用kafka-reassign-partitions.sh来增加__consumer_offsets的副本数,方法如下,构造一JSON文件reassign.json:

{

"version":1,

"partitions":[

{"topic":"__consumer_offsets","partition":0,"replicas":[1,2,3]},

{"topic":"__consumer_offsets","partition":1,"replicas":[2,3,1]},

{"topic":"__consumer_offsets","partition":2,"replicas":[3,1,2]},

{"topic":"__consumer_offsets","partition":3,"replicas":[1,2,3]},

...

{"topic":"__consumer_offsets","partition":100,"replicas":[2,3,1]}

]

}

然后执行:

kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --execute

“[1,2,3]”中的数字为broker.id值。

如果执行报错“Partitions reassignment failed due to Partition reassignment data file is empty”,可能是因为reasign.json文件格式不对,比如成下列格式了(中间的没有以逗号结尾):

{"topic":"__consumer_offsets","partition":29,"replicas":[5,3,2]},

{"topic":"__consumer_offsets","partition":30,"replicas":[1,4,3]}

{"topic":"__consumer_offsets","partition":31,"replicas":[2,5,4]}

{"topic":"__consumer_offsets","partition":32,"replicas":[3,2,5]}

{"topic":"__consumer_offsets","partition":33,"replicas":[4,3,1]},

如果执行遇到下列错误:

Partitions reassignment failed due to Partition replica lists may not contain duplicate entries: __consumer_offsets-16 contains multiple entries for 2. __consumer_offsets-39 contains multiple entries for 2. __consumer_offsets-40 contains multiple entries for 3. __consumer_offsets-44 contains multiple entries for 3

原因是一个分区的两个副本被指定在同一个broker上,以16号分区为列,有两个副本落在了broker 2上:

{"topic":"__consumer_offsets","partition":16,"replicas":[2,5,2]},

执行成功后的输出:

$ ../bin/kafka-reassign-partitions.sh --zookeeper 192.168.1.35.31:2181/kafka --reassignment-json-file __consumer_offsets.reassign --execute

Current partition replica assignment

{"version":1,"partitions":[{"topic":"__consumer_offsets","partition":22,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":30,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":8,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":21,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":4,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":27,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":7,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":9,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":46,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":25,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":35,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":41,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":33,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":23,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":49,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":47,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":16,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":28,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":31,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":36,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":42,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":3,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":18,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":37,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":15,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":24,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":38,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":17,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":48,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":19,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":11,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":13,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":43,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":6,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":14,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":20,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":0,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":44,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":39,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":12,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":45,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":5,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":26,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":29,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":34,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":10,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":32,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":40,"replicas":[5],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback

Successfully started reassignment of partitions.

25. 问题

1) -190,Local: Unknown partition

比如单机版只有一个分区,但prodcue参数的分区值为1等。

2) Rdkafka程序日志“delivery failed. errMsg:[Local: Message timed out]”

同一个程序,在有些机器上会这个错误,有些机器则工作正常,相关的issues:

https://github.com/edenhill/librdkafka/issues/474

实测是因为在运行Kafka应用程序的机器上没有配置Kafka Brokers机器的hosts。

另外的解决办法是在server.properties配置listeners和advertised.listeners,并且使用IP而不是hostname作为值。

3) Name or service not known (after 9020595078ms in state INIT)

event_callback: type(0), severity(3), (-193)kafka-204:9092/214: Failed to resolve 'kafka-204:9092': Name or service not known (after 9020595078ms in state INIT)

原因是运行kafka应用程序(非kafka本身)的机器不能识别主机名kafka-204(Kafka Brokers机器可以识别),

解决办法是在server.properties配置listeners和advertised.listeners,并且使用IP而不是hostname作为值。

附1:进程监控工具process_monitor.sh

process_monitor.sh为shell脚本,本身含详细的使用说明和帮助提示。适合放在crontab中,检测到进程不在时,3秒左右时间重拉起。支持不同用户运行相同程序,也支持同一用户带不同参数运行相同程序。

下载网址:

https://github.com/eyjian/libmooon/blob/master/shell/process_monitor.sh

使用示例:

* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java kafkaServer" "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"

由于所有的java程序均运行在JVM中,所以程序名均为java,“kafkaServer”用于限定只监控kafka。如果同一用户运行多个kafka实例,则需加端口号区分,并且要求端口号为命令行参数,和“kafkaServer”共同组成匹配模式。

当检测到进程不存在时,则执行第三列的重启指令“/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties”。

使用示例2,监控zooekeeper:

* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java -Dzookeeper" "/data/zookeeper/bin/zkServer.sh start"

附2:批量操作工具

适用用来批量安装kafka和日常运维。

下载网址:

https://github.com/eyjian/libmooon/releases

监控工具有两个版本:一是C++版本,另一是GO版本。当前C++版本比较成熟,GO版本相当简略,但C++版本依赖C++运行时库,不同环境需要特定编译,而GO版本可不依赖C和C++运行时库,所以不需编译即可应用到广泛的Linux环境。

使用简单,直接执行命令,即会提示用法。

附2.1:批量执行命令工具:mooon_ssh

参数名

默认值

说明

-u

用户名参数,可用环境变量U替代

-p

密码参数,可用环境变量P替代

-h

IP列表参数,可用环境变量H替代

-P

22,可修改源码,编译为常用端口号

SSH端口参数,可用环境变量PORT替代

-c

在远程机器上执行的命令,建议单引号方式指定值,除非要执行的命令本身已经包含了单引号有冲突。使用双引号时,要注意转义,否则会被本地shell解释

-v

1

工具输出的详细度

附2.2:批量上传文件工具:mooon_upload

参数名

默认值

说明

-u

用户名参数,可用环境变量U替代

-p

密码参数,可用环境变量P替代

-h

IP列表参数,可用环境变量H替代

-P

22,可修改源码,编译为常用端口号

SSH端口参数,可用环境变量PORT替代

-s

以逗号分隔的,需要上传的本地文件列表,可以带相对或绝对目录

-d

文件上传到远程机器的目录,只能为单个目录

附2.3:使用示例

1) 使用示例1:上传/etc/hosts

mooon_upload -s=/etc/hosts -d=/etc

2) 使用示例2:检查/etc/profile文件是否一致

mooon_ssh -c='md5sum /etc/hosts'

3) 使用示例3:批量查看crontab

mooon_ssh -c='crontab -l'

4) 使用示例4:批量清空crontab

mooon_ssh -c='rm -f /tmp/crontab.empty;touch /tmp/crontab.empty'

mooon_ssh -c='crontab /tmp/crontab.emtpy'

5) 使用示例5:批量更新crontab

mooon_ssh -c='crontab /tmp/crontab.online'

6) 使用示例6:取远端机器IP

因为awk用单引号,所以参数“-c”的值不能使用单引号,所以内容需要转义,相对其它来说要复杂点:

mooon_ssh -c="netstat -ie | awk -F[\\ :]+ 'BEGIN{ok=0;}{if (match(\$0, \"eth1\")) ok=1; if ((1==ok) && match(\$0,\"inet\")) { ok=0; if (7==NF) printf(\"%s\\n\",\$3); else printf(\"%s\\n\",\$4);} }'"

不同的环境,IP在“netstat -ie”输出中的位置稍有不同,所以awk中加了“7==NF”判断,但仍不一定适用于所有的环境。需要转义的字符包含:双引号、美元符和斜杠。

7) 使用示例7:批量查看kafka进程(环境变量方式)

$ export H=192.168.31.9,192.168.31.10,192.168.31.11,192.168.31.12,192.168.31.13

$ export U=kafka

$ export P='123456'

mooon_ssh -c='/usr/local/jdk/bin/jps -m'

[192.168.31.15]

50928 Kafka /data/kafka/config/server.properties

125735 Jps -m

[192.168.31.15] SUCCESS

[192.168.31.16]

147842 Jps -m

174902 Kafka /data/kafka/config/server.properties

[192.168.31.16] SUCCESS

[192.168.31.17]

51409 Kafka /data/kafka/config/server.properties

178771 Jps -m

[192.168.31.17] SUCCESS

[192.168.31.18]

73568 Jps -m

62314 Kafka /data/kafka/config/server.properties

[192.168.31.18] SUCCESS

[192.168.31.19]

123908 Jps -m

182845 Kafka /data/kafka/config/server.properties

[192.168.31.19] SUCCESS

================================

[192.168.31.15 SUCCESS] 0 seconds

[192.168.31.16 SUCCESS] 0 seconds

[192.168.31.17 SUCCESS] 0 seconds

[192.168.31.18 SUCCESS] 0 seconds

[192.168.31.19 SUCCESS] 0 seconds

SUCCESS: 5, FAILURE: 0

8) 使用示例8:批量停止kafka进程(参数方式)

$ mooon_ssh -c='/data/kafka/bin/kafka-server-stop.sh' -u=kafka -p='123456' -h=192.168.31.15,192.168.31.16,192.168.31.17,192.168.31.18,192.168.31.19

[192.168.31.15]

No kafka server to stop

command return 1

[192.168.31.16]

No kafka server to stop

command return 1

[192.168.31.17]

No kafka server to stop

command return 1

[192.168.31.18]

No kafka server to stop

command return 1

[192.168.31.19]

No kafka server to stop

command return 1

================================

[192.168.31.15 FAILURE] 0 seconds

[192.168.31.16 FAILURE] 0 seconds

[192.168.31.17 FAILURE] 0 seconds

[192.168.31.18 FAILURE] 0 seconds

[192.168.31.19 FAILURE] 0 seconds

SUCCESS: 0, FAILURE: 5

附3:批量设置broker.id和listeners工具

为shell脚本,有详细的使用说明和帮助提示,依赖mooon_ssh和mooon_upload:

https://github.com/eyjian/libmooon/blob/master/shell/set_kafka_id_and_ip.sh

附4:批量设置hostname工具

为shell脚本,有详细的使用说明和帮助提示,依赖mooon_ssh和mooon_upload:

https://github.com/eyjian/libmooon/blob/master/shell/set_hostname.sh

附5:Kafka监控工具kafka-manager

官网:https://github.com/yahoo/kafka-manager

kafka-manager的数据主要来源两个方便:一是kafka的zookeeper数据,二是kafka的JMX数据。

kafka-manager要求JDK版本不低于1.8,从源码编译kafka-manager相对复杂,但编译拿到二进制包后,只需修改application.conf中的“kafka-manager.zkhosts”值,即可开始启动kafka-manager。“kafka-manager.zkhosts”值,不是kafka的zookeeper配置值,而是kafka-manager自己用的zookeeper配置,所以两者可以为不同的zookeeper,注意值用双引号引起来。

crontab启动示例:

JMX_PORT=9999

* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java kafkaServer" "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"

指定JMX_PORT不是必须的,但建议设置,这样kafka-manager可以更详细的查看brokers。

crontab中启动kafka-manager示例(指定服务端口为8080,不指定的默认值为9000):

* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java kafka-manager" "/data/kafka/kafka-manager/bin/kafka-manager -Dconfig.file=/data/kafka/kafka-manager/conf/application.conf -Dhttp.port=8080 > /dev/null 2>&1"

process_monitor.sh下载:

https://github.com/eyjian/libmooon/blob/master/shell/process_monitor.sh

注意crontab的用户密码有效,crontab才能正常执行。

附6:kafka的安装

最基本的两个配置项为server.properties文件中的:

1) Broker.id

2) zookeeper.connect

其中broker.id每个节点要求不同,zookeeper.connect值建议指定目录,不要直接放在zookeeper根目录下。另外也建议设置listeners值,不然需要客户端配置hostname和IP的映射关系。

因broker.id和listeners的原因,每个节点的server.properties不一致,可利用工具set_kafka_id_and_ip.sh实现批量的替换,以简化kafka集群的部署。set_kafka_id_and_ip.sh下载地址:https://github.com/eyjian/libmooon/blob/master/shell/set_kafka_id_and_ip.sh。

crontab中启动kafka示例:

JMX_PORT=9999

* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java kafkaServer" "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"

设置JMX_PORT是为方便kafka-manager管理kafka。

附7:__consumer_offsets

__consumer_offsets是kafka内置的Topic,在0.9.0.0之后的Kafka,将topic的offset 信息由之前存储在zookeeper上改为存储到内置的__consumer_offsets中。

server.properties中的配置项num.partitions和default.replication.factor对__consumer_offsets无效,而是受offsets.topic.num.partitions和offsets.topic.replication.factor两个控制。

Kafka常用命令收录相关推荐

  1. Kafka学习之四 Kafka常用命令

    2019独角兽企业重金招聘Python工程师标准>>> Kafka学习之四 Kafka常用命令 Kafka常用命令 以下是kafka常用命令行总结: 1.查看topic的详细信息 . ...

  2. Docker 常用命令收录 -- 持续更新

    Docker 常用命令收录 容器操作 docker build -t friendlyname . # 使用当前目录下的内容创建Dockerfile镜像文件 docker run -p 4000:80 ...

  3. Kafka常用命令(1):kafka-topics

    Kafka常用命令之:kafka-topics 概述 1. 创建Topic: --create 2. 查看Topic详细信息: --describe 3. 查看Topic列表: --list 4.修改 ...

  4. Kafka常用命令行命令

    文章目录 Kafka常用命令 kafka的基本操作(命令行操作) 1.启动集群: 2.查看当前服务器中的所有topic(在kafka目录下) 3.创建主题topic(在kafka目录下) 4.删除to ...

  5. kafka创建topic_一网打尽Kafka常用命令、脚本及配置,宜收藏!

    前言 通过前面 7 篇文章的介绍,小伙伴们应该对 Kafka 运行工作原理有一个相对比较清晰的认识了. Kafka是什么?一起来看看吧! Kafka 安装及简单命令使用 Kafka中消息如何被存储到B ...

  6. kafka常用命令及问题解决

    常用命令 Kafka内部提供了许多管理脚本,这些脚本都放在$KAFKA_HOME/bin目录下,而这些类的实现都是放在源码的kafka/core/src/main/scala/kafka/tools/ ...

  7. kafka java api 删除_Kafka入门系列—6. Kafka 常用命令及Java API使用

    常用命令 启动Zookeeper ./zkServer.sh start-foreground 可选参数: ./zkServer.sh {start|start-foreground|stop|res ...

  8. 一网打尽Kafka常用命令、脚本及配置,宜收藏!

    前言 通过前面 7 篇文章的介绍,小伙伴们应该对 Kafka 运行工作原理有一个相对比较清晰的认识了. Kafka是什么?一起来看看吧! Kafka 安装及简单命令使用 Kafka中消息如何被存储到B ...

  9. linux系统kafka常用命令

    进入kafka目录(非bin目录) cd /usr/local/kafka/kafka_2.11-2.1.0 Kafka启动命令(备注:先进入kafka目录) 常规模式启动kafka bin/kafk ...

最新文章

  1. 抽奖 开源 html5,抽奖转盘.html · smilestone/awardRotate - Gitee.com
  2. matlab 图像操作函数的详解
  3. SpringDataRedis对Redis的数据类型的常用操作API的使用代码举例
  4. webpack加载postcss,以及autoprefixer的loader
  5. EL中的param和params
  6. javascript --- 抽象相等
  7. javascript获取时间差
  8. 并不是每个女生都能穿出这种效果......
  9. c#异常处理_C#异常处理能力问题和解答 套装2
  10. 【第二周】四人小组:车辆管理系统
  11. 如何建立最初的三层架构[转]
  12. matlab按图像边缘抠图_干货:PS抠图的九种方法,最后一个简直是万能
  13. 【算法系列】-开根号
  14. android自定义数字键盘
  15. 【菜鸟学习论文】2020_Wasserstein Distances for Stereo Disparity Estimation
  16. 区块链技术及应用概述
  17. DevOps ACA 阿里云效软件测试和质量保证(八)
  18. 饿了么的树形控件的使用
  19. nvl2与nvl使用区别
  20. 进制转换与进制转换表达式

热门文章

  1. 电脑如何设置开机密码?详细教程来了
  2. Android App实战项目之实现手写签名APP功能(附源码,简单易懂 可直接实用)
  3. 疫情下的情人节 餐饮业再亏700亿!
  4. HTML实现公告文字滚动效果
  5. Hibernate的多种关系映射(oto、otm、mtm)
  6. BeeGFS-Mon对接Grafana
  7. 诺基亚 Lumia 920T 今日发布 处理器升级
  8. 算法 | 03 字符串(KMP)
  9. java 堆外内存 查看_JAVA堆外内存排查小结
  10. 2.石头游戏(坑爹)