编写Fanout模式的消息接收

其他模块和上文保持一致https://blog.csdn.net/weixin_59334478/article/details/127740411?spm=1001.2014.3001.5501

ReceiveServiceImpl实现类

package com.it.rabbitmq.impl;import com.it.rabbitmq.ReceiveService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import javax.annotation.Resource;@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService {//注入amqp的模板类,利用这个对象来发送和接收消息@Resourceprivate AmqpTemplate amqpTemplate;@Overridepublic void receiveMessage() {/*** 发送消息* 参数1为交换机名称* 参数2位RoutingKey* 参数3为具体发送的消息数据*/String message= (String) amqpTemplate.receiveAndConvert("bootDirectQueue");System.out.println(message);}/*** @RabbitListener:用于标记当前方法是一个rabbitmq的消息监听方法,作用是持续性的接收消息* 这个方法不需要手动调用,spring会自动监听* 属性queues:用于指定一个已经存在的队列名称,用于队列的监听* @param message  参数就是接收到的具体消息数据*/@RabbitListener(queues = {"bootDirectQueue"})public void directReceive(String message) {System.out.println("监听器接收的消息:"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name="fanoutExchange",type = "fanout"))})public void fanoutReceive1(String message) {System.out.println("fanoutReceive1监听器接收的消息:"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name="fanoutExchange",type = "fanout"))})public void fanoutReceive2(String message) {System.out.println("fanoutReceive2监听器接收的消息:"+message);}}

编写Fanout模式的消息接收

1.SendService接口

package com.it.rabbitmq;public interface SendService {void sendMessage(String message);void sendFanoutMessage(String message);
}

SendServiceImpl实现类

package com.it.rabbitmq.impl;import com.it.rabbitmq.SendService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;@Service("sendService")
public class SendServiceImpl implements SendService {//注入amqp的模板类,利用这个对象来发送和接收消息@Resourceprivate AmqpTemplate amqpTemplate;@Overridepublic void sendMessage(String message) {/*** 发送消息* 参数1为交换机名称* 参数2位RoutingKey* 参数3为具体发送的消息数据*/amqpTemplate.convertAndSend("bootDirectExchange", "bootDirectRouting", message);}@Overridepublic void sendFanoutMessage(String message) {amqpTemplate.convertAndSend("fanoutExchange","",message);}
}

2.RabbitMQConfig类

package com.it.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {//配置一个direct类型的交换机@Beanpublic DirectExchange directExchange() {return new DirectExchange("bootDirectExchange");}//配置一个队列@Beanpublic Queue directQueue() {return new Queue("bootDirectQueue");}/*** 配置一个交换机和队列的绑定** @param directQueue 需要绑定的队列对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入* @param directQueue 需要绑定的交换机对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入* @return*/@Beanpublic Binding directBinding(Queue directQueue, DirectExchange directExchange) {//完成绑定:参数1为需要绑定的队列,参数2为需要绑定的交换机,参数3为需要绑定的routingkeyreturn BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRouting");}//配置一个fanout类型的交换机@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}}

3.运行主函数

package com.it;import com.it.rabbitmq.SendService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;@SpringBootApplication
public class RabbitmqSpringbootApplication {public static void main(String[] args) {ApplicationContext ac = SpringApplication.run(RabbitmqSpringbootApplication.class, args);SendService sendService = (SendService) ac.getBean("sendService");
//        sendService.sendMessage("boot的测试数据");sendService.sendFanoutMessage("boot的fanout测试数据!");}}

编写Topic模式消息接收

ReceiveServiceImpl实现类

package com.it.rabbitmq.impl;import com.it.rabbitmq.ReceiveService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import javax.annotation.Resource;@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService {//注入amqp的模板类,利用这个对象来发送和接收消息@Resourceprivate AmqpTemplate amqpTemplate;@Overridepublic void receiveMessage() {/*** 发送消息* 参数1为交换机名称* 参数2位RoutingKey* 参数3为具体发送的消息数据*/String message= (String) amqpTemplate.receiveAndConvert("bootDirectQueue");System.out.println(message);}/*** @RabbitListener:用于标记当前方法是一个rabbitmq的消息监听方法,作用是持续性的接收消息* 这个方法不需要手动调用,spring会自动监听* 属性queues:用于指定一个已经存在的队列名称,用于队列的监听* @param message  参数就是接收到的具体消息数据*/@RabbitListener(queues = {"bootDirectQueue"})public void directReceive(String message) {System.out.println("监听器接收的消息:"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name="fanoutExchange",type = "fanout"))})public void fanoutReceive1(String message) {System.out.println("fanoutReceive1监听器接收的消息:"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name="fanoutExchange",type = "fanout"))})public void fanoutReceive2(String message) {System.out.println("fanoutReceive2监听器接收的消息:"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue("topic1"),key = {"aa"},exchange = @Exchange(name="topicExchange",type = "topic"))})public void topicReceive1(String message){System.out.println("topic1消费者---aa---"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue("topic2"),key = {"aa.*"},exchange = @Exchange(name="topicExchange",type = "topic"))})public void topicReceive2(String message){System.out.println("topic2消费者---aa---"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue("topic3"),key = {"aa.#"},exchange = @Exchange(name="topicExchange",type = "topic"))})public void topicReceive3(String message){System.out.println("topic3消费者---aa---"+message);}}

编写Topic模式消息发送

1.SendService接口

package com.it.rabbitmq;public interface SendService {void sendMessage(String message);void sendFanoutMessage(String message);void sendTopicMessage(String message);
}

SendServiceImpl类

package com.it.rabbitmq.impl;import com.it.rabbitmq.SendService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;@Service("sendService")
public class SendServiceImpl implements SendService {//注入amqp的模板类,利用这个对象来发送和接收消息@Resourceprivate AmqpTemplate amqpTemplate;@Overridepublic void sendMessage(String message) {/*** 发送消息* 参数1为交换机名称* 参数2位RoutingKey* 参数3为具体发送的消息数据*/amqpTemplate.convertAndSend("bootDirectExchange", "bootDirectRouting", message);}@Overridepublic void sendFanoutMessage(String message) {amqpTemplate.convertAndSend("fanoutExchange","",message);}@Overridepublic void sendTopicMessage(String message) {amqpTemplate.convertAndSend("topicExchange","",message);}}

2.RabbitMQConfig类,提前声明一个topic的交换机

package com.it.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {//配置一个direct类型的交换机@Beanpublic DirectExchange directExchange() {return new DirectExchange("bootDirectExchange");}//配置一个队列@Beanpublic Queue directQueue() {return new Queue("bootDirectQueue");}/*** 配置一个交换机和队列的绑定** @param directQueue 需要绑定的队列对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入* @param directQueue 需要绑定的交换机对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入* @return*/@Beanpublic Binding directBinding(Queue directQueue, DirectExchange directExchange) {//完成绑定:参数1为需要绑定的队列,参数2为需要绑定的交换机,参数3为需要绑定的routingkeyreturn BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRouting");}//配置一个fanout类型的交换机@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}//配置一个topic类型的交换机@Beanpublic TopicExchange topicExchange() {return new TopicExchange("topicExchange");}}

3.运行主函数

package com.it;import com.it.rabbitmq.SendService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;@SpringBootApplication
public class RabbitmqSpringbootApplication {public static void main(String[] args) {ApplicationContext ac = SpringApplication.run(RabbitmqSpringbootApplication.class, args);SendService sendService = (SendService) ac.getBean("sendService");
//        sendService.sendMessage("boot的测试数据");//        sendService.sendFanoutMessage("boot的fanout测试数据!");sendService.sendTopicMessage("boot的topic测试数据,key:aa");}}

功能测试:

查看接收类

修改发送信息的routingkey

修改发送信息的routingkey

springboot集成rabbitmq:fanout、topic相关推荐

  1. springboot 集成rabbitmq 实例

    springboot 集成rabbitmq 实例 个人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理 ...

  2. RabbitMQ——SpringBoot集成RabbitMQ

    文章目录: 1.创建一个SpringBoot工程--消息发送者 1.创建一个SpringBoot工程--消息接收者 3.测试结果 3.1 direct 3.2 fanout 3.3 topic 3.4 ...

  3. SpringBoot集成RabbitMQ消息队列搭建与ACK消息确认入门

    1.Windows下安装RabbitMQ的步骤详解+图解(erlang+RabbitMQ) 2.SpringBoot集成RabbitMQ参考文章 1.RabbitMQ介绍 RabbitMQ是实现AMQ ...

  4. Springboot集成rabbitMQ之mandatory和备份交换机

    Springboot集成rabbitMQ之mandatory和备份交换机 mandatory 之前编写的消息队列代码中,通过重写ConfirmCallback中的confirm方法实现了消息送达的确认 ...

  5. Springboot集成RabbitMQ一个完整案例

    springboot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,springboot 提供了 spring-boot-starter-amqp 对消息各种支持. 1.配置pom文 ...

  6. springboot 集成 RabbitMQ confirm 确认模式和 return 回退模式以及Consumer Ack模式

    springboot 集成 RabbitMQ confirm 确认模式和 return 回退模式以及Consumer Ack模式 说明: RabbitMQ消息的可靠投递 在使用 RabbitMQ 的时 ...

  7. Springboot集成rabbitmq实现延时队列

    Springboot集成rabbitmq实现延时队列 什么是延时队列? 列举几个使用场景: 常见的种类有: 延时任务-实现方式: 详细信息:[https://www.cnblogs.com/JonaL ...

  8. springboot集成rabbitmq商品秒杀业务实战(流量削峰)

    消息队列如何实现流量削峰? 要对流量进行削峰,最容易想到的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送 ...

  9. springboot集成rabbitMQ实现消息的推送

    RabbitMQ消息中间件组件,目前比较流行的消息中间件有:RabbitMQ.RocketMQ.ActiveMQ.Kafka等. 我的代码中用的是RabbitMQ,先介绍几个概念: 一:消息队列的特性 ...

最新文章

  1. 基于visual Studio2013解决C语言竞赛题之1085相邻之和素数
  2. ecplice中class.forname一直报错_英雄联盟LOL闪退,弹出吉格斯报错BUGSPLAT
  3. K8s Pod 钩子生命周期
  4. [恢]hdu 1407
  5. C语言float数据类型介绍、示例和应用经验
  6. 活动选择问题 贪心
  7. Android ASCII编码转换成相对应字符
  8. 什么是文本分析,文本分析主要运用到了哪些关键技术?
  9. arcgis路网密度计算、提取中心线、面积计算
  10. 微信小程序支持分享到朋友圈了
  11. MATLAB数据导入
  12. 雅虎免费邮箱开通POP3和自动转发的方法
  13. 详解Yarn中三种资源调度器(FIFO Scheduler、Capacity Scheduler、Fair Scheduler)和配置自定义队列实现任务提交不同队列
  14. 基于java springboot租房平台设计,公寓租赁系统
  15. Redis 安装+设置密码
  16. 少年之文明与国之文明——---从奥运会看国人素质之飞跃
  17. OR-CAD CAPTURE学习笔记——ERROR(ORCAP-11010)
  18. 发哥莫慌!这56亿让区块链帮你搞定
  19. CSS盒子塌陷及解决方法
  20. Linux中硬盘分区、创建逻辑卷

热门文章

  1. Express框架入门(三)结合 multer 上传图片
  2. 蓝屏代码:0x000000ED:UNMOUNTABLE_BOOT_VOLUME 的解决方案
  3. branch and bound(分支定界)算法
  4. 全角字符与半角字符的互相转化
  5. JavaFX基础:1: 简介
  6. 禁用win7+ 64位驱动签名功能
  7. 手机社交游戏设计中交互理念的渗透
  8. 【数据挖掘】多特征组合的基本方法
  9. docker 安装Kafka集群
  10. 这家零部件制造企业用无代码搭建个性化ERP系统,实现全业务闭环管理