前言

由于kafka强依赖于zookeeper,所以需先搭建好zookeeper集群。由于zookeeper是由java编写的,需运行在jvm上,所以首先应具备java环境。
(ps:默认您的centos系统可联网,本教程就不教配置ip什么的了)
(ps2:没有wget的先装一下:yum install wget)
(ps3:人啊,就是要条理。东边放一点,西边放一点,过段时间就不知道自己装在哪里了。本教程所有下载均放在/usr/local目录下)
(ps4:kafka可能有内置zookeeper,感觉可以越过zookeeper教程,但是这里也配置出来了。我没试过)

一、配置jdk

因为oracle 公司不允许直接通过wget 下载官网上的jdk包。所以你直接wget以下地址下载下来的是一个只有5k的网页文件而已,并不是需要的jdk包。(垄断地位就是任性)。
(请通过java -version判断是否自带jdk,我的没带)

1、官网下载

下面是jdk8的官方下载地址:

https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8u211-later-5573849.html

2、上传解压

这里通过xftp上传到服务器指定位置:/usr/local

对压缩文件进行解压:

tar -zxvf jdk-8u221-linux-x64.tar.gz

对解压后的文件夹进行改名:

mv jdk1.8.0_221 jdk1.8

3、配置环境变量

vim /etc/profile
#java environment
export JAVA_HOME=/usr/local/jdk1.8
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin

操作之后的界面如下:

运行命令使环境生效

source /etc/profile

二、搭建zookeeper集群

1、下载zookeeper

创建zookeeper目录,在该目录下进行下载:

mkdir /usr/local/zookeeper

这一步如果出现连接被拒绝时可多试几次,我就是第二次请求才成功的。

wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

等待下载完成之后解压:

tar -zxvf zookeeper-3.4.6.tar.gz

重命名为zookeeper1

mv zookeeper-3.4.6 zookeeper1
cp -r zookeeper1 zookeeper2
cp -r zookeeper1 zookeeper3

2、创建data、logs文件夹

在zookeeper1目录下创建

在data目录下新建myid文件。内容为1

3、修改zoo.cfg文件

cd /usr/local/zookeeper/zookeeper1/conf/
cp zoo_sample.cfg zoo.cfg

进行过上面两步之后,有zoo.cfg文件了,现在修改内容为:

dataDir=/usr/local/zookeeper/zookeeper1/data
dataLogDir=/usr/local/zookeeper/zookeeper1/logs
server.1=192.168.233.11:2888:3888
server.2=192.168.233.11:2889:3889
server.3=192.168.233.11:2890:3890

4、搭建zookeeper2

首先,复制改名。

cd /usr/local/zookeeper/
cp -r zookeeper1 zookeeper2

然后修改具体的某些配置:

vim zookeeper2/conf/zoo.cfg

将下图三个地方1改成2

vim zookeeper2/data/myid

同时将myid中的值改成2

5、搭建zookeeper3

同上,复制改名

cp -r zookeeper1 zookeeper3

vim zookeeper3/conf/zoo.cfg

修改为3

vim zookeeper3/data/myid

修改为3

6、测试zookeeper集群

cd /usr/local/zookeeper/zookeeper1/bin/

由于启动所需代码比较多,这里简单写了一个启动脚本:

vim start

start的内容如下

cd /usr/local/zookeeper/zookeeper1/bin/
./zkServer.sh start ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper2/bin/
./zkServer.sh start ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper3/bin/
./zkServer.sh start ../conf/zoo.cfg

下面是连接脚本:

vim login

login内容如下:

./zkCli.sh -server 192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183

脚本编写完成,接下来启动:

sh start
sh login

启动集群成功,如下图:

这里zookeeper就告一段落了,由于zookeeper占用着输入窗口,这里可以在xshell右键标签,新建ssh渠道。然后就可以在新窗口继续操作kafka了!

三、搭建kafka集群

1、下载kafka

首先创建kafka目录:

mkdir /usr/local/kafka

然后在该目录下载

cd /usr/local/kafka/
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz

下载成功之后解压:

tar -zxvf kafka_2.11-1.1.0.tgz

2、修改集群配置

首先进入conf目录下:

cd /usr/local/kafka/kafka_2.11-1.1.0/config

修改server.properties
修改内容:

broker.id=0
log.dirs=/tmp/kafka-logs
listeners=PLAINTEXT://192.168.233.11:9092

复制两份server.properties

cp server.properties server2.properties
cp server.properties server3.properties

修改server2.properties

vim server2.properties

修改主要内容为:

broker.id=1
log.dirs=/tmp/kafka-logs1
listeners=PLAINTEXT://192.168.233.11:9093

如上,修改server3.properties
修改内容为:

broker.id=2
log.dirs=/tmp/kafka-logs2
listeners=PLAINTEXT://192.168.233.11:9094

3、启动kafka

这里还是在bin目录编写一个脚本:

cd ../bin/
vim start

脚本内容为:

./kafka-server-start.sh ../config/server.properties &
./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties &

通过jps命令可以查看到,共启动了3个kafka。

4、创建Topic

cd /usr/local/kafka/kafka_2.11-1.1.0
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

kafka打印了几条日志

在启动的zookeeper中可以通过命令查询到这条topic!

ls /brokers/topics

查看kafka状态

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

可以看到此时有三个节点 1 , 2 , 0

Leader 是1 ,
因为分区只有一个 所以在0上面,
Replicas:主从备份是 1,2,0,
ISR(in-sync):现在存活的信息也是 1,2,0

5、启动生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

由于不能按删除,不能按左右键去调整,所以语句有些乱啊。em…

6、启动消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

可以看出,启动消费者之后就会自动消费。

在生产者又造了一条。

消费者自动捕获成功!

四、集成springboot

先贴一张kafka兼容性目录:

不满足的话启动springboot的时候会抛异常的!!!ps:该走的岔路我都走了o(╥﹏╥)o
(我的kafka-clients是1.1.0,spring-kafka是2.2.2,中间那列暂时不用管)

回归正题,搞了两个小时,终于搞好了,想哭…
遇到的问题基本就是jar版本不匹配。
上面的步骤我也都会相应的去修改,争取大家按照本教程一遍过!!!

1、pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.1.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.gzky</groupId><artifactId>study</artifactId><version>0.0.1-SNAPSHOT</version><name>study</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-redis</artifactId><version>1.3.8.RELEASE</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.0.RELEASE</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

pom文件中,重点是下面这两个版本。

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.1.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent>
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.0.RELEASE</version>
</dependency>

2、application.yml

spring:redis:cluster:#设置key的生存时间,当key过期时,它会被自动删除;expire-seconds: 120#设置命令的执行时间,如果超过这个时间,则报错;command-timeout: 5000#设置redis集群的节点信息,其中namenode为域名解析,通过解析域名来获取相应的地址;nodes: 192.168.233.11:9001,192.168.233.11:9002,192.168.233.11:9003,192.168.233.11:9004,192.168.233.11:9005,192.168.233.11:9006kafka:# 指定kafka 代理地址,可以多个bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094producer:retries: 0# 每次批量发送消息的数量batch-size: 16384buffer-memory: 33554432# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:# 指定默认消费者group idgroup-id: test-groupauto-offset-reset: earliestenable-auto-commit: trueauto-commit-interval: 100# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerserver:port: 8085servlet:#context-path: /rediscontext-path: /kafka

没有配置Redis的可以把Redis部分删掉,也就是下图:

3、生产者

package com.gzky.study.utils;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;/*** kafka生产者工具类** @author biws* @date 2019/12/17**/
@Component
public class KfkaProducer {private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class);@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 生产数据* @param str 具体数据*/public void send(String str) {logger.info("生产数据:" + str);kafkaTemplate.send("testTopic", str);}
}

4、消费者

package com.gzky.study.utils;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** kafka消费者监听消息** @author biws* @date 2019/12/17**/
@Component
public class KafkaConsumerListener {private static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);@KafkaListener(topics = "testTopic")public void onMessage(String str){//insert(str);//这里为插入数据库代码logger.info("监听到:" + str);System.out.println("监听到:" + str);}}

5、对外接口

package com.gzky.study.controller;import com.gzky.study.utils.KfkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;/*** kafka对外接口** @author biws* @date 2019/12/17**/
@RestController
public class KafkaController {@AutowiredKfkaProducer kfkaProducer;/*** 生产消息* @param str* @return*/@RequestMapping(value = "/sendKafkaWithTestTopic",method = RequestMethod.GET)@ResponseBodypublic boolean sendTopic(@RequestParam String str){kfkaProducer.send(str);return true;}
}

6、postman测试

这里首先应该在服务器启动监听器(kafka根目录),下面命令必须是具体的服务器ip,不能是localhost,是我踩过的坑:

推荐此处重启一下集群
关闭kafka命令:

cd /usr/local/kafka/kafka_2.11-1.1.0/bin
./kafka-server-stop.sh ../config/server.properties &
./kafka-server-stop.sh ../config/server2.properties &
./kafka-server-stop.sh ../config/server3.properties &

此处应该jps看一下,等待所有的kafka都关闭(关不掉的kill掉),再重新启动kafka:

./kafka-server-start.sh ../config/server.properties &
./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties &

等待kafka启动成功后,启动消费者监听端口:

cd /usr/local/kafka/kafka_2.11-1.1.0
bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.11:9092 --from-beginning --topic testTopic

曾经我乱输的测试信息全部被监听过来了!

启动springboot服务

然后用postman生产消息:

然后享受成果,服务器端监听成功。

项目中也监听成功!

从零开始搭建Kafka+SpringBoot分布式消息系统相关推荐

  1. kafka 怎么样连接图形化界面_从零开始搭建Kafka+SpringBoot分布式消息系统

    前言 由于kafka强依赖于zookeeper,所以需先搭建好zookeeper集群.由于zookeeper是由java编写的,需运行在jvm上,所以首先应具备java环境. (ps:默认您的cent ...

  2. java kafka分布式_Kafka分布式消息系统

    1.简介 Kafka是一个分布式消息系统,使用Scala语言进行编写,具有高水平扩展以及高吞吐量特性. 目前流行的消息队列主要有三种:ActiveMQ.RabbitMQ.Kafka ActiveMQ. ...

  3. 分布式消息系统Kafka初步

    http://my.oschina.net/ielts0909/blog/92972 终于可以写kafka的文章了,Mina的相关文章我已经做了索引,在我的博客中置顶了,大家可以方便的找到.从这一篇开 ...

  4. Apache Kafka:下一代分布式消息系统

    简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交 ...

  5. 大数据开发hadoop核心的分布式消息系统:Apache Kafka 你知道吗

    简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交 ...

  6. Kafka:用于日志处理的分布式消息系统

    文章目录 摘要 常用术语 关键词 1. 简介 2. 相关工作 3. Kafka架构和设计原则 3.1 单分区的效率 3.1.1 简单的存储 3.1.2 高效的传输 3.1.3 无状态代理 3.2 分布 ...

  7. 分布式消息系统 Kafka 简介

    分布式消息系统 Kafka 简介 阅读目录 5.1 吞吐量 5.2 负载均衡 5.3 拉取系统 5.4 可扩展性 5.5 消息删除策略 6.1 消息队列 6.2 行为跟踪 6.3 元信息监控 6.4 ...

  8. Centos_7.2 下构建 Kafka_2.13 分布式消息系统的单机版

    如何在 Centos_7.2 下构建 Kafka_2.13 分布式消息系统的单例模式 一.前言 本文对于 kafka 分布式消息系统,是一个不错的入口点,首先从安装开始,这样就可以对 kafka 有一 ...

  9. 乐视秒杀架构解读:从零开始搭建百万每秒订单系统

    http://dbaplus.cn/news-21-475-1.html 在各种秒杀活动大行其道的今天,订单系统的性能与稳定日益重要.乐视集团作为这一技术的佼佼者,在多次的电商狂欢节中都能抢占商机.拔 ...

最新文章

  1. 【云计算】云上建站快速入门:博客、论坛、CMS、电子商务网站统统
  2. 屏蔽微软的SignalR
  3. 第四节:python if语句用法
  4. Theano2.1.5-基础知识之打印出theano的图
  5. JS异步模式与Promise模式
  6. 2018高职计算机474分排名,2018年高职分类考试招生录取分数线出炉
  7. shapefile导入oracle,shp2sdo.exe用法:shpfile导入OracleSpatial
  8. Makefile和Cmake的联系与区别
  9. 3星|《财经》2017年第29期:未来,国有资本的收益和变现都是补贴社保的渠道...
  10. 构建直接路由模式(DR)的LVS
  11. 超简代码版设计模式系列六
  12. ubuntu 中文版 man
  13. 向身边优秀的人学习,让自己变得优秀
  14. 怎样开启成功的“数据分析师”职业生涯(R、Python、机器学习、通信和数据可视化、数据直觉)
  15. c语言字符三维数组定义时赋值,c语言中三维数组的赋值顺序?
  16. linux 中输入bash,Linux上Bash Shell编程
  17. 【19保研】保研预报名招生信息汇总!
  18. 图片使用内存法进行浮雕处理_无锡浮雕景观雕塑制作安装
  19. 【微信小程序】NodeJs调用云开发HTTP API错误代码47001 / data format error错误
  20. 大家的淘宝直播应该这么做!(一)

热门文章

  1. SpringBoot中MBG的使用
  2. mac 磁盘清理,一下多出好几十G
  3. 人生在世,不作为是不对的。!
  4. 华为首发鸿蒙手机,华为首发鸿蒙手机亮相,麒麟9000+55W快充+120Hz,依旧一机难求...
  5. TextRank论文阅读
  6. 用css+html完成学校官网
  7. scrcpy投屏工具的在harmonyOS开发上的使用
  8. SPOJ 5 The Next Palindrome
  9. 自动计算税金 CALCULATE_TAX_FROM_GROSSAMOUNT
  10. 英特尔又做了个“违背祖宗的决定”