You辉编程_kafka
一、什么是kafka?
是分布式(项目部署于多个服务器)的基于发布/订阅模式的消息队列,主要用于处理活跃的数据,如:登录、浏览、点击、分享等用户行为产生的数据,说白了就是一个消息系统(消息队列)。
进一步理解:
1.消息队列
消息(Message):网络中的两台计算机或者两个通讯设备之间传递的数据,如:文本、音频等。
队列(Queue):直接把它想象成羽毛球筒,羽毛球先进先出,是一种特殊的线性表,特殊之处在于,只能在头部删除元素,在尾部添加元素。
消息队列(MQ):保存消息的队列,相当于消息传输过程中的一个容器,主要有两个作用,一个是给外部提供存入消息的接口,另一个是提供取出消息的接口。
保存消息的队列,是消息在传输过程中的容器。主要提供生产和消费接口供外部调用,进行数据的存储和获取。
二、MQ的分类
1.主要有两大类:点对点(Peer-to-Peer)、发布/订阅(Publish/Subsribe)。
2.共同点:生产消息发送到队列中,消费者从队列中读取并消费消息。
3.不同点:
(1)点对点
组成:消息队列、发送者(Sender)、接受者(Receiver)。
注:一个生产者生产的消息只能有一个消费者消费,一旦被消费了,消息就不会存在于消息队列中。
(2)发布/订阅
组成:消息队列、发布者(Publisher)、订阅者(Subscriber)、主题(Topic)
注:每个消息可以有多个消费者,彼此互不影响,如:我在微信公共号发了一篇文章,关注我的人都能看到,即消息被多个人接受到(订阅者)。
三、常见的消息系统
ActiveMQ:实现了JMS(Java Message Service)规范,支持性较好,性能相对不高。
RabbitMQ:可靠性高、安全。
Kafka:分布式、高性能、跨语言。
RockeMQ:阿里开源的消息中间件,纯Java实现。
四、kafka特性
1.高吞吐量、低延迟:每秒可以处理几十万条消息,其延迟只有几毫秒,每个主题可以分多个分区,消费组对其分区进行消费。
2.可扩展:集群支持热扩展。
3.持久性、可靠性:可以持久化到本地磁盘,支持数据备份防止数据丢失。
4.容错性:允许节点中节点失败。
5.高并发:支持数千个客户端同时读写。
五、kafka的组成
1.Broder:kafka集群中包含多个kafka服务节点,每个kafka服务节点就是一个broker。
2.Topic:主题(相当于消息的类型),用来存储不同类别的消息(kafka消息数据村存放于硬盘)。
3.Partition:分区,每个Topic可以包含一个或多个分区,分区的数据量是在创建主题时决定的,
目的在于进行分布式存储。
4.Replication:副本,每个分区可以有多个副本,分布在不同的Broker上,会选出一个副本呢作为Leader,所有请求都会通过Leader完成,Follower只负责备份数据。
5.Message:消息,是通信的基本单位,每个消息都存于Partition。
6.Producer:消息的生产者,向topic发布消息。
7.Consumer:消息的消费之,订阅topic并读取其发布消息。
8.Consumer Group:每个Consumer属于一个特定的Consumer Group,多个Consumer可以属于同一个Consumer Group中。
9.Zookeeper:协调kafka正常运行,同时存储kafka元数据信息如,topic的数量等。注:发送给Topic本身的数据不是存在Zookeeper中,而是存在于磁盘文件中。
六、kafka的安装和配置
1.安装
1.解压kafka_2.12-2.3.0.tgzcd ~/software tar -zxf kafka_2.12-2.3.0.tgz2.配置# 创建存放数据的文件夹 cd kafka_2.12-2.3.0 mkdir data # 修改kafka配置文件 cd config vi server.properties #listeners=PLAINTEXT://:9092 # kafka默认监听的端口号为9092 log.dirs=../data # 指定数据的存放目录 zookeeper.connect=localhost:2181 # zookeeper的连接信息3.启动zookeeper使用kafka内置zookeepercd ~/software/kafka_2.12-2.3.0/bin/ ./zookeeper-server-start.sh ../config/zookeeper.properties4.使用外部zk(推荐)cd ~/software/zookeeper-3.4.13/bin/ ./zkServer.sh start5.启动kafka./kafka-server-start.sh ../config/server.properties & # &表示后台运行,也可使用-daemon选项 jps # 查看进程,jps是jdk提供的用来查看所有java进程的命令6.创建Topic(主题)./kafka-topics.sh \ --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 3 \ --topic hello# 查看Topic列表 ./kafka-topics.sh --list --zookeeper localhost:2181 # __consumer_offsets是kafka的内部Topic # 查看某一个具体的Topic ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic hello # 修改Topic:只能增加partition个数,不能减少,且不能修改replication-factor ./kafka-topics.sh --alter --zookeeper localhost:2181 --topic hello --partitions 5 # 删除Topic (需要启用topic删除功能) ./kafka-topics.sh --delete --zookeeper localhost:2181 --topic hello7.启动kafka的Producer(生产者 )./kafka-console-producer.sh --broker-list localhost:9092 --topic hello8.启动kafka的Consumer(消费者)./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning9.验证查看data数据存放目录:Topic的每个Partition对应一个目录,数据存储在目录下的00000000000000000000.log文件中查看zookeeper中的内容:get /brokers/topics/hello/partitions/0/state
2.配置
############################# Server Basics #############################
# broker的id,值为整数,且必须唯一,在一个集群中不能重复
broker.id=0############################# Socket Server Settings #############################
# kafka默认监听的端口为9092
#listeners=PLAINTEXT://:9092
# 处理网络请求的线程数量,默认为3个
num.network.threads=3
# 执行磁盘IO操作的线程数量,默认为8个
num.io.threads=8
# socket服务发送数据的缓冲区大小,默认100KB
socket.send.buffer.bytes=102400
# socket服务接受数据的缓冲区大小,默认100KB
socket.receive.buffer.bytes=102400
# socket服务所能接受的一个请求的最大大小,默认为100M
socket.request.max.bytes=104857600############################# Log Basics #############################
# kafka存储消息数据的目录
log.dirs=../data
# 每个topic默认的partition数量
num.partitions=1
# 在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量
num.recovery.threads.per.data.dir=1############################# Log Flush Policy #############################
# 消息刷新到磁盘中的消息条数阈值
#log.flush.interval.messages=10000
# 消息刷新到磁盘中的最大时间间隔
#log.flush.interval.ms=1000############################# Log Retention Policy #############################
# 日志保留小时数,超时会自动删除,默认为7天
log.retention.hours=168
# 日志保留大小,超出大小会自动删除,默认为1G
#log.retention.bytes=1073741824
# 日志分片策略,单个日志文件的大小最大为1G,超出后则创建一个新的日志文件
log.segment.bytes=1073741824
# 每隔多长时间检测数据是否达到删除条件
log.retention.check.interval.ms=300000############################# Zookeeper #############################
# Zookeeper连接信息,如果是zookeeper集群,则以逗号隔开
zookeeper.connect=localhost:2181
# 连接zookeeper的超时时间
zookeeper.connection.timeout.ms=6000# 是否可以删除topic,默认为false
delete.topic.enable=true
七、Kafka集群搭建
1.搭建Zookeeper集群
可以在不同的服务器上搭建kafka服务形成多个服务节点集群,
也可以在一台主机上启动多个zk服务,使用不同的端口就可以了。
拷贝多个zk目录
zookeeper1、zookeeper2、zookeeper3分别配置每个zkvi zookeeper1/conf/zoo.cfgclientPort=2181server.1=192.168.2.153:6661:7771server.2=192.168.2.153:6662:7772server.3=192.168.2.153:6663:7773echo 1 > zookeeper1/data/myid vi zookeeper2/conf/zoo.cfgclientPort=2182server.1=192.168.2.153:6661:7771server.2=192.168.2.153:6662:7772server.3=192.168.2.153:6663:7773echo 2 > zookeeper2/data/myid vi zookeeper3/conf/zoo.cfgclientPort=2183server.1=192.168.2.153:6661:7771server.2=192.168.2.153:6662:7772server.3=192.168.2.153:6663:7773echo 3 > zookeeper3/data/myid
启动zk集群
2.搭建Kafka集群
拷贝多个kafka目录
kafka1、kafka2、kafka3分别配置每个kafkavi kafka1/config/server.propertiesbroker.id=1listeners=PLAINTEXT://192.168.2.153:9091zookeeper.connect=192.168.2.153:2181,192.168.2.153:2182,192.168.2.153:2183vi kafka2/config/server.propertiesbroker.id=2listeners=PLAINTEXT://192.168.2.153:9092zookeeper.connect=192.168.2.153:2181,192.168.2.153:2182,192.168.2.153:2183vi kafka3/config/server.propertiesbroker.id=3listeners=PLAINTEXT://192.168.2.153:9093zookeeper.connect=192.168.2.153:2181,192.168.2.153:2182,192.168.2.153:2183
启动kafka集群
创建Topic./kafka-topics.sh \--create \--zookeeper 192.168.7.40:2181,192.168.7.40:2182,192.168.7.40:2183 \--replication-factor 3 \--partitions 5 \--topic aaa
生成数据/发布消息./kafka-console-producer.sh --broker-list 192.168.7.40:9091,192.168.7.40:9092,192.168.7.40:9093 --topic aaa
消费数据/订阅消息./kafka-console-consumer.sh --bootstrap-server 192.168.7.40:9091,192.168.7.40:9092,192.168.7.40:9093 --topic aaa --from-beginning
八、SpringBoot集成kafka
引入kafka的依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>配置kafka,编辑application.yml文件spring:kafka:# kafka服务器地址(可以多个)bootstrap-servers: 192.168.7.40:9091,192.168.7.40:9092,192.168.7.40:9093producer:# 每次批量发送消息的数量batch-size: 65536buffer-memory: 524288# key/value的序列化key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:# 指定一个默认的组名group-id: test# key/value的反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
创建生产者@RestControllerpublic class KafkaProducer {@Autowiredprivate KafkaTemplate template;/*** 发送消息到Kafka* @param topic 主题* @param message 消息*/@RequestMapping("/sendMsg")public String sendMsg(String topic, String message) {template.send(topic, message);return "success";}}
创建消费者@Componentpublic class KafkaConsumer {/*** 订阅指定主题的消息* @param record 消息记录*/@KafkaListener(topics = {"hello","world"})public void listen(ConsumerRecord record) {// System.out.println(record);System.out.println(record.topic()+","+record.value());}}
测试
访问http://localhost:8080/sendMsg?topic=hello&message=aaaa
You辉编程_kafka相关推荐
- You辉编程_JavaScript 编程
一.js基础语法与表达式 初始js: 结构层:html 搭建结构.放置部件.描述语义 样式层:css 美化页面.实现布局 行为层:js 实现交互效果.数据收法.表单验证等 -补充: 1.js在声 ...
- You辉编程_CSS3知识
1.CSS3简介 -css3增加了大量的样式.动画.3D特效和移动端特性 2.文本与字体属性 2.1常用文本样式属性 -color--十六进制表示法:#ff0000;color:rgba(225,0, ...
- You辉编程_MySql数据
一.数据库的基本操作1.显示所有数据库 show databases;2.创建数据库 create database + 数据库名字;3.删除数据库 drop database + 数据库名字;4.使 ...
- You辉编程_MongoDB基础
1.mongodb的使用 # 创建一个数据库(database) # 创建集合(collection) # 文档的增删改查 -show dbs:显示所有数据库 -user db_name:进入指定的数 ...
- You辉编程_WebApp组件化
学前了解 1.Web开发:满足两个条件,一是html,css和js,代码在浏览器中运行. 2.移动web开发:html,css和js;代码在移动端(手机.平板等)浏览器中运行. 3.PC Web开发: ...
- You辉编程_JavaScript高级程序
ES6 初始ES5 1. ES6简介 1.1 ES6是什么 基本数据类型:数字.字符串.布尔值.undefined.null 引用数据类型:对象.函数 ES6=ECMAScript这们语言的第6代标准 ...
- ea测试软件,在EA测试过程中,如何获得99.9%的数据质量,提高EA测试的准确性?...
很多人都知道在进行EA测试的时候,通过MT4历史数据中心下载的数据质量只有90%,并且只有1分钟的高开低收4个报价,1分钟之内的tick数据时没有的,对EA的测试是不准确的.因此今天给大家介绍一下,如 ...
- CS计算机学习记录-从计算机二级开始
1. 2022全国计算机二级考试公共基础:相当于计算机导论?[了解计算机的章节]✅ 2. 2022全国计算机二级Python ✅ 终于踏入了计算机的大门,从懂得了第一门编程语言之后,就开始理解这条路: ...
- c语言编程杨辉三角前八行思路,C语言----(杨辉三角)
用C语言编程打印出杨辉三角的前10行.如下图所示: 1 1 1 1 2 1 1 3 3 1 1 4 6 4 1 1 5 10 10 5 1 1 6 15 20 15 6 1 ... ... 解法1: ...
最新文章
- runtime自动归档/解档
- 计算机制作印章,制作印章软件【处理办法】
- 为什么需要批判性思维 -- 读《学会提问》
- python基础小白题2
- RabbitMQ非root用户安装(Linux环境)
- linux 定时传送文件,Windows与Linux之间定时文件传输
- 剑指offer (04):二维数组中的查找 (C++ Python 实现)
- 从hive上下载文件
- 数控铣削图案及编程_数控铣削编程与操作设计有全套图纸.doc
- 利用Freessl部署免费SSL证书
- HotPower超级CRC计算器与第三方CRC计算器名词解释与对照及操作
- 网易互联网产品运营管培生面试经历--从群面到终面面试经验分享
- 【U3D小游戏】愤怒的小鸟(三)猪的相关
- mongodb 创建只读用户
- 大林算法计算机控制实验报告,实验二 大林算法实验报告
- STM32CubeMX学习笔记(48)——USB接口使用(MSC基于外部Flash模拟U盘)
- 不会搭建Android知识框架,活该你成不了技术大牛!
- Tita 推进企业绩效管理变革的最佳实践
- 怎么迁移电脑上的数据文件到另一台电脑?
- 单片机电子血压计原理与设计
热门文章
- bzoj1921 CTSC2010 jewelry
- 用VR手捏3D模型,PS直接与甲方对线,Adobe新技术确实给设计师炫到了
- 什么是firewalld,简介、策略及规则(Centos7防火墙)
- 干货 | 被抑郁情绪所困扰?来了解CBT吧!
- js File文件转换为二进制格式和base64转换为图片
- unity接入咪咕SDK遇到的坑
- python限制进程数_Python连载36-线程数量限制、Timer、可重入锁
- isNotBlank()方法和isNotEmpty()方法的区别
- 小红书内容营销的必读推荐 小红书运营策划方案上海氖天
- 2-37.1 EmpProject综合案例