一、什么是kafka?

是分布式(项目部署于多个服务器)的基于发布/订阅模式的消息队列,主要用于处理活跃的数据,如:登录、浏览、点击、分享等用户行为产生的数据,说白了就是一个消息系统(消息队列)。

进一步理解:

1.消息队列

消息(Message):网络中的两台计算机或者两个通讯设备之间传递的数据,如:文本、音频等。

队列(Queue):直接把它想象成羽毛球筒,羽毛球先进先出,是一种特殊的线性表,特殊之处在于,只能在头部删除元素,在尾部添加元素。

消息队列(MQ):保存消息的队列,相当于消息传输过程中的一个容器,主要有两个作用,一个是给外部提供存入消息的接口,另一个是提供取出消息的接口。

保存消息的队列,是消息在传输过程中的容器。主要提供生产和消费接口供外部调用,进行数据的存储和获取。

二、MQ的分类

1.主要有两大类:点对点(Peer-to-Peer)、发布/订阅(Publish/Subsribe)。

2.共同点:生产消息发送到队列中,消费者从队列中读取并消费消息。

3.不同点:

(1)点对点

组成:消息队列、发送者(Sender)、接受者(Receiver)。

注:一个生产者生产的消息只能有一个消费者消费,一旦被消费了,消息就不会存在于消息队列中。

(2)发布/订阅

组成:消息队列、发布者(Publisher)、订阅者(Subscriber)、主题(Topic)

注:每个消息可以有多个消费者,彼此互不影响,如:我在微信公共号发了一篇文章,关注我的人都能看到,即消息被多个人接受到(订阅者)。

三、常见的消息系统

ActiveMQ:实现了JMS(Java Message Service)规范,支持性较好,性能相对不高。

RabbitMQ:可靠性高、安全。

Kafka:分布式、高性能、跨语言。

RockeMQ:阿里开源的消息中间件,纯Java实现。

四、kafka特性

1.高吞吐量、低延迟:每秒可以处理几十万条消息,其延迟只有几毫秒,每个主题可以分多个分区,消费组对其分区进行消费。

2.可扩展:集群支持热扩展。

3.持久性、可靠性:可以持久化到本地磁盘,支持数据备份防止数据丢失。

4.容错性:允许节点中节点失败。

5.高并发:支持数千个客户端同时读写。

五、kafka的组成

1.Broder:kafka集群中包含多个kafka服务节点,每个kafka服务节点就是一个broker。

2.Topic:主题(相当于消息的类型),用来存储不同类别的消息(kafka消息数据村存放于硬盘)。

3.Partition:分区,每个Topic可以包含一个或多个分区,分区的数据量是在创建主题时决定的,

目的在于进行分布式存储。

4.Replication:副本,每个分区可以有多个副本,分布在不同的Broker上,会选出一个副本呢作为Leader,所有请求都会通过Leader完成,Follower只负责备份数据。

5.Message:消息,是通信的基本单位,每个消息都存于Partition。

6.Producer:消息的生产者,向topic发布消息。

7.Consumer:消息的消费之,订阅topic并读取其发布消息。

8.Consumer Group:每个Consumer属于一个特定的Consumer Group,多个Consumer可以属于同一个Consumer Group中。

9.Zookeeper:协调kafka正常运行,同时存储kafka元数据信息如,topic的数量等。注:发送给Topic本身的数据不是存在Zookeeper中,而是存在于磁盘文件中。

六、kafka的安装和配置

1.安装

1.解压kafka_2.12-2.3.0.tgzcd ~/software tar -zxf kafka_2.12-2.3.0.tgz2.配置# 创建存放数据的文件夹 cd kafka_2.12-2.3.0 mkdir data # 修改kafka配置文件 cd config vi server.properties #listeners=PLAINTEXT://:9092 # kafka默认监听的端口号为9092 log.dirs=../data # 指定数据的存放目录 zookeeper.connect=localhost:2181 # zookeeper的连接信息3.启动zookeeper使用kafka内置zookeepercd ~/software/kafka_2.12-2.3.0/bin/ ./zookeeper-server-start.sh ../config/zookeeper.properties4.使用外部zk(推荐)cd ~/software/zookeeper-3.4.13/bin/ ./zkServer.sh start5.启动kafka./kafka-server-start.sh ../config/server.properties & # &表示后台运行,也可使用-daemon选项 jps # 查看进程,jps是jdk提供的用来查看所有java进程的命令6.创建Topic(主题)./kafka-topics.sh \ --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 3 \ --topic hello# 查看Topic列表 ./kafka-topics.sh --list --zookeeper localhost:2181 # __consumer_offsets是kafka的内部Topic # 查看某一个具体的Topic ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic hello # 修改Topic:只能增加partition个数,不能减少,且不能修改replication-factor ./kafka-topics.sh --alter --zookeeper localhost:2181 --topic hello --partitions 5 # 删除Topic (需要启用topic删除功能) ./kafka-topics.sh --delete --zookeeper localhost:2181 --topic hello7.启动kafka的Producer(生产者 )./kafka-console-producer.sh --broker-list localhost:9092 --topic hello8.启动kafka的Consumer(消费者)./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning9.验证查看data数据存放目录:Topic的每个Partition对应一个目录,数据存储在目录下的00000000000000000000.log文件中查看zookeeper中的内容:get /brokers/topics/hello/partitions/0/state

2.配置

############################# Server Basics #############################
# broker的id,值为整数,且必须唯一,在一个集群中不能重复
broker.id=0############################# Socket Server Settings #############################
# kafka默认监听的端口为9092
#listeners=PLAINTEXT://:9092
# 处理网络请求的线程数量,默认为3个
num.network.threads=3
# 执行磁盘IO操作的线程数量,默认为8个
num.io.threads=8
# socket服务发送数据的缓冲区大小,默认100KB
socket.send.buffer.bytes=102400
# socket服务接受数据的缓冲区大小,默认100KB
socket.receive.buffer.bytes=102400
# socket服务所能接受的一个请求的最大大小,默认为100M
socket.request.max.bytes=104857600############################# Log Basics #############################
# kafka存储消息数据的目录
log.dirs=../data
# 每个topic默认的partition数量
num.partitions=1
# 在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量
num.recovery.threads.per.data.dir=1############################# Log Flush Policy #############################
# 消息刷新到磁盘中的消息条数阈值
#log.flush.interval.messages=10000
# 消息刷新到磁盘中的最大时间间隔
#log.flush.interval.ms=1000############################# Log Retention Policy #############################
# 日志保留小时数,超时会自动删除,默认为7天
log.retention.hours=168
# 日志保留大小,超出大小会自动删除,默认为1G
#log.retention.bytes=1073741824
# 日志分片策略,单个日志文件的大小最大为1G,超出后则创建一个新的日志文件
log.segment.bytes=1073741824
# 每隔多长时间检测数据是否达到删除条件
log.retention.check.interval.ms=300000############################# Zookeeper #############################
# Zookeeper连接信息,如果是zookeeper集群,则以逗号隔开
zookeeper.connect=localhost:2181
# 连接zookeeper的超时时间
zookeeper.connection.timeout.ms=6000# 是否可以删除topic,默认为false
delete.topic.enable=true

七、Kafka集群搭建

1.搭建Zookeeper集群

可以在不同的服务器上搭建kafka服务形成多个服务节点集群,

也可以在一台主机上启动多个zk服务,使用不同的端口就可以了。

拷贝多个zk目录
zookeeper1、zookeeper2、zookeeper3分别配置每个zkvi zookeeper1/conf/zoo.cfgclientPort=2181server.1=192.168.2.153:6661:7771server.2=192.168.2.153:6662:7772server.3=192.168.2.153:6663:7773echo 1 > zookeeper1/data/myid  vi zookeeper2/conf/zoo.cfgclientPort=2182server.1=192.168.2.153:6661:7771server.2=192.168.2.153:6662:7772server.3=192.168.2.153:6663:7773echo 2 > zookeeper2/data/myid    vi zookeeper3/conf/zoo.cfgclientPort=2183server.1=192.168.2.153:6661:7771server.2=192.168.2.153:6662:7772server.3=192.168.2.153:6663:7773echo 3 > zookeeper3/data/myid
启动zk集群

2.搭建Kafka集群

拷贝多个kafka目录
kafka1、kafka2、kafka3分别配置每个kafkavi kafka1/config/server.propertiesbroker.id=1listeners=PLAINTEXT://192.168.2.153:9091zookeeper.connect=192.168.2.153:2181,192.168.2.153:2182,192.168.2.153:2183vi kafka2/config/server.propertiesbroker.id=2listeners=PLAINTEXT://192.168.2.153:9092zookeeper.connect=192.168.2.153:2181,192.168.2.153:2182,192.168.2.153:2183vi kafka3/config/server.propertiesbroker.id=3listeners=PLAINTEXT://192.168.2.153:9093zookeeper.connect=192.168.2.153:2181,192.168.2.153:2182,192.168.2.153:2183
启动kafka集群
创建Topic./kafka-topics.sh \--create \--zookeeper 192.168.7.40:2181,192.168.7.40:2182,192.168.7.40:2183 \--replication-factor 3 \--partitions 5 \--topic aaa
生成数据/发布消息./kafka-console-producer.sh --broker-list 192.168.7.40:9091,192.168.7.40:9092,192.168.7.40:9093 --topic aaa
消费数据/订阅消息./kafka-console-consumer.sh --bootstrap-server 192.168.7.40:9091,192.168.7.40:9092,192.168.7.40:9093 --topic aaa --from-beginning

八、SpringBoot集成kafka

引入kafka的依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>配置kafka,编辑application.yml文件spring:kafka:# kafka服务器地址(可以多个)bootstrap-servers: 192.168.7.40:9091,192.168.7.40:9092,192.168.7.40:9093producer:# 每次批量发送消息的数量batch-size: 65536buffer-memory: 524288# key/value的序列化key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:# 指定一个默认的组名group-id: test# key/value的反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
创建生产者@RestControllerpublic class KafkaProducer {@Autowiredprivate KafkaTemplate template;/*** 发送消息到Kafka* @param topic   主题* @param message 消息*/@RequestMapping("/sendMsg")public String sendMsg(String topic, String message) {template.send(topic, message);return "success";}}
创建消费者@Componentpublic class KafkaConsumer {/*** 订阅指定主题的消息* @param record 消息记录*/@KafkaListener(topics = {"hello","world"})public void listen(ConsumerRecord record) {// System.out.println(record);System.out.println(record.topic()+","+record.value());}}
测试
访问http://localhost:8080/sendMsg?topic=hello&message=aaaa

You辉编程_kafka相关推荐

  1. You辉编程_JavaScript 编程

    一.js基础语法与表达式 初始js: 结构层:html 搭建结构.放置部件.描述语义 样式层:css  美化页面.实现布局 行为层:js   实现交互效果.数据收法.表单验证等 -补充: 1.js在声 ...

  2. You辉编程_CSS3知识

    1.CSS3简介 -css3增加了大量的样式.动画.3D特效和移动端特性 2.文本与字体属性 2.1常用文本样式属性 -color--十六进制表示法:#ff0000;color:rgba(225,0, ...

  3. You辉编程_MySql数据

    一.数据库的基本操作1.显示所有数据库 show databases;2.创建数据库 create database + 数据库名字;3.删除数据库 drop database + 数据库名字;4.使 ...

  4. You辉编程_MongoDB基础

    1.mongodb的使用 # 创建一个数据库(database) # 创建集合(collection) # 文档的增删改查 -show dbs:显示所有数据库 -user db_name:进入指定的数 ...

  5. You辉编程_WebApp组件化

    学前了解 1.Web开发:满足两个条件,一是html,css和js,代码在浏览器中运行. 2.移动web开发:html,css和js;代码在移动端(手机.平板等)浏览器中运行. 3.PC Web开发: ...

  6. You辉编程_JavaScript高级程序

    ES6 初始ES5 1. ES6简介 1.1 ES6是什么 基本数据类型:数字.字符串.布尔值.undefined.null 引用数据类型:对象.函数 ES6=ECMAScript这们语言的第6代标准 ...

  7. ea测试软件,在EA测试过程中,如何获得99.9%的数据质量,提高EA测试的准确性?...

    很多人都知道在进行EA测试的时候,通过MT4历史数据中心下载的数据质量只有90%,并且只有1分钟的高开低收4个报价,1分钟之内的tick数据时没有的,对EA的测试是不准确的.因此今天给大家介绍一下,如 ...

  8. CS计算机学习记录-从计算机二级开始

    1. 2022全国计算机二级考试公共基础:相当于计算机导论?[了解计算机的章节]✅ 2. 2022全国计算机二级Python ✅ 终于踏入了计算机的大门,从懂得了第一门编程语言之后,就开始理解这条路: ...

  9. c语言编程杨辉三角前八行思路,C语言----(杨辉三角)

    用C语言编程打印出杨辉三角的前10行.如下图所示: 1 1 1 1 2 1 1 3 3 1 1 4 6 4 1 1 5 10 10 5 1 1 6 15 20 15 6 1 ... ... 解法1: ...

最新文章

  1. runtime自动归档/解档
  2. 计算机制作印章,制作印章软件【处理办法】
  3. 为什么需要批判性思维 -- 读《学会提问》
  4. python基础小白题2
  5. RabbitMQ非root用户安装(Linux环境)
  6. linux 定时传送文件,Windows与Linux之间定时文件传输
  7. 剑指offer (04):二维数组中的查找 (C++ Python 实现)
  8. 从hive上下载文件
  9. 数控铣削图案及编程_数控铣削编程与操作设计有全套图纸.doc
  10. 利用Freessl部署免费SSL证书
  11. HotPower超级CRC计算器与第三方CRC计算器名词解释与对照及操作
  12. 网易互联网产品运营管培生面试经历--从群面到终面面试经验分享
  13. 【U3D小游戏】愤怒的小鸟(三)猪的相关
  14. mongodb 创建只读用户
  15. 大林算法计算机控制实验报告,实验二 大林算法实验报告
  16. STM32CubeMX学习笔记(48)——USB接口使用(MSC基于外部Flash模拟U盘)
  17. 不会搭建Android知识框架,活该你成不了技术大牛!
  18. Tita 推进企业绩效管理变革的最佳实践
  19. 怎么迁移电脑上的数据文件到另一台电脑?
  20. 单片机电子血压计原理与设计

热门文章

  1. bzoj1921 CTSC2010 jewelry
  2. 用VR手捏3D模型,PS直接与甲方对线,Adobe新技术确实给设计师炫到了
  3. 什么是firewalld,简介、策略及规则(Centos7防火墙)
  4. 干货 | 被抑郁情绪所困扰?来了解CBT吧!
  5. js File文件转换为二进制格式和base64转换为图片
  6. unity接入咪咕SDK遇到的坑
  7. python限制进程数_Python连载36-线程数量限制、Timer、可重入锁
  8. isNotBlank()方法和isNotEmpty()方法的区别
  9. 小红书内容营销的必读推荐 小红书运营策划方案上海氖天
  10. 2-37.1 EmpProject综合案例