直接上代码

RabbitConfig

/*** @author fan* @date 2022年04月27日 11:17*/
@EnableRabbit
@Configuration
public class RabbitConfig {/**创建扇形交换机开始*///测试队列名称private String fanoutQueueName = "fanoutQueue";// 测试交换机名称private String fanoutExchange = "fanoutExchange";// RoutingKey 路由键无需配置,配置也不起作用//private String fanoutRoutingKey = "fanoutRoutingKey";/** 创建队列 */@Beanpublic Queue fanoutQueue() {return new Queue(fanoutQueueName);}/** 创建交换机 */@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(fanoutExchange);}/** 通过routingKey把队列与交换机绑定起来 */@Beanpublic Binding fanoutExchangeBinding() {return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());}/**结束创建扇形交换机(Fanout exchange)*直连交换机(Direct exchange): 具有路由功能的交换机,绑定到此交换机的时候需要指定一个    routing_key,交换机发送消息的时候需要routing_key,会将消息发送道对应的队列
*扇形交换机(Fanout exchange): 广播消息到所有队列,没有任何处理,速度最快
*主题交换机(Topic exchange): 在直连交换机基础上增加模式匹配,也就是对routing_key进行模式匹配,`*`代表一个单词,`#`代表多个单词
*首部交换机(Headers exchange): 忽略routing_key,使用Headers信息(一个Hash的数据结构)进行匹配,优势在于可以有更多更灵活的匹配规则*/
}

producer

/*** @author fan* @date 2022年04月27日 15:01*/
@Component
public class MyProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/** * 发送JSON类型* @author fan * @date 2022/4/27 15:39* @param queueName 队列名称* @return java.lang.String */public String sendjsonObject(String queueName) {//构造前端回传参数JSONObject jsonObject = new JSONObject();jsonObject.put("email", "Icomxchishi@qq.com");jsonObject.put("phone", "03803880388");jsonObject.put("age", 15);jsonObject.put("sex","女");jsonObject.put("name", "messageData");jsonObject.put("data", Arrays.asList("张三","12345678",12,"男"));jsonObject.put("timestamp", System.currentTimeMillis());String jsonString = jsonObject.toJSONString();String messageId = String.valueOf(UUID.randomUUID());Message message = MessageBuilder.withBody(jsonString.getBytes()).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setContentType(MessageProperties.CONTENT_TYPE_JSON).setDeliveryTag(System.currentTimeMillis()).setContentEncoding("utf-8").setMessageId(messageId).build();rabbitTemplate.convertAndSend(queueName, message);return "sendjsonObject";}/** * 发送map类型数据* @author fan * @date 2022/4/27 18:33* @param queueName 队列名称* @return java.lang.String *//*public String sendMessageMap(String queueName) {String messageId = String.valueOf(UUID.randomUUID());String messageData = "my message!";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);map.put("msg","随便");map.put("data", Arrays.asList("张三","12345678",12,"男"));rabbitTemplate.convertAndSend(queueName, map);return "sendMapMessage";}*/}
/**lua配置类主要是去加载lua文件的内容,到时内存中。方便redis去读取和控制* @author fan* @date 2022年05月07日 1:01*/
@Configuration
public class MyLuaConfiguration {@Bean(name = "unLock")public DefaultRedisScript<Long> unLockLuaScript() {DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<Long>();defaultRedisScript.setResultType(Long.class);defaultRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unLock.lua")));return defaultRedisScript;}}

lua脚本原子性,判断是否是自己加的锁

-- 判断锁的值是否相等。 KEYS[1], ARGV[1],是指传入的参数,以上面为例,KEYS[1] 指的是 lock_order,ARGV[1] 指的是 123uD,
if redis.call("get",KEYS[1]) == ARGV[1]
thenreturn redis.call("del",KEYS[1])    -- 删除这个 key,返回删除 key 的个数
elsereturn 0                            -- 锁值不相等返回 0
end

consumer,以sendjsonObject();为例

@Slf4j
@Component
public class MyConsumer {@Autowiredprivate RedisTemplate redisTemplate;@Autowiredprivate RedisClient redisClient;//本人封装的,可以使用redisTemplate@Autowiredprivate UserService userService;@Autowiredprivate LoggerService loggerService;/** 消费者消费成功后,再给MQ确认的时候出现了网络波动* MQ没有接收到确认,为了保证消息被消费,RabbitMQ就会继续给消费者重新投递之前的消息* 消费者就接收到了两条一样的消息* 重复消息可能是网络波动原因造成的或其它原因,导致不可避免的重复消息* 如何保证消息幂等性??* 解决:消费者先获取到消息后在根据messageId去查询redis是否存在该消息* 若不存在,则正常消费,消费完毕后写入redis* 若存在,则消息已经被消费过,就直接丢弃* @author fan* @date 2022/4/27 16:28* @param message*/@RabbitListener(queues = "fanoutQueue")public void sendjsonObject(Message message, Channel channel) throws Exception{//消费者消息重复log.info("MyConsumer  消费者收到消息:{}" , JSONObject.toJSONString(message));String messageId = message.getMessageProperties().getMessageId();String msg = new String(message.getBody(), "UTF-8");String messageIdRedis = //(String) redisTemplate.opsForValue().get("messageId");redisClient.getString("messageId");if (messageId != null) {if (messageId.equals(messageIdRedis)) {return;}}//redisTemplate.opsForValue().set(messageId, messageId, 3_000L, TimeUnit.SECONDS);//时间具体根据业务定redisClient.setKey(messageId,messageId,3_000L);// 写入缓存System.out.println("number==" + channel.getChannelNumber());JSONObject jsonObject = JSONObject.parseObject(msg);long deliverTag = message.getMessageProperties().getDeliveryTag();try {//具体业务,如:插入或修改user,这里用数据库可以不用redis做判断Integer age = (Integer) jsonObject.get("age");String name = (String) jsonObject.get("name");String sex = (String) jsonObject.get("sex");LocalDateTime localDateTime = LocalDateTime.now();Date dataTime = Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());User user = new User();user.setSex(sex);user.setName(name);user.setDataTime(dataTime);user.setAge(age);int i = 0;if (user != null){i = userService.savetUser(user);//新增或修改redisClient.setKey(user.getId() + "", user,  5_000L);// 写入缓存,便于操作延时业务//redisTemplate.opsForList().rightPushAll(user.getId(), Arrays.asList(user));//loggerService.saveLog(jsonObject);//日志记录}log.info("收到消息obj:" + user + ",id=" + user.getId());log.info("保存了" + i + "记录");channel.basicAck(deliverTag,true);//手动设置ack,消费成功,确认消息}catch (Exception e){try {//异常返回false,就重新回到队列channel.basicNack(deliverTag, false, true);} catch (Exception ioException) {log.error("重新放入队列失败,失败原因:{}",e.getMessage(),e);}log.error("TopicConsumer消费者出错,mq参数:{},错误信息:{}",message,e.getMessage(),e);//channel.basicReject(deliverTag, false);}log.info("消费消息jsonObject:" + jsonObject + ",messageId:" + messageId);}/**@RabbitListener(queues = "fanoutQueue")public void sendMessageMap(Map map){log.info("收到消息map:" + map.toString());}*/}
ProducerController
@RestController
@RequestMapping(value = "producer")
public class ProducerController {@Autowiredprivate MyProducer myProducer;@RequestMapping("/sendjsonObject")public String sendjsonObject() {//前端传参数回来在producer那里模拟了,这里就略return myProducer.sendjsonObject("fanoutQueue");}/**@GetMapping("/sendMessageMap")public String sendMessageMap() {return myProducer.sendMessageMap("fanoutQueue");}*/}

application.yml

server:port: 8081spring:datasource:url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=falsedriver-class-name: com.mysql.jdbc.Driverusername: rootpassword: dbcp2: initial-size: 5                                        max-total: 5                                            max-wait-millis: 200                                    min-idle: 5 redis:database: 0host: 127.0.0.1port: 6379password:rabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-returns: truepublisher-confirms: trueconnection-timeout: 5000#另外一种打印语句的方式log-impl: org.apache.ibatis.logging.stdout.StdOutImpllogging:level:com:acong:dao: debugfile: d:/logs/redis.log

效果图:

RabbitMQ防止重复消费相关推荐

  1. RabbitMQ消息重复消费问题

    业务背景 消息队列在数据传输的过程中,为了保证消息传递的可靠性,一般会对消息采用ack确认机制,如果消息传递失败,消息队列会进行重试,此时便可能存在消息重复消费的问题. 比如,用户到银行取钱后会收到扣 ...

  2. 解决RabbitMQ消息丢失与重复消费问题

    解决RabbitMQ消息丢失与重复消费问题 参考文章: (1)解决RabbitMQ消息丢失与重复消费问题 (2)https://www.cnblogs.com/sessionbest/articles ...

  3. RabbitMQ—重复消费、数据丢失和消息顺序性

    原文作者:weixin_49367803 原文地址:https://blog.csdn.net/weixin_49367803/article/details/108480256 一.如何保证消息不被 ...

  4. Rabbitmq消息可靠投递和重复消费等问题解决方案

    消息的可靠性投递 在一些对数据一致性要求较高的业务场景里面,如果消息在发布和消费过程中出现了问题(消息丢失,消息重复消费),就会导致数据不一致,要做到消息的可靠性投递. 在RabbitMq里面提供了很 ...

  5. 【重难点】【RabbitMQ 02】如何避免消息重复投递和消息重复消费、如何防止消息丢失、如何保证消息的顺序性、如何保证消息队列的可用性

    [重难点][RabbitMQ 02]如何避免消息重复投递和消息重复消费.如何防止消息丢失.如何保证消息的顺序性.如何保证消息队列的可用性 文章目录 [重难点][RabbitMQ 02]如何避免消息重复 ...

  6. RabbitMQ如何解决被重复消费和数据丢失的问题?

    想想为什么要使用MQ? 1.解耦,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦! 2.异步,将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快 ...

  7. RabbitMQ 可靠性、重复消费、顺序性、消息积压解决方案

    前言 为什么引入消息队列?引入 MQ 给我们解决了一些问题,但同时又引入了一些复杂的问题,这些问题是大型项目中必须解决的重点,更重要的是,面试也经常问.实际上消息队列可以说是没法百分之百保证可靠性的! ...

  8. RabbitMQ如何防止消息丢失及重复消费

    RabbitMQ目录 文章目录 RabbitMQ如何防止消息丢失及重复消费 一.消息丢失 1.1.生产者没有成功把消息发送到MQ 1.1.1.confirm(发布确认)机制 1.1.2.事务机制 1. ...

  9. RabbitMQ常见问题解决方案——消息丢失、重复消费、消费乱序、消息积压

    文章目录 背景 RabbitMQ常见问题解决方案 1. RabbitMQ的可靠性(消息丢失问题) 1.1 生产者丢失消息 1.2 RabbitMQ弄丢消息 1.2.1 交换机持久化 1.2.2 队列持 ...

最新文章

  1. Facebook称其新的AI技术可以更快发现仇恨言论
  2. Nginx负载均衡+tomcat+session共享
  3. Spring Junit 读取WEB-INF下的配置文件
  4. 关于网管软件中的预警功能的发展
  5. odu oracle 价格_Oracle数据库ODU的几种恢复场景
  6. Activiti6--入门学习--监听器
  7. 调用python-nmap实现扫描局域网存活主机
  8. JZOJ 4809. 挖金矿
  9. 企业微信逆向分析之——自己二维码——静态分析
  10. spanning-tree portfast default
  11. 细说华为和荣耀的关系:潮流的荣耀和稳重的华为
  12. 机器人学导论(一)——空间描述和变换
  13. 2.6java基础 数组
  14. 使用Webcam实现拍照功能
  15. 联想lenovo sl700 240G sata ps3111主控+未知颗粒 掉盘,ps3111写保护开卡量产修复过程
  16. offline强化学习之Revive SDK的使用
  17. cent os 7 与cent os 6区别
  18. SwiftUI AVKit 之合并和叠加音频mp3 wav 并输出(教程含源码)
  19. 立创eda学习笔记十:如何添加好友或组建团队
  20. c语言本地函数声明非法,C++本地函数定义是非法的

热门文章

  1. 由一张精益MVP图所浮想联翩
  2. 煽情的儿子460=随笔
  3. inotify事件监控
  4. 解决Win系统缺少msvcp100.dll无法启动问题
  5. 一位美国父亲写给儿子的信
  6. 阿里集团搜索中台TisPlus
  7. 一张图看懂 | 得帆云“非同帆响2.0”新产品发布会
  8. 《ThinkPad-笔记本如何进bios设置u盘启动步骤》
  9. 关于Ubuntu系统双显卡切换的问题
  10. 一人之下手游显示服务器满员,一人之下手游满级多少 满级后攻略[多图]