RabbitMQ防止重复消费
直接上代码
RabbitConfig
/*** @author fan* @date 2022年04月27日 11:17*/
@EnableRabbit
@Configuration
public class RabbitConfig {/**创建扇形交换机开始*///测试队列名称private String fanoutQueueName = "fanoutQueue";// 测试交换机名称private String fanoutExchange = "fanoutExchange";// RoutingKey 路由键无需配置,配置也不起作用//private String fanoutRoutingKey = "fanoutRoutingKey";/** 创建队列 */@Beanpublic Queue fanoutQueue() {return new Queue(fanoutQueueName);}/** 创建交换机 */@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(fanoutExchange);}/** 通过routingKey把队列与交换机绑定起来 */@Beanpublic Binding fanoutExchangeBinding() {return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());}/**结束创建扇形交换机(Fanout exchange)*直连交换机(Direct exchange): 具有路由功能的交换机,绑定到此交换机的时候需要指定一个 routing_key,交换机发送消息的时候需要routing_key,会将消息发送道对应的队列
*扇形交换机(Fanout exchange): 广播消息到所有队列,没有任何处理,速度最快
*主题交换机(Topic exchange): 在直连交换机基础上增加模式匹配,也就是对routing_key进行模式匹配,`*`代表一个单词,`#`代表多个单词
*首部交换机(Headers exchange): 忽略routing_key,使用Headers信息(一个Hash的数据结构)进行匹配,优势在于可以有更多更灵活的匹配规则*/
}
producer
/*** @author fan* @date 2022年04月27日 15:01*/
@Component
public class MyProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/** * 发送JSON类型* @author fan * @date 2022/4/27 15:39* @param queueName 队列名称* @return java.lang.String */public String sendjsonObject(String queueName) {//构造前端回传参数JSONObject jsonObject = new JSONObject();jsonObject.put("email", "Icomxchishi@qq.com");jsonObject.put("phone", "03803880388");jsonObject.put("age", 15);jsonObject.put("sex","女");jsonObject.put("name", "messageData");jsonObject.put("data", Arrays.asList("张三","12345678",12,"男"));jsonObject.put("timestamp", System.currentTimeMillis());String jsonString = jsonObject.toJSONString();String messageId = String.valueOf(UUID.randomUUID());Message message = MessageBuilder.withBody(jsonString.getBytes()).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setContentType(MessageProperties.CONTENT_TYPE_JSON).setDeliveryTag(System.currentTimeMillis()).setContentEncoding("utf-8").setMessageId(messageId).build();rabbitTemplate.convertAndSend(queueName, message);return "sendjsonObject";}/** * 发送map类型数据* @author fan * @date 2022/4/27 18:33* @param queueName 队列名称* @return java.lang.String *//*public String sendMessageMap(String queueName) {String messageId = String.valueOf(UUID.randomUUID());String messageData = "my message!";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);map.put("msg","随便");map.put("data", Arrays.asList("张三","12345678",12,"男"));rabbitTemplate.convertAndSend(queueName, map);return "sendMapMessage";}*/}
/**lua配置类主要是去加载lua文件的内容,到时内存中。方便redis去读取和控制* @author fan* @date 2022年05月07日 1:01*/
@Configuration
public class MyLuaConfiguration {@Bean(name = "unLock")public DefaultRedisScript<Long> unLockLuaScript() {DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<Long>();defaultRedisScript.setResultType(Long.class);defaultRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unLock.lua")));return defaultRedisScript;}}
lua脚本原子性,判断是否是自己加的锁
-- 判断锁的值是否相等。 KEYS[1], ARGV[1],是指传入的参数,以上面为例,KEYS[1] 指的是 lock_order,ARGV[1] 指的是 123uD,
if redis.call("get",KEYS[1]) == ARGV[1]
thenreturn redis.call("del",KEYS[1]) -- 删除这个 key,返回删除 key 的个数
elsereturn 0 -- 锁值不相等返回 0
end
consumer,以sendjsonObject();为例
@Slf4j
@Component
public class MyConsumer {@Autowiredprivate RedisTemplate redisTemplate;@Autowiredprivate RedisClient redisClient;//本人封装的,可以使用redisTemplate@Autowiredprivate UserService userService;@Autowiredprivate LoggerService loggerService;/** 消费者消费成功后,再给MQ确认的时候出现了网络波动* MQ没有接收到确认,为了保证消息被消费,RabbitMQ就会继续给消费者重新投递之前的消息* 消费者就接收到了两条一样的消息* 重复消息可能是网络波动原因造成的或其它原因,导致不可避免的重复消息* 如何保证消息幂等性??* 解决:消费者先获取到消息后在根据messageId去查询redis是否存在该消息* 若不存在,则正常消费,消费完毕后写入redis* 若存在,则消息已经被消费过,就直接丢弃* @author fan* @date 2022/4/27 16:28* @param message*/@RabbitListener(queues = "fanoutQueue")public void sendjsonObject(Message message, Channel channel) throws Exception{//消费者消息重复log.info("MyConsumer 消费者收到消息:{}" , JSONObject.toJSONString(message));String messageId = message.getMessageProperties().getMessageId();String msg = new String(message.getBody(), "UTF-8");String messageIdRedis = //(String) redisTemplate.opsForValue().get("messageId");redisClient.getString("messageId");if (messageId != null) {if (messageId.equals(messageIdRedis)) {return;}}//redisTemplate.opsForValue().set(messageId, messageId, 3_000L, TimeUnit.SECONDS);//时间具体根据业务定redisClient.setKey(messageId,messageId,3_000L);// 写入缓存System.out.println("number==" + channel.getChannelNumber());JSONObject jsonObject = JSONObject.parseObject(msg);long deliverTag = message.getMessageProperties().getDeliveryTag();try {//具体业务,如:插入或修改user,这里用数据库可以不用redis做判断Integer age = (Integer) jsonObject.get("age");String name = (String) jsonObject.get("name");String sex = (String) jsonObject.get("sex");LocalDateTime localDateTime = LocalDateTime.now();Date dataTime = Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());User user = new User();user.setSex(sex);user.setName(name);user.setDataTime(dataTime);user.setAge(age);int i = 0;if (user != null){i = userService.savetUser(user);//新增或修改redisClient.setKey(user.getId() + "", user, 5_000L);// 写入缓存,便于操作延时业务//redisTemplate.opsForList().rightPushAll(user.getId(), Arrays.asList(user));//loggerService.saveLog(jsonObject);//日志记录}log.info("收到消息obj:" + user + ",id=" + user.getId());log.info("保存了" + i + "记录");channel.basicAck(deliverTag,true);//手动设置ack,消费成功,确认消息}catch (Exception e){try {//异常返回false,就重新回到队列channel.basicNack(deliverTag, false, true);} catch (Exception ioException) {log.error("重新放入队列失败,失败原因:{}",e.getMessage(),e);}log.error("TopicConsumer消费者出错,mq参数:{},错误信息:{}",message,e.getMessage(),e);//channel.basicReject(deliverTag, false);}log.info("消费消息jsonObject:" + jsonObject + ",messageId:" + messageId);}/**@RabbitListener(queues = "fanoutQueue")public void sendMessageMap(Map map){log.info("收到消息map:" + map.toString());}*/}
ProducerController
@RestController
@RequestMapping(value = "producer")
public class ProducerController {@Autowiredprivate MyProducer myProducer;@RequestMapping("/sendjsonObject")public String sendjsonObject() {//前端传参数回来在producer那里模拟了,这里就略return myProducer.sendjsonObject("fanoutQueue");}/**@GetMapping("/sendMessageMap")public String sendMessageMap() {return myProducer.sendMessageMap("fanoutQueue");}*/}
application.yml
server:port: 8081spring:datasource:url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=falsedriver-class-name: com.mysql.jdbc.Driverusername: rootpassword: dbcp2: initial-size: 5 max-total: 5 max-wait-millis: 200 min-idle: 5 redis:database: 0host: 127.0.0.1port: 6379password:rabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-returns: truepublisher-confirms: trueconnection-timeout: 5000#另外一种打印语句的方式log-impl: org.apache.ibatis.logging.stdout.StdOutImpllogging:level:com:acong:dao: debugfile: d:/logs/redis.log
效果图:
RabbitMQ防止重复消费相关推荐
- RabbitMQ消息重复消费问题
业务背景 消息队列在数据传输的过程中,为了保证消息传递的可靠性,一般会对消息采用ack确认机制,如果消息传递失败,消息队列会进行重试,此时便可能存在消息重复消费的问题. 比如,用户到银行取钱后会收到扣 ...
- 解决RabbitMQ消息丢失与重复消费问题
解决RabbitMQ消息丢失与重复消费问题 参考文章: (1)解决RabbitMQ消息丢失与重复消费问题 (2)https://www.cnblogs.com/sessionbest/articles ...
- RabbitMQ—重复消费、数据丢失和消息顺序性
原文作者:weixin_49367803 原文地址:https://blog.csdn.net/weixin_49367803/article/details/108480256 一.如何保证消息不被 ...
- Rabbitmq消息可靠投递和重复消费等问题解决方案
消息的可靠性投递 在一些对数据一致性要求较高的业务场景里面,如果消息在发布和消费过程中出现了问题(消息丢失,消息重复消费),就会导致数据不一致,要做到消息的可靠性投递. 在RabbitMq里面提供了很 ...
- 【重难点】【RabbitMQ 02】如何避免消息重复投递和消息重复消费、如何防止消息丢失、如何保证消息的顺序性、如何保证消息队列的可用性
[重难点][RabbitMQ 02]如何避免消息重复投递和消息重复消费.如何防止消息丢失.如何保证消息的顺序性.如何保证消息队列的可用性 文章目录 [重难点][RabbitMQ 02]如何避免消息重复 ...
- RabbitMQ如何解决被重复消费和数据丢失的问题?
想想为什么要使用MQ? 1.解耦,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦! 2.异步,将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快 ...
- RabbitMQ 可靠性、重复消费、顺序性、消息积压解决方案
前言 为什么引入消息队列?引入 MQ 给我们解决了一些问题,但同时又引入了一些复杂的问题,这些问题是大型项目中必须解决的重点,更重要的是,面试也经常问.实际上消息队列可以说是没法百分之百保证可靠性的! ...
- RabbitMQ如何防止消息丢失及重复消费
RabbitMQ目录 文章目录 RabbitMQ如何防止消息丢失及重复消费 一.消息丢失 1.1.生产者没有成功把消息发送到MQ 1.1.1.confirm(发布确认)机制 1.1.2.事务机制 1. ...
- RabbitMQ常见问题解决方案——消息丢失、重复消费、消费乱序、消息积压
文章目录 背景 RabbitMQ常见问题解决方案 1. RabbitMQ的可靠性(消息丢失问题) 1.1 生产者丢失消息 1.2 RabbitMQ弄丢消息 1.2.1 交换机持久化 1.2.2 队列持 ...
最新文章
- Facebook称其新的AI技术可以更快发现仇恨言论
- Nginx负载均衡+tomcat+session共享
- Spring Junit 读取WEB-INF下的配置文件
- 关于网管软件中的预警功能的发展
- odu oracle 价格_Oracle数据库ODU的几种恢复场景
- Activiti6--入门学习--监听器
- 调用python-nmap实现扫描局域网存活主机
- JZOJ 4809. 挖金矿
- 企业微信逆向分析之——自己二维码——静态分析
- spanning-tree portfast default
- 细说华为和荣耀的关系:潮流的荣耀和稳重的华为
- 机器人学导论(一)——空间描述和变换
- 2.6java基础 数组
- 使用Webcam实现拍照功能
- 联想lenovo sl700 240G sata ps3111主控+未知颗粒 掉盘,ps3111写保护开卡量产修复过程
- offline强化学习之Revive SDK的使用
- cent os 7 与cent os 6区别
- SwiftUI AVKit 之合并和叠加音频mp3 wav 并输出(教程含源码)
- 立创eda学习笔记十:如何添加好友或组建团队
- c语言本地函数声明非法,C++本地函数定义是非法的