从零开始搭建Kafka+SpringBoot分布式消息系统
前言
由于kafka强依赖于zookeeper,所以需先搭建好zookeeper集群。由于zookeeper是由java编写的,需运行在jvm上,所以首先应具备java环境。
(ps:默认您的centos系统可联网,本教程就不教配置ip什么的了)
(ps2:没有wget的先装一下:yum install wget)
(ps3:人啊,就是要条理。东边放一点,西边放一点,过段时间就不知道自己装在哪里了。本教程所有下载均放在/usr/local目录下)
(ps4:kafka可能有内置zookeeper,感觉可以越过zookeeper教程,但是这里也配置出来了。我没试过)
一、配置jdk
因为oracle 公司不允许直接通过wget 下载官网上的jdk包。所以你直接wget以下地址下载下来的是一个只有5k的网页文件而已,并不是需要的jdk包。(垄断地位就是任性)。
(请通过java -version判断是否自带jdk,我的没带)
1、官网下载
下面是jdk8的官方下载地址:
https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8u211-later-5573849.html
2、上传解压
这里通过xftp上传到服务器指定位置:/usr/local
对压缩文件进行解压:
tar -zxvf jdk-8u221-linux-x64.tar.gz
对解压后的文件夹进行改名:
mv jdk1.8.0_221 jdk1.8
3、配置环境变量
vim /etc/profile
#java environment
export JAVA_HOME=/usr/local/jdk1.8
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin
操作之后的界面如下:
运行命令使环境生效
source /etc/profile
二、搭建zookeeper集群
1、下载zookeeper
创建zookeeper目录,在该目录下进行下载:
mkdir /usr/local/zookeeper
这一步如果出现连接被拒绝时可多试几次,我就是第二次请求才成功的。
wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
等待下载完成之后解压:
tar -zxvf zookeeper-3.4.6.tar.gz
重命名为zookeeper1
mv zookeeper-3.4.6 zookeeper1
cp -r zookeeper1 zookeeper2
cp -r zookeeper1 zookeeper3
2、创建data、logs文件夹
在zookeeper1目录下创建
在data目录下新建myid文件。内容为1
3、修改zoo.cfg文件
cd /usr/local/zookeeper/zookeeper1/conf/
cp zoo_sample.cfg zoo.cfg
进行过上面两步之后,有zoo.cfg文件了,现在修改内容为:
dataDir=/usr/local/zookeeper/zookeeper1/data
dataLogDir=/usr/local/zookeeper/zookeeper1/logs
server.1=192.168.233.11:2888:3888
server.2=192.168.233.11:2889:3889
server.3=192.168.233.11:2890:3890
4、搭建zookeeper2
首先,复制改名。
cd /usr/local/zookeeper/
cp -r zookeeper1 zookeeper2
然后修改具体的某些配置:
vim zookeeper2/conf/zoo.cfg
将下图三个地方1改成2
vim zookeeper2/data/myid
同时将myid中的值改成2
5、搭建zookeeper3
同上,复制改名
cp -r zookeeper1 zookeeper3
vim zookeeper3/conf/zoo.cfg
修改为3
vim zookeeper3/data/myid
修改为3
6、测试zookeeper集群
cd /usr/local/zookeeper/zookeeper1/bin/
由于启动所需代码比较多,这里简单写了一个启动脚本:
vim start
start的内容如下
cd /usr/local/zookeeper/zookeeper1/bin/
./zkServer.sh start ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper2/bin/
./zkServer.sh start ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper3/bin/
./zkServer.sh start ../conf/zoo.cfg
下面是连接脚本:
vim login
login内容如下:
./zkCli.sh -server 192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183
脚本编写完成,接下来启动:
sh start
sh login
启动集群成功,如下图:
这里zookeeper就告一段落了,由于zookeeper占用着输入窗口,这里可以在xshell右键标签,新建ssh渠道。然后就可以在新窗口继续操作kafka了!
三、搭建kafka集群
1、下载kafka
首先创建kafka目录:
mkdir /usr/local/kafka
然后在该目录下载
cd /usr/local/kafka/
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
下载成功之后解压:
tar -zxvf kafka_2.11-1.1.0.tgz
2、修改集群配置
首先进入conf目录下:
cd /usr/local/kafka/kafka_2.11-1.1.0/config
修改server.properties
修改内容:
broker.id=0
log.dirs=/tmp/kafka-logs
listeners=PLAINTEXT://192.168.233.11:9092
复制两份server.properties
cp server.properties server2.properties
cp server.properties server3.properties
修改server2.properties
vim server2.properties
修改主要内容为:
broker.id=1
log.dirs=/tmp/kafka-logs1
listeners=PLAINTEXT://192.168.233.11:9093
如上,修改server3.properties
修改内容为:
broker.id=2
log.dirs=/tmp/kafka-logs2
listeners=PLAINTEXT://192.168.233.11:9094
3、启动kafka
这里还是在bin目录编写一个脚本:
cd ../bin/
vim start
脚本内容为:
./kafka-server-start.sh ../config/server.properties &
./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties &
通过jps命令可以查看到,共启动了3个kafka。
4、创建Topic
cd /usr/local/kafka/kafka_2.11-1.1.0
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
kafka打印了几条日志
在启动的zookeeper中可以通过命令查询到这条topic!
ls /brokers/topics
查看kafka状态
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
可以看到此时有三个节点 1 , 2 , 0
Leader 是1 ,
因为分区只有一个 所以在0上面,
Replicas:主从备份是 1,2,0,
ISR(in-sync):现在存活的信息也是 1,2,0
5、启动生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
由于不能按删除,不能按左右键去调整,所以语句有些乱啊。em…
6、启动消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
可以看出,启动消费者之后就会自动消费。
在生产者又造了一条。
消费者自动捕获成功!
四、集成springboot
先贴一张kafka兼容性目录:
不满足的话启动springboot的时候会抛异常的!!!ps:该走的岔路我都走了o(╥﹏╥)o
(我的kafka-clients是1.1.0,spring-kafka是2.2.2,中间那列暂时不用管)
回归正题,搞了两个小时,终于搞好了,想哭…
遇到的问题基本就是jar版本不匹配。
上面的步骤我也都会相应的去修改,争取大家按照本教程一遍过!!!
1、pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.1.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.gzky</groupId><artifactId>study</artifactId><version>0.0.1-SNAPSHOT</version><name>study</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-redis</artifactId><version>1.3.8.RELEASE</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.0.RELEASE</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
pom文件中,重点是下面这两个版本。
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.1.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent>
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.0.RELEASE</version>
</dependency>
2、application.yml
spring:redis:cluster:#设置key的生存时间,当key过期时,它会被自动删除;expire-seconds: 120#设置命令的执行时间,如果超过这个时间,则报错;command-timeout: 5000#设置redis集群的节点信息,其中namenode为域名解析,通过解析域名来获取相应的地址;nodes: 192.168.233.11:9001,192.168.233.11:9002,192.168.233.11:9003,192.168.233.11:9004,192.168.233.11:9005,192.168.233.11:9006kafka:# 指定kafka 代理地址,可以多个bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094producer:retries: 0# 每次批量发送消息的数量batch-size: 16384buffer-memory: 33554432# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:# 指定默认消费者group idgroup-id: test-groupauto-offset-reset: earliestenable-auto-commit: trueauto-commit-interval: 100# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerserver:port: 8085servlet:#context-path: /rediscontext-path: /kafka
没有配置Redis的可以把Redis部分删掉,也就是下图:
3、生产者
package com.gzky.study.utils;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;/*** kafka生产者工具类** @author biws* @date 2019/12/17**/
@Component
public class KfkaProducer {private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class);@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 生产数据* @param str 具体数据*/public void send(String str) {logger.info("生产数据:" + str);kafkaTemplate.send("testTopic", str);}
}
4、消费者
package com.gzky.study.utils;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** kafka消费者监听消息** @author biws* @date 2019/12/17**/
@Component
public class KafkaConsumerListener {private static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);@KafkaListener(topics = "testTopic")public void onMessage(String str){//insert(str);//这里为插入数据库代码logger.info("监听到:" + str);System.out.println("监听到:" + str);}}
5、对外接口
package com.gzky.study.controller;import com.gzky.study.utils.KfkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;/*** kafka对外接口** @author biws* @date 2019/12/17**/
@RestController
public class KafkaController {@AutowiredKfkaProducer kfkaProducer;/*** 生产消息* @param str* @return*/@RequestMapping(value = "/sendKafkaWithTestTopic",method = RequestMethod.GET)@ResponseBodypublic boolean sendTopic(@RequestParam String str){kfkaProducer.send(str);return true;}
}
6、postman测试
这里首先应该在服务器启动监听器(kafka根目录),下面命令必须是具体的服务器ip,不能是localhost,是我踩过的坑:
推荐此处重启一下集群
关闭kafka命令:
cd /usr/local/kafka/kafka_2.11-1.1.0/bin
./kafka-server-stop.sh ../config/server.properties &
./kafka-server-stop.sh ../config/server2.properties &
./kafka-server-stop.sh ../config/server3.properties &
此处应该jps看一下,等待所有的kafka都关闭(关不掉的kill掉),再重新启动kafka:
./kafka-server-start.sh ../config/server.properties &
./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties &
等待kafka启动成功后,启动消费者监听端口:
cd /usr/local/kafka/kafka_2.11-1.1.0
bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.11:9092 --from-beginning --topic testTopic
曾经我乱输的测试信息全部被监听过来了!
启动springboot服务
然后用postman生产消息:
然后享受成果,服务器端监听成功。
项目中也监听成功!
从零开始搭建Kafka+SpringBoot分布式消息系统相关推荐
- kafka 怎么样连接图形化界面_从零开始搭建Kafka+SpringBoot分布式消息系统
前言 由于kafka强依赖于zookeeper,所以需先搭建好zookeeper集群.由于zookeeper是由java编写的,需运行在jvm上,所以首先应具备java环境. (ps:默认您的cent ...
- java kafka分布式_Kafka分布式消息系统
1.简介 Kafka是一个分布式消息系统,使用Scala语言进行编写,具有高水平扩展以及高吞吐量特性. 目前流行的消息队列主要有三种:ActiveMQ.RabbitMQ.Kafka ActiveMQ. ...
- 分布式消息系统Kafka初步
http://my.oschina.net/ielts0909/blog/92972 终于可以写kafka的文章了,Mina的相关文章我已经做了索引,在我的博客中置顶了,大家可以方便的找到.从这一篇开 ...
- Apache Kafka:下一代分布式消息系统
简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交 ...
- 大数据开发hadoop核心的分布式消息系统:Apache Kafka 你知道吗
简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交 ...
- Kafka:用于日志处理的分布式消息系统
文章目录 摘要 常用术语 关键词 1. 简介 2. 相关工作 3. Kafka架构和设计原则 3.1 单分区的效率 3.1.1 简单的存储 3.1.2 高效的传输 3.1.3 无状态代理 3.2 分布 ...
- 分布式消息系统 Kafka 简介
分布式消息系统 Kafka 简介 阅读目录 5.1 吞吐量 5.2 负载均衡 5.3 拉取系统 5.4 可扩展性 5.5 消息删除策略 6.1 消息队列 6.2 行为跟踪 6.3 元信息监控 6.4 ...
- Centos_7.2 下构建 Kafka_2.13 分布式消息系统的单机版
如何在 Centos_7.2 下构建 Kafka_2.13 分布式消息系统的单例模式 一.前言 本文对于 kafka 分布式消息系统,是一个不错的入口点,首先从安装开始,这样就可以对 kafka 有一 ...
- 乐视秒杀架构解读:从零开始搭建百万每秒订单系统
http://dbaplus.cn/news-21-475-1.html 在各种秒杀活动大行其道的今天,订单系统的性能与稳定日益重要.乐视集团作为这一技术的佼佼者,在多次的电商狂欢节中都能抢占商机.拔 ...
最新文章
- 【云计算】云上建站快速入门:博客、论坛、CMS、电子商务网站统统
- 屏蔽微软的SignalR
- 第四节:python if语句用法
- Theano2.1.5-基础知识之打印出theano的图
- JS异步模式与Promise模式
- 2018高职计算机474分排名,2018年高职分类考试招生录取分数线出炉
- shapefile导入oracle,shp2sdo.exe用法:shpfile导入OracleSpatial
- Makefile和Cmake的联系与区别
- 3星|《财经》2017年第29期:未来,国有资本的收益和变现都是补贴社保的渠道...
- 构建直接路由模式(DR)的LVS
- 超简代码版设计模式系列六
- ubuntu 中文版 man
- 向身边优秀的人学习,让自己变得优秀
- 怎样开启成功的“数据分析师”职业生涯(R、Python、机器学习、通信和数据可视化、数据直觉)
- c语言字符三维数组定义时赋值,c语言中三维数组的赋值顺序?
- linux 中输入bash,Linux上Bash Shell编程
- 【19保研】保研预报名招生信息汇总!
- 图片使用内存法进行浮雕处理_无锡浮雕景观雕塑制作安装
- 【微信小程序】NodeJs调用云开发HTTP API错误代码47001 / data format error错误
- 大家的淘宝直播应该这么做!(一)
热门文章
- SpringBoot中MBG的使用
- mac 磁盘清理,一下多出好几十G
- 人生在世,不作为是不对的。!
- 华为首发鸿蒙手机,华为首发鸿蒙手机亮相,麒麟9000+55W快充+120Hz,依旧一机难求...
- TextRank论文阅读
- 用css+html完成学校官网
- scrcpy投屏工具的在harmonyOS开发上的使用
- SPOJ 5 The Next Palindrome
- 自动计算税金 CALCULATE_TAX_FROM_GROSSAMOUNT
- 英特尔又做了个“违背祖宗的决定”