消费者客户端成功接收一条消息的标志是:这条消息被签收。
消费者客户端成功接收一条消息一般包括三个阶段:

         1、消费者接收消息,也即从MessageConsumer的receive方法返回

2、消费者处理消息

3、消息被签收

其中,第三阶段的签收可以有ActiveMQ发起,也可以由消费者客户端发起,取决于Session是否开启事务以及签收模式的设置。

在带事务的Session中,消费者客户端事务提交之时,消息自动完成签收。

在不带事务的Session中,消息何时以及如何被签收取决于Session的签收模式设置

activemq的消息确认机制就是文档中说的ack机制有:
    AUTO_ACKNOWLEDGE = 1    自动确认
    CLIENT_ACKNOWLEDGE = 2    客户端手动确认   
    DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
    SESSION_TRANSACTED = 0    事务提交并确认
    INDIVIDUAL_ACKNOWLEDGE = 4    单条消息确认 activemq 独有
  ACK模式描述了Consumer与broker确认消息的方式(时机),比如当消息被Consumer接收之后,Consumer将在何时确认消息。
  对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,通过ACK,可以在consumer(/producer)
  与Broker之间建立一种简单的“担保”机制.
  手动确认和单条消息确认需要手动的在客户端调用message.acknowledge()
  消息重发机制RedeliveryPolicy 有几个属性如下:
RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();//是否在每次尝试重新发送失败后,增长这个等待时间redeliveryPolicy.setUseExponentialBackOff(true);//重发次数,默认为6次   这里设置为10次redeliveryPolicy.setMaximumRedeliveries(10);//重发时间间隔,默认为1秒redeliveryPolicy.setInitialRedeliveryDelay(1);//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是valueredeliveryPolicy.setBackOffMultiplier(2);//是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效redeliveryPolicy.setMaximumRedeliveryDelay(-1);

那么在整合activemq时候就只需要修改配置文件和客户端就可以了,activemq就是这种机制,例如支付宝支付回调的时候,只有我们返回一个success,支付那边才不会给我重发消息

配置文件:

import javax.jms.Queue;import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;@EnableJms
@Configuration
public class ActiveMQ4Config {  @Beanpublic Queue queue(){return new ActiveMQQueue("queue1");}@Beanpublic RedeliveryPolicy redeliveryPolicy(){RedeliveryPolicy  redeliveryPolicy=   new RedeliveryPolicy();//是否在每次尝试重新发送失败后,增长这个等待时间redeliveryPolicy.setUseExponentialBackOff(true);//重发次数,默认为6次   这里设置为10次redeliveryPolicy.setMaximumRedeliveries(10);//重发时间间隔,默认为1秒redeliveryPolicy.setInitialRedeliveryDelay(1);//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是valueredeliveryPolicy.setBackOffMultiplier(2);//是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效redeliveryPolicy.setMaximumRedeliveryDelay(-1);return redeliveryPolicy;}@Beanpublic ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${activemq.url}")String url,RedeliveryPolicy redeliveryPolicy){  ActiveMQConnectionFactory activeMQConnectionFactory =  new ActiveMQConnectionFactory("admin","admin",url);activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);return activeMQConnectionFactory;}@Beanpublic JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory,Queue queue){JmsTemplate jmsTemplate=new JmsTemplate();jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化
        jmsTemplate.setConnectionFactory(activeMQConnectionFactory);jmsTemplate.setDefaultDestination(queue); //此处可不设置默认,在发送消息时也可设置队列jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式return jmsTemplate;}//定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂@Bean(name = "jmsQueueListener")public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(activeMQConnectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);factory.setSessionAcknowledgeMode(4);return factory;}}

消费者:

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {private final static Logger logger = LoggerFactory.getLogger(Consumer.class);@JmsListener(destination = "queue1", containerFactory = "jmsQueueListener")public void receiveQueue(final TextMessage text, Session session)throws JMSException {try {logger.debug("Consumer收到的报文为:" + text.getText());text.acknowledge();// 使用手动签收模式,需要手动的调用,如果不在catch中调用session.recover()消息只会在重启服务后重发} catch (Exception e) {    session.recover();// 此不可省略 重发信息使用
        }}
}

由此可以知道activemq的queue消息是可以保证消息不丢失,不会被重复消费的,因为会给每个消息设置一个唯一的id,当消息发送失败之后可以根据这个机制来进行消费,当然也是一种处理分布式事物的方法

消息中间件的模式是可以保证消息不会丢失的,持久化和自动重发,消息回签,都可以很好的避免那种机制。消费端代码发生异常了,可以自动重发,自动消息重发。由于之前在测试的时候足够看官方文档,所以理解说客户端发生异常了,是不可以进行重发的,但是今天了解之后,发觉还是自动重发的机制,利用回签机制进行的。

转载于:https://www.cnblogs.com/xiufengchen/p/10563402.html

springboot整合activemq加入会签,自动重发机制,持久化相关推荐

  1. SpringBoot整合activeMQ消息队列手动签收(Session.CLIENT_ACKNOWLEDGE)为什么失效啊?

    今天在家隔离办公,不太忙,然后就琢磨起来消息队列activeMQ的消息事务来解决分布式事务,但是奈何在SpringBoot整合activeMQ时,其消费者手动签收消息时出现了问题-->当acti ...

  2. activeMQ基础学习和SpringBoot整合activeMQ案例

    昨天仔细研究了activeMQ消息队列,也遇到了些坑,昨天晚上也写了篇文章记录坑的内容,其实上篇文章(SpringBoot整合activeMQ消息队列手动签收(Session.CLIENT_ACKNO ...

  3. springboot整合ActiveMQ(点对点和发布订阅)

    springboot整合ActiveMQ(点对点和发布订阅) ActiveMQ是什么,为什么使用MQ 是基于 Java 中的 JMS 消息服务规范实现的一个消息中间件. 1.系统解耦 采用中间件之后, ...

  4. SpringBoot SpringBoot 开发实用篇 5 整合第三方技术 5.21 SpringBoot 整合 ActiveMQ

    SpringBoot [黑马程序员SpringBoot2全套视频教程,springboot零基础到项目实战(spring boot2完整版)] SpringBoot 开发实用篇 文章目录 Spring ...

  5. 详细的springboot整合activeMq安装与使用(上)

    最近在学习activeMQ消息中间件,特此记录一下,方便以后使用. 如果有不严谨的地方,欢迎大家提出,共同进步呀. 本章,会先讲解activeMq的基本介绍.安装.和更改用户名.端口号.下节会仔细讲解 ...

  6. SpringBoot整合 ActiveMQ、SpringBoot整合RabbitMQ、SpringBoot整合Kafka

    1.概念:SpringBoot 整合消息服务2.具体内容对于异步消息组件在实际的应用之中会有两类:· JMS:代表作就是 ActiveMQ,但是其性能不高,因为其是用 java 程序实现的:· AMQ ...

  7. Springboot整合ActiveMQ发送邮件

    虽然ActiveMQ以被其他MQ所替代,但仍有学习的意义,本文采用邮件发送的例子展示ActiveMQ 文章目录 1. 生产者 1.1 引入maven依赖 1.2 application.yml配置 1 ...

  8. springboot整合RabbitMQ实现延时自动取消订单

    1.pom依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>s ...

  9. SpringBoot整合第三方技术学习笔记(自用)

    SpringBoot整合第三方技术学习笔记 搬运黑马视频配套笔记 KF-4.数据层解决方案 KF-4-1.SQL 回忆一下之前做SSMP整合的时候数据层解决方案涉及到了哪些技术?MySQL数据库与My ...

最新文章

  1. python 连接db2_大迷糊的博客
  2. VTK:重叠AMR用法实战
  3. linux 时间 无法连接服务器地址,怎么解决linux访问不了ip地址问题?
  4. abaqus python 建立节点集合_Abaqus中Python通过findAt方法建立region区域
  5. 用python 写网络爬虫--零基础
  6. python4.2_python4.2参数传入
  7. 用visio制作机柜服务器,visio 绘制机柜接线图 实例教程
  8. 华为云服务器如何登录
  9. 《智能商业》由阿里巴巴学术委员会主席、前总参谋长曾鸣亲自编写,值得一读!
  10. DLNA(明基的返校讲座)
  11. EasyExcel快速上手
  12. 关于程序的编译和解释!
  13. 用C语言写一个停车场管理系统代码
  14. 嵌入式开发|嵌入式软件框架《一》常用的软件框架介绍与选择
  15. 2023年新自采集壁纸网页源码+简约大气
  16. 模糊控制初学入门之概念认知
  17. 联想 n700 android,联想双模触控无线鼠标N700 双模式解析
  18. java 获取当前时间的三种方法
  19. Python库-uiautomator2(app自动化)
  20. 京东双十一累计下单超2044亿元背后,低线市场增量强劲

热门文章

  1. Hadoop 系列之 HDFS
  2. jdk和jre是什么?都有什么用?
  3. 布局网页表格要求其列平均分布的简单操作
  4. 4.3.1 jQuery基础(1)
  5. qq互联开放平台 开源SDK共享 常见问题
  6. [深度学习-NLP]Imdb数据集情感分析之模型对比(贝叶斯, LSTM, GRU, TextCNN, Transformer, BERT)
  7. python实现pdf解密和pdf转图片
  8. GAN —— 《Generative Adversarial Nets》
  9. 利用Excel VBA SQL做特殊文件浏览器
  10. 【版本控制管理】 深入 001 A successful Git branching model GIT 项目分支策略和释放管理