1.引入依赖
        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

配置application.properties文件:

spring.rabbitmq.virtual-host=/
#spring.rabbitmq.host=localhost
spring.rabbitmq.addresses=172.30.67.122:5672,172.30.67.122:5673,172.30.67.122:5674
#spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin#producer
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true#consumer
##手工确认消费者消费的消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
##设置Qos,即RabbitMQ服务器每次推送给消费者未ack消息的个数
spring.rabbitmq.listener.simple.prefetch=1
2.定义队列、交换器、路由之间的绑定关系
package com.yaomy.control.rabbitmq.amqp.config;import com.google.common.collect.Maps;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Map;/*** @Description: RabbitMQ生产者交换器、绑定、队列声明* @Version: 1.0*/
@SuppressWarnings("all")
@Configuration
public class RabbitConfig {/*** 声明队列*/@Beanpublic Queue topicQueue(){Map<String, Object> args = Maps.newHashMap();/*** 设置消息发送到队列之后多久被丢弃,单位:毫秒*///args.put("x-message-ttl", 60000);/*** 定义优先级队列,消息最大优先级为15,优先级范围为0-15,数字越大优先级越高*/args.put("x-max-priority", 15);/*** 设置持久化队列*/return QueueBuilder.durable("test_queue2").withArguments(args).build();}/*** 声明Topic类型交换器*/@Beanpublic TopicExchange topicExchange(){TopicExchange exchange = new TopicExchange("test_exchange2");return exchange;}/*** Topic交换器和队列通过bindingKey绑定* @return*/@Beanpublic Binding bindingTopicExchangeQueue(){return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("*.topic.*");}
}
3.定义生产者
package com.yaomy.control.rabbitmq.amqp;import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.connection.PublisherCallbackChannel;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;/*** @Description: RabbitMQ生产者* @ProjectName: spring-parent* @Version: 1.0*/
@SuppressWarnings("all")
@Component
public class RabbitSender {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 创建一个消息是否投递成功的回调方法*/private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 消息的附加信息* @param ack true for ack, false for nack* @param cause 是一个可选的原因,对于nack,如果可用,否则为空。*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(!ack){//可以进行日志记录、异常处理、补偿处理等System.err.println("异常ack-"+ack+",id-"+correlationData.getId()+",cause:"+cause);}else {//更新数据库,可靠性投递机制System.out.println("正常ack-"+ack+",id-"+correlationData.getId());try{System.out.println(new String(correlationData.getReturnedMessage().getBody()));} catch (Exception e){}}}};/*** 创建一个消息是否被队列接收的监听对象,如果没有队列接收发送出的消息,则调用此方法进行后续处理*/private final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {/**** @param message 被退回的消息* @param replyCode 错误编码* @param replyText 错误描述* @param exchange 交换器* @param routingKey 路由*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.err.println("spring_returned_message_correlation:"+message.getMessageProperties().getHeaders().get(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY)+"return exchange: " + exchange+ ", routingKey: "+ routingKey+ ", replyCode: " + replyCode+ ", replyText: " + replyText+ ",message:" + message);try {System.out.println(new String(message.getBody()));} catch (Exception e){}}};/*** 发送消息* @param exchange 交换器* @param route 路由键* @param message 消息* @param properties*/public void sendMsg(String exchange, String routingKey, String message, MessageProperties properties){try {if(null == properties){properties = new MessageProperties();}/*** 设置消息唯一标识*/properties.setMessageId(UUID.randomUUID().toString());/*** 创建消息包装对象*/Message msg = MessageBuilder.withBody(message.getBytes()).andProperties(properties).build();/*** 设置生产者消息publish-confirm回调函数*/this.rabbitTemplate.setConfirmCallback(confirmCallback);/*** 设置消息退回回调函数*/this.rabbitTemplate.setReturnCallback(returnCallback);/*** 将消息主题和属性封装在Message类中*/Message returnedMessage = MessageBuilder.withBody(message.getBytes()).build();/*** 相关数据*/CorrelationData correlationData = new CorrelationData();/*** 消息ID,全局唯一*/correlationData.setId(msg.getMessageProperties().getMessageId());/*** 设置此相关数据的返回消息*/correlationData.setReturnedMessage(returnedMessage);/*** 如果msg是org.springframework.amqp.core.Message对象的实例,则直接返回,否则转化为Message对象*/this.rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MessagePostProcessor() {/*** 消息后置处理器,消息在转换成Message对象之后调用,可以用来修改消息中的属性、header*/@Overridepublic Message postProcessMessage(Message message) throws AmqpException {MessageProperties msgProperties = message.getMessageProperties();/*** 设置消息发送到队列之后多久被丢弃,单位:毫秒* 此种方案需要每条消息都设置此属性,比较灵活;* 还有一种方案是在声明队列的时候指定发送到队列中的过期时间;* * Queue queue = new Queue("test_queue2");* * queue.getArguments().put("x-message-ttl", 10000);* 这两种方案可以同时存在,以值小的为准*///msgProperties.setExpiration("10000");/*** 设置消息的优先级*/msgProperties.setPriority(9);/*** 设置消息发送到队列中的模式,持久化|非持久化(只存在于内存中)*/msgProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}}, correlationData);} catch (Exception e){e.printStackTrace();}}
}
4.消费者

@RabbitListener注解标注指定的方法监听指定的队列、也可以标注在类上结合@RabbitHandler使用;监听方法可以使用多种参数接收消息,查看源码可以看到是允许六种参数:

* Annotated methods are allowed to have flexible signatures similar to what* {@link MessageMapping} provides, that is* <ul>* <li>{@link com.rabbitmq.client.Channel} to get access to the Channel</li>* <li>{@link org.springframework.amqp.core.Message} or one if subclass to get access to* the raw AMQP message</li>* <li>{@link org.springframework.messaging.Message} to use the messaging abstraction* counterpart</li>* <li>{@link org.springframework.messaging.handler.annotation.Payload @Payload}-annotated* method arguments including the support of validation</li>* <li>{@link org.springframework.messaging.handler.annotation.Header @Header}-annotated* method arguments to extract a specific header value, including standard AMQP headers* defined by {@link org.springframework.amqp.support.AmqpHeaders AmqpHeaders}</li>* <li>{@link org.springframework.messaging.handler.annotation.Headers @Headers}-annotated* argument that must also be assignable to {@link java.util.Map} for getting access to* all headers.</li>* <li>{@link org.springframework.messaging.MessageHeaders MessageHeaders} arguments for* getting access to all headers.</li>

示例代码:

package com.yaomy.control.rabbitmq.amqp;import com.rabbitmq.client.Channel;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.util.Map;/*** @Description: RabbitMQ消息消费者* @Version: 1.0*/
@SuppressWarnings("all")
@Component
public class RabbitReceiver {/**** @param channel 信道* @param message 消息* @throws Exception*/@RabbitListener(queues = "test_queue2")public void onMessage(Channel channel, Message message) throws Exception {System.out.println("--------------------------------------");System.out.println("消费端Payload: " + message.getPayload()+"-ID:"+message.getHeaders().getId()+"-messageId:"+message.getHeaders());Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);//手工ACK,获取deliveryTagchannel.basicAck(deliveryTag, false);}/**** @param channel 信道* @param message 消息* @throws Exception*/@RabbitListener(queues = "test_queue2")public void onMessage(Channel channel, org.springframework.amqp.core.Message message) throws Exception {System.out.println("--------------------------------------");System.out.println("消费端Payload: " + new String(message.getBody())+"-messageId:"+message.getMessageProperties().getMessageId());message.getMessageProperties().getHeaders().forEach((key, value)->{System.out.println("header=>>"+key+"="+value);});Long deliveryTag = message.getMessageProperties().getDeliveryTag();//手工ACK,获取deliveryTagchannel.basicAck(deliveryTag, false);}/**** @param channel 信道* @param body 负载* @param amqp_messageId 消息唯一标识* @param headers 消息header* @throws Exception*///获取特定的消息@RabbitListener(queues = "test_queue2")//@RabbitHandlerpublic void handleMessage(Channel channel, @Payload byte[] body, @Header String amqp_messageId,  @Headers Map<String, Object> headers) throws Exception{System.out.println("====消费消息===amqp_messageId:"+amqp_messageId);headers.keySet().forEach((key)->{System.out.println("header=>>"+key+"="+headers.get(key));});System.out.println(new String(body));Long deliveryTag = NumberUtils.toLong(headers.get("amqp_deliveryTag").toString());/*** 手动Ack*/channel.basicAck(deliveryTag, false);}/**** @param channel 信道* @param body 负载* @param headers 消息header* @throws Exception*/@RabbitListener(queues = "test_queue2")//@RabbitHandlerpublic void handleMessage(Channel channel, @Payload byte[] body, MessageHeaders headers) throws Exception{System.out.println("====消费消息===amqp_messageId:"+headers);headers.keySet().forEach((key)->{System.out.println("header=>>"+key+"="+headers.get(key));});System.out.println(new String(body));Long deliveryTag = NumberUtils.toLong(headers.get("amqp_deliveryTag").toString());/*** 手动Ack*/channel.basicAck(deliveryTag, false);}
}

使用MessageHeaders参数类型接收header示例如下:

header=>>amqp_receivedDeliveryMode=PERSISTENT
header=>>amqp_receivedExchange=test_exchange2
header=>>amqp_deliveryTag=1
header=>>amqp_consumerQueue=test_queue2
header=>>amqp_redelivered=false
header=>>priority=9
header=>>amqp_receivedRoutingKey=test.topic.key
header=>>number=12345
header=>>spring_listener_return_correlation=34b20ba3-9fb0-49e5-b57e-5484e773e9b3
header=>>send_time=2019-12-27 16:08:55
header=>>spring_returned_message_correlation=cba66847-5c31-4ecb-bf97-07ba65238b9a
header=>>amqp_messageId=cba66847-5c31-4ecb-bf97-07ba65238b9a
header=>>id=b58e6adb-e5b2-9513-45c5-8376311cf82f
header=>>amqp_consumerTag=amq.ctag-hPKHjdTdj0NY4awa4bFkAA
header=>>contentType=application/octet-stream
header=>>timestamp=1577434135312

其headers中有一个id属性,这个属性是消费者接收消息new MessageHeaders的时候生成的随机UUID,不可以作为整个系统中消息的唯一标识,只可以作为消费端的唯一标识。

GitHub地址:https://github.com/mingyang66/spring-parent

RabbitMQ学习笔记:springboot2 amqp集成生产者消费者相关推荐

  1. RabbitMQ学习笔记和AMQP协议浅析

    目录 RabbitMQ MQ的相关概念 消息队列协议 消息持久化 消息的分发策略 docker安装RabbitMQ AMQP协议 RabbitMQ的几种模式 简单simple模式 发布/订阅fanou ...

  2. RabbitMQ 学习笔记

    RabbitMQ 学习笔记 RabbitMQ 学习笔记 1. 中间件 1.1 什么是中间件 1.2 为什么要使用消息中间件 1.3 中间件特点 1.4 在项目中什么时候使用中间件技术 2. 中间件技术 ...

  3. RabbitMQ学习笔记(3)----RabbitMQ Worker的使用

    1. Woker队列结构图 这里表示一个生产者生产了消息发送到队列中,但是确有两个消费者在消费同一个队列中的消息. 2. 创建一个生产者 Producer如下: package com.wangx.r ...

  4. Rabbitmq学习笔记(尚硅谷2021)

    Rabbitmq学习笔记 (尚硅谷) 1.MQ 的概念 1.1 什么是 MQ? 1.2 为什么要用 MQ? 削峰 解耦 异步 1.3 MQ 的分类 ActiveMQ Kafka RocketMQ Ra ...

  5. 分布式消息中间件之RabbitMQ学习笔记[一]

    写在前面 嗯,陆续的整理一些中间件的笔记 今天和小伙伴们分享RabbitMQ 相关笔记 博文偏理论,内容涉及: RabbitMQ的简单介绍 AMQP协议标准介绍 RabbitMQ Demo 食用方式: ...

  6. Rabbitmq学习笔记教程-尚硅谷

    Rabbitmq学习笔记 (尚硅谷) 尚硅谷 rabbitmq 教程 1.MQ 的概念 1.1 什么是 MQ? 存放消息的队列,互联网架构中常见的一种服务与服务之间通信的方式. 1.2 为什么要用 M ...

  7. RabbitMQ学习笔记(高级篇)

    RabbitMQ学习笔记(高级篇) 文章目录 RabbitMQ学习笔记(高级篇) RabbitMQ的高级特性 消息的可靠投递 生产者确认 -- confirm确认模式 生产者确认 -- return确 ...

  8. RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决)

    RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) 参考文章: (1)RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) (2)https://www.cnblogs. ...

  9. RabbitMQ学习笔记(一)

    前言: 学习B站UP主狂神说视频笔记整理视频链接 什么是中间件 中间件( Middleware ) 是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分.人们在使用中间件时,往往 ...

最新文章

  1. LeetCode简单题之判断矩阵经轮转后是否一致
  2. HDu 3449 (有依赖的01背包) Consumer
  3. adb devices检测不到夜神模拟器的解决办法
  4. Smali插桩打日志
  5. Oracle IMPDP导入数据案例之注意事项(undo/temp)
  6. 100个囚犯和灯泡C语言,关于国王和100个囚犯
  7. Java 并发编程之 CopyOnWriteArrayList
  8. 遍历目录下的所有文件-os.walk
  9. Educational Codeforces Round 50: F. Relatively Prime Powers(莫比乌斯函数)
  10. 现控笔记(三):状态空间表达式的解
  11. 《深入理解计算机系统》家庭作业
  12. XenCenter导出虚拟机
  13. 命令创建vue项目工程
  14. 24V电磁铁电磁特模块电路分析
  15. [转载]Ext.form.BasicForm getValues()和getFie_-Chaz-_新浪博客
  16. Doris export任务概率性cancelled
  17. 某网站登录接口password参数还原
  18. 断链在平曲线计算中的处理——短链篇
  19. 设计模式之访问者模式(Vistor)
  20. 数据挖掘综合应用:贷款产品预测案例

热门文章

  1. js 获取所有class相同的元素对象(简单实用)
  2. wargame v2.1 Web Wrtteup By Assassin
  3. 数据结构与算法(Java) 54:数值累加
  4. Ubuntu18 安装SciDavis
  5. 机器学习笔记之配分函数(三)对比散度
  6. 【Unity+MySQL】实现简单的注册登录系统
  7. 基于设备指纹零感验证系统
  8. Vue2.0开发之——Vue基础用法-axios(29)
  9. 视觉SLAM十四讲读书笔记(2)P10-P27
  10. 自适应随机图片背景导航页源码