【RabbitMQ】基础三:发布与订阅模式(Publish/Subscribe)
【RabbitMQ】基础三:发布与订阅模式(Publish/Subscribe)
- 1. 订阅模式
- 2. 发布与订阅模式说明
- 3. 代码示例
- 3.1 生产者
- 3.2 消费者
- 3.3 测试
- 4. 总结
1. 订阅模式
订阅模式示例图:
前面2个案例中,只有3个角色:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分
而在订阅模型中,多了一个exchange角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接受者,会一直等待消息到来。
- Queue:消息队列,接收消息、缓存消息。
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
**Exchange(交换机)只负责转发消息,不具备存储消息的能力,**因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
2. 发布与订阅模式说明
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
简单解释就是,可以将消息发送给不同类型的消费者。做到发布一次,消费多个。
3. 代码示例
3.1 生产者
package com.siyi.simple;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {public static Connection getConnection() throws Exception {//创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();//设置主机地址;默认为localhostconnectionFactory.setHost("localhost");//连接端口;默认为 5672connectionFactory.setPort(5672);//虚拟主机名称;默认为/connectionFactory.setVirtualHost("/siyi");//连接用户名;默认为guestconnectionFactory.setUsername("siyi");//连接密码;默认为guestconnectionFactory.setPassword("siyi");//返回连接return connectionFactory.newConnection();}
}
package com.siyi.ps;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.siyi.simple.ConnectionUtil;public class Producer {//交换机名称static final String FANOUT_EXCHANGE = "fanout_exchange";//队列名称static final String FANOUT_QUEUE_1 = "fanout_queue_1";//队列名称static final String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();//创建通道(频道)Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,fanout,topic,direct,headers*/channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);//声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其他参数*/channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");for(int i=0;i<=10;i++){//发送消息String message = "你好,世界~ 发布订阅模式:"+i;/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其他属性* 参数4:消息内容*/channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());System.out.println("已发送消息:"+message);}//关闭资源channel.close();connection.close();}
}
3.2 消费者
消费者1
package com.siyi.ps;import com.rabbitmq.client.*;
import com.siyi.simple.ConnectionUtil;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();//创建通道(频道)Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);//声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其他参数*/channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,"");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){/*** @param consumerTag 消息者标签,在channel.basicConsume时候可以指定* @param envelope 消息包的内容,可以从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties 属性信息* @param body 消息* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:"+envelope.getRoutingKey());//交换机System.out.println("交换机为:"+envelope.getExchange());//消息idSystem.out.println("消息id为:"+envelope.getDeliveryTag());//收到的消息System.out.println("消费者1-接收到的消息为:"+new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是是否自动确认,设置为true为表示消息接收到自动想mq回复接收到了,* mq接收到回复会删除消息,设置为false则需要手动确认。* 参数3:消息接收到后回调*/channel.basicConsume(Producer.FANOUT_QUEUE_1,false,consumer);}
}
消费者2
package com.siyi.ps;import com.rabbitmq.client.*;
import com.siyi.simple.ConnectionUtil;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();//创建通道(频道)Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);//声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其他参数*/channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.FANOUT_QUEUE_2,Producer.FANOUT_EXCHANGE,"");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){/*** @param consumerTag 消息者标签,在channel.basicConsume时候可以指定* @param envelope 消息包的内容,可以从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties 属性信息* @param body 消息* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:"+envelope.getRoutingKey());//交换机System.out.println("交换机为:"+envelope.getExchange());//消息idSystem.out.println("消息id为:"+envelope.getDeliveryTag());//收到的消息System.out.println("消费者2-接收到的消息为:"+new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是是否自动确认,设置为true为表示消息接收到自动想mq回复接收到了,* mq接收到回复会删除消息,设置为false则需要手动确认。* 参数3:消息接收到后回调*/channel.basicConsume(Producer.FANOUT_QUEUE_2,false,consumer);}
}
3.3 测试
启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到 Exchanges 选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:
4. 总结
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别
- 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
- 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
- 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机 。
【RabbitMQ】基础三:发布与订阅模式(Publish/Subscribe)相关推荐
- RabbitMQ 入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
发布/订阅 在上篇第二部分教程中,我们搭建了一个工作队列.每个任务之分发给一个工作者(worker).在本篇教程中,我们要做的之前完全不一样--分发一个消息给多个消费者(consumers).这种模式 ...
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
2019独角兽企业重金招聘Python工程师标准>>> 发布/订阅 在上篇教程中,我们搭建了一个工作队列.每个任务之分发给一个工作者(worker).在本篇教程中,我们要做的之前完全 ...
- AKKA 集群中的发布与订阅Distributed Publish Subscribe in Cluster
Distributed Publish Subscribe in Cluster 基本定义 在单机环境下订阅与发布是很常用的,然而在集群环境是比较麻烦和不好实现的: AKKA已经提供了相应的实现,集群 ...
- 知方可补不足~SQL2008中的发布与订阅模式~续
上一回介绍了如何在sql2008中建立一个数据库的发布者,今天来说一下如何建立一个订阅者,其实订阅者也是一个数据库,而这个数据库是和发布者的数据结构相同的库,它们之间通过SQL代理进行数据上的同步. ...
- RabbitMQ工作模式Publish/Subscribe发布订阅,test测试代码
RabbitMQ有以下几种工作模式 : 1.Work queues 工作队列 2.Publish/Subscribe 发布订阅 3.Routing 路由 4.Topics 通 ...
- RabbitMQ入门:发布/订阅(Publish/Subscribe)
在前面的两篇博客中 RabbitMQ入门:Hello RabbitMQ 代码实例 RabbitMQ入门:工作队列(Work Queue) 遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息 ...
- Publish/Subscribe 发布与订阅模式
Publish/Subscribe 发布与订阅: 通过交换机来实现,一个生产者可以让不同队列的消费者同时得到消息 生产者: package Fanout; import com.rabbitmq.cl ...
- RabbitMQ消息队列:发布/订阅(Publish/Subscribe)
2019独角兽企业重金招聘Python工程师标准>>> 前面我们把每个Message都是deliver到某个单一的Consumer.今天我们将了解如何把同一个Message deli ...
- redis基础教程 --发布与订阅
redis 发布订阅 redis发布 与订阅是一种信息通信模式,发送者(pub)发送信息,订阅者(sub)接收信息 客户端订阅消息 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传 ...
最新文章
- Android 懒加载
- column 'XXXX' in field list is ambiguous
- python语音程序设计基础篇_【笔记】python自学笔记(基础篇)——字典操作
- python怎么导入时间-Python的import导入与时间
- 计算机语言低下限高上限,原神双雷阵容厉不厉害
- 大规模markpoint特效
- JSP的文件上传处理
- 服务器维护 测试化验加工费,测试化验加工费.PPT
- 明晚直播预告丨Oracle 19c避雷经验分享
- 计算机复试考研专业课,2018计算机考研专业课复试复习攻略
- oracle rac防护,Oracle RAC日常基本维护命令
- 成功解决 AttributeError: module ‘neat’ has no attribute ‘Config’解决方式
- tp3.2 访问地址url大小写及控制器多个单词组成时url
- WPF 框架prism代码笔记
- 一键免费下载外文文献的方式
- ibm刀片服务器虚拟化,刀片服务器内置虚拟化 IBM升级服务器
- 转盘抽奖小程序java_大转盘抽奖小程序版 转盘抽奖网页版
- 【5G手机漏接电话问题解决方式】
- css3 匀速运动的圆
- Istio的授权策略