背景:Dubbo远程调用的性能问题

Dubbo调用普遍存在于我们的微服务项目中, 这些Dubbo调用全部是同步的操作, 这里的"同步"指:消费者A调用生产者B之后,A的线程会进入阻塞状态,等待生产者B运行结束返回之后,A才能运行之后的代码, Dubbo消费者发送调用后进入阻塞状态,这个状态表示该线程仍占用内存资源,但是什么动作都不做, 如果生产者运行耗时较久,消费者就一直等待,如果消费者利用这个时间,那么可以处理更多请求,业务整体效率会提升

实际情况下,Dubbo有些必要的返回值必须等待,但是不必要等待的服务返回值,我们可以不等待去做别的事情, 这种情况下我们就要使用消息队列

什么是消息队列

消息队列(Message Queue)简称MQ,也称:"消息中间件"消息队列是采用"异步"的方式来传递数据完成业务操作流程的业务处理方式(要求两个微服务项目并不需要同时完成请求)

消息队列的特征

  • 利用异步的特性, 提高服务器的运行效率, 减少因为远程调用出现的线程等待\阻塞时间

  • 削峰填谷:在并发峰值超过当前系统处理能力时,我们将没处理的信息保存在消息队列中,在后面出现的较闲的时间中去处理,直到所有数据依次处理完成,能够防止在并发峰值时短时间大量请求而导致的系统不稳定

  • 消息队列的延时:因为是异步执行,请求的发起者并不知道消息何时能处理完,如果业务不能接受这种延迟,就不要使用消息队列

常见消息队列软件

  • Kafka:性能好\功能弱:适合大数据量,高并发的情况,大数据领域使用较多

  • RabbitMQ:功能强\性能一般:适合发送业务需求复杂的消息队列,java业务中使用较多

  • RocketMQ:阿里的

  • ActiveMQ:前几年流行的,老项目可能用到

消息队列的事务处理

当接收消息队列中信息的模块运行发送异常时,怎么完成事务的回滚?

在处理消息队列异常时,经常会设置一个"死信队列",将无法处理的异常信息发送到这个队列中, 死信队列没有任何处理者,通常情况下会有专人周期性的处理死信队列的消息

什么是Kafka

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。Kafka最初是由LinkedIn开发,并随后于2011年初开源

Kafka Cluster(Kafka集群)

Producer:消息的发送方,也就是消息的来源,Kafka中的生产者

order就是消息的发送方,在Dubbo中order是消费者,这个身份变化了

Consumer:消息的接收方,也是消息的目标,Kafka中的消费者

stock就是消息的接收方,在Dubbo中stock是生产者,这个身份变化了

Topic:话题或主题的意思,消息的收发双方要依据同一个话题名称,才不会将信息错发给别人

Record:消息记录,就是生产者和消费者传递的信息内容,保存在指定的Topic中

Kafka的特征与优势

Kafka作为消息队列,它和其他同类产品相比,突出的特点就是性能强大

Kafka将消息队列中的信息保存在硬盘

Kafka对硬盘的读取规则进行优化后,效率能够接近内存

硬盘的优化规则主要依靠"顺序读写,零拷贝,日志压缩等技术"

Kafka处理队列中数据的默认设置:

  • Kafka队列信息能够一直向硬盘中保存(理论上没有大小限制)

  • Kafka默认队列中的信息保存7天,可以配置这个时间,缩短这个时间可以减少Kafka的磁盘消耗

Kafka的安装和配置

必须将我们kafka软件的解压位置设置在一个根目录,文件夹名称尽量短(例如:kafka), 然后路径不要有空格和中文, 我们要创建一个空目录用于保存Kafka运行过程中产生的数据,本次创建名称为data的空目录,下面进行Kafka启动前的配置,先到D:\kafka\config下配置有文件zookeeper.properties,找到dataDir属性修改如下

dataDir=D:/data

修改完毕之后要Ctrl+S进行保存,否则修改无效!!!!

注意D盘和data文件夹名称,匹配自己电脑的真实路径和文件夹名称

还要修改server.properties配置文件

log.dirs=D:/data

启动kafka

要想启动Kafka必须先启动Zookeeper

Zookeeper介绍

Linux服务器中安装的各种软件,很多都是有动物形象的

如果这些软件在Linux中需要修改配置信息的话,就需要进入这个软件,去修改配置,每个软件都需要单独修改配置的话,工作量很大

我们使用Zookeeper之后,可以创建一个新的管理各种软件配置的文件管理系统

Linux系统中各个软件的配置文件集中到Zookeeper中

实现在Zookeeper中,可以修改服务器系统中的各个软件配置信息

长此以往,很多软件就删除了自己写配置文件的功能,而直接从Zookeeper中获取

Kafka就是需要将配置编写在Zookeeper中的软件之一

所以要先启动zookeeper才能启动kafka

Zookeeper启动

进入路径D:\kafka\bin\windows

输入cmd进入dos命令行

D:\kafka\bin\windows>kafka-server-start.bat ..\..\config\server.properties

kafka启动

D:\kafka\bin\windows>kafka-server-start.bat ..\..\config\server.properties

Kafka使用演示

启动的zookeeper和kafka的窗口不要关闭, 我们在csmall项目中编写一个kafka使用的演示, csmall-cart-webapi模块, 添加依赖

<!-- Google JSON API -->
<!-- 它是java对象和json格式字符串相互转换的工具类 -->
<dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId>
</dependency>
<!-- Kafka API -->
<!--  Spring整合支持Kafka的依赖  -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

修改yml文件进行配置

spring:kafka:# 定义kafka的位置bootstrap-servers: localhost:9092# consumer.group-id是一个必须配置的设置,不配置的话启动时会报错# 意思是"话题分组",这配置的目的是为了区分不同项目的话题名称# 本质上,这个分组名称会在消息发送时,自动前缀在话题名称前# 例如当前项目发送了一个话题名称为message的消息,实际传输的话题名称为csmall.messageconsumer:group-id: csmall

在SpringBoot启动类中添加启动Kafka的注解

@SpringBootApplication
@EnableDubbo
// 启动对kafka的支持
@EnableKafka
// 为了测试kafka收发消息的功能
// 我们利用SpringBoot自带的任务调用工具,周期性的向kafka发送消息
// 明确下面的注解和kafka没有必然的支持关系
@EnableScheduling
public class CsmallCartWebapiApplication {public static void main(String[] args) {SpringApplication.run(CsmallCartWebapiApplication.class, args);}}

下面我们就可以实现周期性的向kafka发送消息并接收的操作了

编写消息的发送

cart-webapi包下创建kafka包

包中创建Producer类来发送消息

 生产者

// 这个类中要编写代码进行周期运行,所以要交由Spring管理
@Component
public class Producer {// 直接从Spring容器中获取能够操作Kafka的对象// 这个对象是在添加好依赖和yml配合后,启动SpringBoot时自动添加到Spring容器的// KafkaTemplate<[话题名称的类型],[消息的类型]>@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;int i=1;// 实现每隔10秒钟(10000毫秒)运行一次的方法@Scheduled(fixedRate = 10000)public void sendMessage(){// 实例化Cart对象并赋值,尝试发送给KafkaCart cart=new Cart();cart.setId(i++);cart.setCommodityCode("PC100");cart.setUserId("UU100");cart.setPrice(RandomUtils.nextInt(90)+10);cart.setCount(RandomUtils.nextInt(10)+1);//  {"id":"1","userId":"UU100",.....}// 利用gson工具,将cart对象转换为json格式字符串方便发送Gson gson=new Gson();String json=gson.toJson(cart);System.out.println("要发送的消息为:"+json);// 执行发送kafkaTemplate.send("myCart",json);}}

kafka包中创建一个叫Consumer的类来接收消息, 接收消息的类可以是本模块的类,也可以是其它模块的类,编写的代码是完全一致

// 要接收kafka的消息需要讲接收消息的对象保存到Spring容器中
// 因为KafkaTemplate是spring在管理的
@Component
public class Consumer {// SpringKafka接收消息依靠了框架提供的"监听机制"// 框架中有一个线程,一直实时关注kafka的消息情况// 如果我们指定的话题名称(myCart)接收了消息,那么这条线程就会自动调用下面的方法@KafkaListener(topics = "myCart")// 下面定义的方法就是接收到消息后运行的方法// 这个方法有参数,参数类型是固定的,参数的值就是监听器接收到的消息内容public void received(ConsumerRecord<String,String> record){// 参数类型必须是ConsumerRecord// 泛型<[话题名称的类型],[消息的类型]>// 我们可以将record视为从kafka中接收到的消息对象String json=record.value();// json可能的值: {"id":2,"commodityCode":"PC100","price":74,"count":8,"userId":"UU100"}Gson gson=new Gson();// gson也可以将json字符串转换为java对象Cart cart=gson.fromJson(json,Cart.class);System.out.println("接收到了消息:"+cart);}}

消息队列之Kafka相关推荐

  1. 2021年大数据Kafka(一):❤️消息队列和Kafka的基本介绍❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 消息队列和Kafka的基本介绍 一.什么是消息队列 二.消息队列的应用场景 ...

  2. Cris 玩转大数据系列之消息队列神器 Kafka

    Cris 玩转大数据系列之消息队列神器 Kafka Author:Cris 文章目录 Cris 玩转大数据系列之消息队列神器 Kafka Author:Cris 1. Kafka 概述 1.1 消息队 ...

  3. “简单”的消息队列与kafka

    小时候就特别喜欢龙系精灵,特别是乘龙,后来才知道只是冰水...尴尬. 在宠物小精灵中,乘龙一直是训练家的载人伙伴,和我们下面的MQ好像有几分相似呢~~ 前言 MQ,全称消息队列,现在市面上有很多种消息 ...

  4. python消息队列celery_消息队列(kafka/nsq等)与任务队列(celery/ytask等)到底有什么不同?...

    写这篇博文的起因是,我在论坛宣传我开源的新项目YTask(go语言异步任务队列)时,有小伙伴在下面回了一句"为什么不用nsq?".这使我想起,我在和同事介绍celery时同事说了一 ...

  5. 【消息队列】kafka是如何保证消息不被重复消费的

    一.kafka自带的消费机制 kafka有个offset的概念,当每个消息被写进去后,都有一个offset,代表他的序号,然后consumer消费该数据之后,隔一段时间,会把自己消费过的消息的offs ...

  6. 扫盲消息队列 | 消息中间件 | Kafka

    先吐槽 我真的写技术文章写到怀疑人生,我翻看历史发文记录,只要我一本正经的写的技术文章,都没人看,但是!一发闲扯淡的内容,阅读量肯定是技术文的好几倍(读者爸爸们别这么搞嘛) 这说明啥?说明学习还是太枯 ...

  7. 消息队列_消息队列:kafka

    概念 kafka是一个分布式的基于发布/订阅模式的消息队列,主要用于大数据实时处理领域. 要理解kafka首先要有分布式的概念,要有消息队列的概念.分布式系统最大的优势就是解耦和削峰,这种情况下,A系 ...

  8. 消息队列之kafka面试题

    1.什么是kafka Kafka是分布式发布-订阅消息系统,它最初是由LinkedIn公司开发的,之后成为Apache项目的一部分,Kafka是一个分布式,可划分的,冗余备份的持久性的日志服务,它主要 ...

  9. Java开发 - 消息队列之Kafka初体验

    目录 前言 Kafka 什么是Kafka Kafka软件结构 Kafka的特点 怎么启动Kafka 下载Kafka 配置Kafka Zookeeper 启动Kafka Kafka案例 添加依赖 添加配 ...

最新文章

  1. ajax校验用户名可用吗,基于jQuery实现Ajax验证用户名是否可用实例
  2. 区块链BaaS云服务(39)时戳信息Bystack“架构“
  3. sql serve 数据库游标的使用
  4. vuejs 和 element 搭建的一个后台管理界面
  5. 手机抓直播源工具app_东方卫视 手机在线直播 央视源
  6. Qt中文件读写进文本框出现乱码问题详解(gbk格式出现乱码为例)
  7. java开源商城--(8)商品管理之商品分类
  8. 计算机如何隐藏任务栏的程序,电脑系统教程_电脑如何隐藏任务栏图标
  9. 分布式任务调度组件 Uncode-Schedule
  10. Webpack打包警告: We noticed you're using the `useBuiltIns` option without declaring a core-js version.
  11. 国王游戏(贪心算法)
  12. 圆柱贴180度全景图片
  13. BI神器Power Query(11)-- PQ M函数快速查询
  14. Unity3D 片元NDC空间z值(ZBuffer)转View空间z值,公式推导
  15. i7 9750h和r7 5800h差距大不大
  16. sanity测试_使用Sanity CLI可以做的5件事
  17. java 根据开始日期 ,需要的工作日天数 ,计算工作截止日期,并返回截止日期
  18. 双11,客服系统让你告别客服节前emo
  19. 骁龙820 html5 dom,苹果iPhone 7 A10处理器探秘:这项性能 11倍于骁龙820!
  20. 【十问评估经营理念】

热门文章

  1. mysql的集群和分布式区别_集群和分布式的区别
  2. 会影响电线载流量的因素
  3. 可视化 | Python分析中秋月饼,这几种口味才是yyds
  4. 如何把握现货黄金今日行情
  5. 分析器错误信息: 未能加载类型“WebApplication1.Global”。类似问题总结。
  6. [前端项目学习笔记] 200行代码网站首页轮播实现(html,css,js)
  7. java 父类转换成子类的方法
  8. 运行torchAudio下的wav2vec2.0样例
  9. vue3 + antd + typeScript 封装一个高仿的ProTable
  10. ant design vue 1.7.8版本treeSelect组件坑