参考博客:
看完这篇Kafka,你也许就会了Kafka

一、kafka安装使用

步骤:

brew install zookeeper
brew install kafka

安装位置在/usr/local/Cellar,安装完成后会有两个文件夹zookeeperkafka
配置文件在/usr/local/etc/kafka,里面有server.propertieszookeeper.properties
启动zookeeper和kafaka:

#第一种方式,使用 brew services 插件启动
brew services start zookeeper
brew services start kafka#第二种方式,直接启动
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
kafka-server-start /usr/local/etc/kafka/server.properties

创建topic:

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

查看topic:

kafka-topics --list --zookeeper localhost:2181

生产者消息

kafka-console-producer --broker-list localhost:9092 --topic test1

新建一个shell窗口,作为消费者消息窗口

kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning

效果:生产者消息窗口输入什么,消费者消息窗口输出什么。
kafka命令大全:

Option Description
–alter Alter the number of partitions, eplica assignment, and/or configuration for the topic.
–at-min-isr-partitions if set when describing topics, only show partitions whose isr count is equal to the configured minimum. Not supported with the --zookeeper option.
–bootstrap-server <String: server to connect to> REQUIRED: The Kafka server to connect to. In case of providing this, a direct Zookeeper connection won’t be required.
–command-config <String: command config property file> Property file containing configs to be passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs.
–config <String: name=value> A topic configuration override for the topic being created or altered. The following is a list of valid configurations:
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
max.compaction.lag.ms
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate # retention.bytes # retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
See the Kafka documentation for full etails on the topic configs. It is supported only in combination with --create if --bootstrap-server option is used (the kafka-configs CLI supports altering topic configs with a --bootstrap-server option).
–create Create a new topic.
–delete Delete a topic
–delete-config <String: name> A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). Not supported with the --bootstrap-server option.
–describe List details for the given topics.
–disable-rack-aware Disable rack aware replica assignment
–exclude-internal exclude internal topics when running list or describe command. The internal topics will be listed by default
–force Suppress console prompts
–help Print usage information.
–if-exists if set when altering or deleting or describing topics, the action will only execute if the topic exists.
–if-not-exists if set when creating topics, the action will only execute if the topic does not already exist.
–list List all available topics.
–partitions <Integer: # of partitions> The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default.
–replica-assignment <String:
broker_id_for_part1_replica1:
broker_id_for_part1_replica2,
broker_id_for_part2_replica1: broker_id_for_part2_replica2,
…>
A list of manual partition-to-broker assignments for the topic being created or altered.
–replication-factor <Integer: replication factor> The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.
–topic <String: topic> The topic to create, alter, describe or delete. It also accepts a regular expression, except for --create option. Put topic name in double quotes and use the ‘’ prefix to escape regular expression symbols; e.g. “test.topic”.
–topics-with-overrides if set when describing topics, only show topics that have overridden configs
–unavailable-partitions if set when describing topics, only show partitions whose leader is not available
–under-min-isr-partitions if set when describing topics, only show partitions whose isr count is less than the configured minimum. Not supported with the --zookeeper option.
–under-replicated-partitions if set when describing topics, only show under replicated partitions
–version Display Kafka version.
–zookeeper <String: hosts> DEPRECATED, The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.

使用--delete --topic demoName命令删除的时候会出现一条这样的消息:

Topic demoName is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

所以只是被标记删除,并没有真正地删除,要真正删除,需要在config/server.properties中设置delete.topic.enable=true

二、kafka简介

kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要用于大数据实时处理领域。
kafka基础架构主要存在生产者Producer、kafka集群Broker、消费者Consumer、注册消息Zookeeper。

  • Producer:消息生产者,向kafka中发布消息的角色。
  • Consumer:消息消费者,即从kafka中拉取消息消费的客户端。
  • Consumer Group:消费者组,一组中存在多个消费者,消费者消费Broker中当前Topic的不同分区的消息,消费者之间互不影响,所有的消费者都属于某个消费组,即消费者组是逻辑上的一个订阅者。某一个分区中的消息只能够被一个消费者组中的一个消费者所消费
  • Broker:经纪人,一台kafka服务器就是一个Broker,一个集群由多个Broker组成,一个Broker可以容纳多个topic
  • Topic:主题,可以理解为一个队列。生产者和消费者都是面向一个topic。
  • Partition:分区,为了实现扩展性,一个非常大的topic可以分布到多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序的队列(分区有序不能保证全局有序)。
  • Replication:副本,为保证集群中某个节点发生故障,节点上Partition数据不丢失,kafka可以正常地工作,kafka提供了副本机制,一个topic的每个Partition有若干个副本,一个leader和多个follower。
  • Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费数据的对象。
  • Follower:每个分区多个副本的从角色,实时地从Leader中同步数据,leader发生故障的时候,某个follower会成为新的leader。

  • kafka工作流程:Topic是逻辑上的改变,Partition是物理上的概念,每个Partition对应着一个log文件,该log文件存储的就是producer生产的数据。Producer生产的数据会被不断追加到该log文件的末端,且每条数据都有自己的offset,Consumer组中的每个Consumer都会实时记录自己消费到了哪个offset,以便于出错恢复的时候可以从上次的位置接续消费。
  • kafka的文件存储:一个Topic分为多个Partition,一个Partition分为多个segment,一个segment对应两个文件.log.index
    其中.index文件存储消息的offset+真实的起始偏移量,.log存储的是真实的数据。
    为快速定位大文件中消息位置,kafka采用了分片和索引的机制来加速定位。
  • 生产者分区策略
    生产者ISR:为保证producer发送的数据能够可靠地发送到指定的topic中,topic中的每个Partition收到producer发送的数据后,都需要向producer发送ack,即acknowledgment,如果producer收到ack就会进行下一轮的发送,否则重新发送数据。
  • 生产者分区分配策略
    • 生产者ISR(in-sync replica set)【速率和leader相差低于10s的follower集合

      • 副本数据同步策略:全部follower同步完成才发送ack。这种方式比半数follower同步完成即发送ack的容错率要高。虽然这样网络延迟较高,但网络延迟对kafka的影响较小,而方案二会造成大量数据冗余。
    • 生产者ack机制
      • kafka为用户提供了三种可靠性级别,用户可根据可靠性和延迟的要求进行权衡选择不同的配置。
        参数0:producer不等待Broker的ack,提供了最低的延迟,Broker接收到还没有写入磁盘就已经返回,但Broker故障时可能丢失数据
        参数1:producer等待Broker的ack,Partition的leader落盘成功后返回ack,如果follower同步成功之前leader故障,那么将丢失数据。(只是leader落盘)。
        参数-1(all):producer等待Broker的ack、Partition的leader和ISR的follower全部落盘成功才返回ack,但如果是在follower同步完成后,Broker发送ack之前,此时leader发生故障的话会造成数据重复
        总结:producer返回ack,0无落盘直接返,1只leader落盘返,-1全部落盘然后返。
    • 数据一致性问题
      • LEO(Log End Offset):每个副本最后的一个offset
      • HW(High Watermark):高水位,指消费者能见到的最大的offset,ISR队列中最小的LEO。
      • follower故障:follower发生故障后会被临时提出ISR,等待该follower恢复后,follower会读取本地磁盘记录的上次HW,并将log文件高于HW的部分截掉,从HW开始向leader进行同步,等待该follower的LEO≥该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
      • leader故障:leader发生故障后,会从ISR中选出一个新的leader,为了保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader中同步数据。
      • 这只能保证副本之间的数据一致性,并不能保证数据不丢失或数据重复
    • Exactly Once(精确的一次)
      • ack级别设置为-1,保证producer答server之间不会丢失数据,即At Least Once,至少一次语义,不能保证数据不重复。ack级别设置为0,保证生产者每条消息只会被发送一次,即At Most Once,至多一次语义,保证数据不重复却不能保证数据不丢失。对于重要的数据,要求数据不重复也不丢失,即Exactly Once
        0.11版本的kafka引入了幂等性,幂等性指代Producer无论向Server发送了多少次重复数据,Server端都只会持久化一条数据。启用幂等性,即在Producer的参数中设置enable.idempotence=true即可。但幂等性无法保证跨分区会话的Exactly Once。
  • 消费者分区分配策略
    • 消费方式:Consumer采用pull拉的方式从Broker中读取数据。
      push推的方式很难适应消费速率不同的消费者,典型问题是拒绝服务以及网络堵塞。而pull的方式可以让Consumer根据自己的消费处理能力以适当的速度消费消息。
    • kafka的两种分配策略:
      Round-Robin:主要采用轮询的方式分配所有的分区,会导致每个Consumer所承载的Partition数量不一致,从而导致各个Consumer压力不均。
      Range:采用重分配方式,首先计算各个Consumer将会承载的Partition数量,然后将指定数量的Partition分配给该Consumer。按照range的方式进行分配,本质是以此遍历每个topic,然后将这个topic按照其订阅的Consumer数量进行平均分配,多出来的则按照Consumer的字典序挨个分配,这样会导致在前面的Consumer得到更多的Partition,导致各个Consumer压力不均。
    • 消费者offset的存储:kafka0.9版本之前,Consumer默认将offset保存在zookeeper中,0.9版本之后默认保存在kafka一个内置的topic中,为__consumer_offsets
    • 同一个消费者组中的消费者,同一时刻只能有一个消费者消费
  • 高效读写 & zookeeper的作用
    • kafka的高效读写

      • 顺序读磁盘:kafka的Producer生产数据写入log文件中时采用的是顺序写的方式。顺序写之所以快和磁盘的机械结构有关,省去了大量的磁头寻址的时间。
      • 零拷贝技术:这种方式只用将磁盘文件的数据复制到页面缓存一次,然后将数据从页面缓存直接发送到网络中,从而避免了重复复制的操作。
    • kafka中zookeeper的作用
      • kafka集群中有一个Broker会被选举为Controller,负责管理Broker的上下线、所有topic的分区副本分配和leader的选举(leader选举策略为先到先得)等工作。controller的工作管理依赖于zookeeper。
  • 事务

kafka从0.11版本开始引入事务支持,事务可以保证kafka在Exactly Once语义的基础上,生产和消费的跨分区会话,要么全部成功,要么全部失败。

  • Producer事务

mac使用brew安装kafka相关推荐

  1. mac使用brew安装mysql

    mac使用brew安装mysql 首先使用brew install mysql@5.7 配置环境变量 启动mysql 设置密码 首先使用brew install mysql@5.7 如果想查看版本,直 ...

  2. mac上brew安装swoole扩展,采坑之路

    1.官网下载最新扩展 wget http://pear.php.net/go-pear.phar 说明:如果提醒你需要更新,则更新命令 2.开始安装 sudo pecl install swoole ...

  3. Mac使用Homebrew安装Kafka

    1.使用brew install命令安装Kafka $ brew install kafka 安装过程将依赖安装 zookeeper 软件位置 /usr/local/Cellar/zookeeper ...

  4. Mac之brew安装

    brew 是MacOS上的包管理工具,可以简化 macOS 和 Linux 操作系统上软件的安装. 为什么有 brew?因为 mac 平台的 appstore 非常的不好用,审核也很严,因此有很多一些 ...

  5. mac使用brew安装旧版软件(php,mysql)等异常

    mac上使用brew安装php,提示has been disabled because it is deprecated upstream! 这是因为php7.2官方已经不再维护,所以Hombrew将 ...

  6. Mac使用brew安装Python3.*并设为默认版本

    mac系统虽然默认打开的是python2,但是也默认自带了python3,只要在终端用命令 python3 就会打开python3 下面讲的是用brew安装最新的python3 brew instal ...

  7. Mac 使用brew安装phpredis扩展

    1.cd 到 PHP 安装目录,例:/opt/homebrew/opt/php@7.4/bin cd /opt/homebrew/opt/php@7.4/bin 2.下载 phpredis git c ...

  8. mac使用brew安装nginx并使用

    文章目录 安装brew 使用brew安装nginx 查看nginx版本 启动nginx 查看nginx配置 修改nginx配置文件 方式1-命令行 方式2-找到并打开配置文件 配置nginx入口 .n ...

  9. brew 下载java8,mac使用brew安装Java8

    homebrew不多说,java8也不多说. brew安装不上java8的例子太多了. 最后的做法无非这么几个,安装openjdk版本,或者安装其他的版本,或者直接去官网装. 我今天就要硬装!就要用b ...

最新文章

  1. Git使用教程-命令总结大全
  2. linux环境下运行open error,错误:运行OpenCL代码时clGetPlatformIDs -1001(Linux)
  3. jzoj1082-合并果子【堆,贪心】
  4. 速度之王 — LZ4压缩算法与其他算法的比较
  5. Hystrix 线程池隔离与接口限流
  6. 统一的Ajax提交封装,一劳永逸好工具(带跨域处理)
  7. mysql配置所有ip连接_Mysql查看用户连接数配置及每个IP的请求情况
  8. python显示邮件发送成功失败_python stmp module 163邮箱发送邮件不成功
  9. 惊艳!Uber 的豪华开源深度学习“全家桶”
  10. reverse-nodes-in-k-group
  11. Deteming the User Intent of Web Search Engine
  12. ctab法提取dna流程图_每周实验新品:创新核酸提取技术、离心管自动打标设备...
  13. app录制回放 jmeter_Jmeter使用之脚本录制回放
  14. 京东员工p级别分几级_一文揭秘字节跳动、华为、京东的薪资职级
  15. 什么是bug ,bug的生命周期都有什么?你想知道的全在这里了。
  16. 软件工程阶段性总结(三)——软件设计和编码
  17. 包姓女孩清秀文雅的名字
  18. MX_Player_Pro_专业精简版AC3/DTS/EAC3 By.SOLDIER-就要应用网91apps.cn
  19. 复盘红米手机慢问题,针对小米手机 miui系统优化设置,实测红米note8,和k20 pro可行,流畅度起码提升了20%,能马上感觉到。
  20. 这种国家的外贸不做也罢

热门文章

  1. 在Windows上安装Gradle
  2. 阿里高层大变动:出轨风波20个月后,蒋凡不再负责核心淘宝业务......
  3. 旅游网站后台信息管理——简单版(增删改查)
  4. Java实现收银台系统
  5. 51单片机学习笔记6 单片机通过控制三极管来控制蜂鸣器和继电器
  6. 麦肯锡新报告《在元宇宙创造价值》, 2030 年元宇宙的价值可能会增长到 5 万亿美元
  7. 如何将AS/400英文界面改为中文界面?
  8. Excel 2010 安装日历控件 注册
  9. 通过股市悖论学会一种投资
  10. 计算机音乐盒子制作步骤,自制音乐盒方法|用筷子做浪漫的天使之心DIY音乐盒教程...