RabbitMQ项目实战——商户管理系统
项目介绍
商户管理系统:
用于管理与公司合作的商户信息,包括商户准入和审核的全流程。有很多下游业务系统需要用到商户信息,每一个系统都会在自己的数据库中存放商户的关键信息。比如提单系统提单需要商户的名称;放宽系统放款需要商户的账号名;风控系统也要关注商信息的变动。
这种同步数据的场景,之前一直使用定时同步,也就是依赖一个核心的数据库,我们只需要修改核心数据的商户信息,其他系统定时去核心系统中拉取数据。
但是定时同步有两个缺点:首先实时性不高,因为定时任务不可能每分每秒都在运行;其次就是一旦核心数据库出现问题,其他所有的系统都无法同步,耦合性太高。
所以这种情况就可以改用MQ同步,因为考虑到还有其他系统也要用到商户信息,所以我们直接采用广播的方式。
该项目无非就是一个生产者消费者模型 每次商户信息变化后就发出通知消费即可。
整体流程:
生产者:
- 注入Template发送消息。 在任何需要发送MQ消息的地方注入Template,或者在单元测试类中注入生产者,调用send()方法。
消费者:
- 创建配置类,定义队列、交换机、绑定;
- 创建消费者,监听队列;
生产者
工程搭建
配置
在实际开发过程中,我们应该将交换机名称及队列名称放在配置文件中统一管理。
com.directexchange=DIRECT_EXCHANGE
com.topicexchange=TOPIC_EXCHANGE
com.fanoutexchange=FANOUT_EXCHANGE
com.directroutingkey=best
com.topicroutingkey1=chen
com.topicroutingkey2=nanjin
properties文件
server.port=9071
#spring.rabbitmq.host=127.0.0.1
#spring.rabbitmq.port=5672
#spring.rabbitmq.virtual-host=/
#spring.rabbitmq.username=guest
#spring.rabbitmq.password=guest
spring.datasource.url=jdbc:mysql://localhost:3306/rabbitmq?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
propertiesspring.thymeleaf.cache=false
mybatis.typeAliasesPackage=com.example.producer.entity
mybatis.mapperLocations=classpath:mybatis/mapper/*.xml
logging.level.com.example.producer.mapper=debug
业务代码
项目首先开发一个界面来模拟商户信息的增删改查,这边直接采用SSM框架配置LayUI实现一个简单的界面。
一旦商户信息发生变化,除了修改我们自己系统的数据之外,我们还需要广播出去,让所有的下游系统拿到信息,修改自己数据库的商户信息。
增删改查这块还是很简单的,不必多说。流程无非就是controller-》service-》mapper;文末会贴出项目地址。
项目中我们需要注意的就是MQ的消息什么时候发送,在那一层发送,是先更新数据库后发送MQ消息还是先发送MQ消息后在更新数据库。
消费发送
配置
进行rabbitmq进行消息通知的时候,我们首先需要进行相关的配置以及消息发送代码的编写。
@Configuration
public class RabbitConfig {/*** 所有的消息发送都会转换成JSON格式发到交换机** @param connectionFactory* @return*/@Beanpublic RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return rabbitTemplate;}
}
带着上面的问题,我们可以看一下service的实现类。
以更新商户信息为例。我们先更新数据库,然后调用注入的template发送消息。
@Override
public int update(Merchant merchant) {int k = merchantMapper.update(merchant);// 发送消息失败了???JSONObject title = new JSONObject();String jsonBody = JSONObject.toJSONString(merchant);title.put("type", "update");title.put("desc", "更新商户信息");title.put("content", jsonBody);amqpTemplate.convertAndSend(topicExchange, topicRoutingKey, title.toJSONString());return k;
}
这里的顺序千万要注意!一定是先更新数据库后再发送消息!
如果先发送消息的话,消费者接收到了消息,认为上游数据更新成功了,他接着进行其他业务;但是这个时候上有数据库更新失败了,所以就会导致数据库回滚的话造成数据一致性的问题。
消息可靠性传递
但是,如果先更新数据库,然后发送消息的时候失败了?比如服务器没有成功接受或者路由出现了问题,这个时候应该怎么解决呢?
这个时候就需要用到rabbitmq中的消息可靠性机制,该部分会在另外一篇文章详解。
消费者
工程搭建
配置
将队列信息采用配置文件的形式
com.directexchange=DIRECT_EXCHANGE
com.topicexchange=TOPIC_EXCHANGE
com.fanoutexchange=FANOUT_EXCHANGEcom.firstqueue=FIRST_QUEUE
com.secondqueue=SECOND_QUEUE
com.thirdqueue=THIRD_QUEUE
com.fourthqueue=FOURTH_QUEUE
server.port=9072
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#spring.rabbitmq.cache.channel.size=
消息队列绑定
然后需要进行交换机、队列的定义及两者间的绑定。
@Configuration
@PropertySource("classpath:mq.properties")
public class RabbitConfig {@Value("${com.firstqueue}")private String firstQueue;@Value("${com.secondqueue}")private String secondQueue;@Value("${com.thirdqueue}")private String thirdQueue;@Value("${com.fourthqueue}")private String fourthQueue;@Value("${com.directexchange}")private String directExchange;@Value("${com.topicexchange}")private String topicExchange;@Value("${com.fanoutexchange}")private String fanoutExchange;// 创建四个队列@Bean("FirstQueue")public Queue getFirstQueue() {return new Queue(firstQueue);}@Bean("SecondQueue")public Queue getSecondQueue() {return new Queue(secondQueue);}@Bean("ThirdQueue")public Queue getThirdQueue() {return new Queue(thirdQueue);}@Bean("FourthQueue")public Queue getFourthQueue() {return new Queue(fourthQueue);}// 创建三个交换机@Bean("DirectExchange")public DirectExchange getDirectExchange() {return new DirectExchange(directExchange);}@Bean("TopicExchange")public TopicExchange getTopicExchange() {return new TopicExchange(topicExchange);}@Bean("FanoutExchange")public FanoutExchange getFanoutExchange() {return new FanoutExchange(fanoutExchange);}// 定义四个绑定关系@Beanpublic Binding bindFirst(@Qualifier("FirstQueue") Queue queue, @Qualifier("DirectExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("best");}@Beanpublic Binding bindSecond(@Qualifier("SecondQueue") Queue queue, @Qualifier("TopicExchange") TopicExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("#");}@Beanpublic Binding bindThird(@Qualifier("ThirdQueue") Queue queue, @Qualifier("FanoutExchange") FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}@Beanpublic Binding bindFourth(@Qualifier("FourthQueue") Queue queue, @Qualifier("FanoutExchange") FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}/*** 在消费端转换JSON消息* 监听类都要加上containerFactory属性** @param connectionFactory* @return*/@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);factory.setAutoStartup(true);return factory;}
}
消息监听
@Component
@PropertySource("classpath:mq.properties")
@RabbitListener(queues = "${com.secondqueue}", containerFactory = "rabbitListenerContainerFactory")
public class SecondConsumer {@RabbitHandlerpublic void process(String msgContent, Channel channel, Message message) throws IOException {System.out.println("Second Queue received msg : " + msgContent);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}
测试
http://127.0.0.1:9071/merchantList
新增、删除、修改信息、修改状态会发送MQ消息到TOPIC_EXCHANGE, 路由到SECOND_QUEUE。
- 先启动消费者consumer
- 在界面上修改商户信息,或者调用生产者的单元测试类发送消息。
修改商户信息后,可以看到消费者工程接收到相关信息。
项目地址
rabbitmq-demo地址
RabbitMQ项目实战——商户管理系统相关推荐
- Java项目实战---歌曲管理系统
Java项目实战-歌曲管理系统 声明:本人仅整理了代码,方便大家进行调试优化,功能上还存在很多纰漏,欢迎大家评论区讨论 代码原地址放于文章末尾 一.实验目的: 掌握类的定义,JAVA应用程序的设计与实 ...
- Vue项目实战 —— 后台管理系统( pc端 ) 第三篇
前期回顾 Vue项目实战 -- 后台管理系统( pc端 ) 第二篇_0.活在风浪里的博客-CSDN博客前期回顾 Vue项目实战 -- 后台管理系统( pc端 ) 第一篇 _0.活在风浪里 ...
- Vue项目实战 —— 后台管理系统( pc端 ) 第一篇
前期回顾 我只写注释 -- 让Ai写代码_0.活在风浪里的博客-CSDN博客前期回顾 Vue项目实战 -- 哔哩哔哩移动端开发-- 第二篇_0.活在风浪里的博客-CSDN博客https://b ...
- Vue项目实战 —— 后台管理系统( pc端 )
前期回顾 我只写注释 -- 让Ai写代码_的博客-CSDN博客前期回顾 Vue项目实战 -- 哔哩哔哩移动端开发-- 第二篇_的博客-CSDN博客https://blog.csdn.net/m0_57 ...
- SSM项目实战:酒店管理系统
使用的技术栈:Spring+SpringMVC+mybatis+Mysql+layui+Maven Maven 项目结构.项目配置项为: 服务器:apache-tomcat-9.0.0.M26 (必须 ...
- [ABP项目实战]-后台管理系统-目录
学习ABP也有一段时间了,但是总是学习了后面的忘记了前面的,为了巩固所学到的知识以及记录所学到的东西,因此有了本系列的诞生. ABP ASP.NET Boilerplate Project(ABP.N ...
- c语言项目实战 —— 图书管理系统
图书管理系统 目的 总体功能分析 各模块功能简要分析 1,图书借出模块 2,图书归还模块 3,图书上架模块 4,图书下架模块 5,查找图书模块 6,退出系统模块 功能实现 1,建立相关类及属性 2,系 ...
- java项目实战-超市管理系统(三)页面设计源码
一.duan.page.MainPage package duan.page;import java.util.ArrayList;import duan.dao.GoodsDao; import d ...
- SpringBoot+MyBatis+Vue+ElementUI项目实战-人事管理系统(免费开源)
一.说明 1.该项目是写给有一定SpringBoot.MyBatis.Vue基础,同时想做做项目的同学看的,不适合一点基础也没有的小白,也不适合高手~~~ 源码下载地址:https://downloa ...
- javaweb项目实战--学生管理系统
准备工作 所使用到的技术: 前端–html css javascript 后端–tomcat servlet jsp el jdbc mybatis 项目结构: 实现原理: 1.登录页面的逻辑结构 2 ...
最新文章
- 【多标签文本分类】Large Scale Multi-label Text Classification with Semantic Word Vectors
- 基于@RabbitListener声明LazyQueue
- jquery获取select中的option的text值
- 中fuse_保险丝座中保险丝的材质,结构,接线方式以及区别的介绍
- 【Python】猜数小游戏
- 跟着小皮老师了解Go语言LiteIDE详细使用教程❤
- 公式、图片、表格等转latex
- 怎么恢复误删的重要文件,电脑误删重要文件怎么恢复
- Android 音视频深入 六 使用FFmpeg播放视频(附源码下载)
- 【EasyRL学习笔记】第八章 针对连续动作的深度Q网络
- 计算机网络——网络层路由协议、IP组播、移动 IP、路由器
- Vue控制表格列的显示隐藏
- 虚拟主机服务器能干嘛用,虚拟主机能拿来干什么
- 2.zookeeper客户端使用与集群特性
- 谷歌高质量外链,google英文外链怎么做效果好?
- 源享科技为什么关闭了_关闭这个阀门竟然需要转8000圈?
- 华为/H3C常用巡检命令
- 我在亚马逊商城卧底的日子
- 西门子S7-200PLC和丹佛斯变频器的通讯协议改造_过路老熊_新浪博客
- 6.11 my Batis
热门文章
- easyui 全部图标
- 【stm32开发日志】步进电机、直线模组、丝杆的接线、编程与使用
- python计算器外壳模板
- SAP PK Oracle
- 基于java的铁路售票系统(火车票预订)ssh框架
- 【Python】基于VB、Python、PythonGUI的BMI计算器小程序
- 国二c语言程序设计分值分布,计算机二级分值
- 教学设计的理念与方法【2】
- citrix服务器共享文件夹,如何使用citrix XenApp 发布远程共享文件夹,及其故障解决...
- sqlServer 如何查看数据库日志文件的大小