【RabbitMQ】基础三:发布与订阅模式(Publish/Subscribe)

  • 1. 订阅模式
  • 2. 发布与订阅模式说明
  • 3. 代码示例
    • 3.1 生产者
    • 3.2 消费者
    • 3.3 测试
  • 4. 总结

1. 订阅模式

订阅模式示例图:

前面2个案例中,只有3个角色:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分

而在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接受者,会一直等待消息到来。
  • Queue:消息队列,接收消息、缓存消息。
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

**Exchange(交换机)只负责转发消息,不具备存储消息的能力,**因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

2. 发布与订阅模式说明


发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

简单解释就是,可以将消息发送给不同类型的消费者。做到发布一次,消费多个。

3. 代码示例

3.1 生产者

package com.siyi.simple;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {public static Connection getConnection() throws Exception {//创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();//设置主机地址;默认为localhostconnectionFactory.setHost("localhost");//连接端口;默认为 5672connectionFactory.setPort(5672);//虚拟主机名称;默认为/connectionFactory.setVirtualHost("/siyi");//连接用户名;默认为guestconnectionFactory.setUsername("siyi");//连接密码;默认为guestconnectionFactory.setPassword("siyi");//返回连接return connectionFactory.newConnection();}
}
package com.siyi.ps;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.siyi.simple.ConnectionUtil;public class Producer {//交换机名称static final String FANOUT_EXCHANGE = "fanout_exchange";//队列名称static final String FANOUT_QUEUE_1 = "fanout_queue_1";//队列名称static final String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();//创建通道(频道)Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,fanout,topic,direct,headers*/channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);//声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其他参数*/channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");for(int i=0;i<=10;i++){//发送消息String message = "你好,世界~ 发布订阅模式:"+i;/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其他属性* 参数4:消息内容*/channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());System.out.println("已发送消息:"+message);}//关闭资源channel.close();connection.close();}
}

3.2 消费者

消费者1

package com.siyi.ps;import com.rabbitmq.client.*;
import com.siyi.simple.ConnectionUtil;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();//创建通道(频道)Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);//声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其他参数*/channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,"");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){/*** @param consumerTag 消息者标签,在channel.basicConsume时候可以指定* @param envelope 消息包的内容,可以从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties 属性信息* @param body 消息* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:"+envelope.getRoutingKey());//交换机System.out.println("交换机为:"+envelope.getExchange());//消息idSystem.out.println("消息id为:"+envelope.getDeliveryTag());//收到的消息System.out.println("消费者1-接收到的消息为:"+new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是是否自动确认,设置为true为表示消息接收到自动想mq回复接收到了,*       mq接收到回复会删除消息,设置为false则需要手动确认。* 参数3:消息接收到后回调*/channel.basicConsume(Producer.FANOUT_QUEUE_1,false,consumer);}
}

消费者2

package com.siyi.ps;import com.rabbitmq.client.*;
import com.siyi.simple.ConnectionUtil;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {//创建连接Connection connection = ConnectionUtil.getConnection();//创建通道(频道)Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);//声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其他参数*/channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);//队列绑定交换机channel.queueBind(Producer.FANOUT_QUEUE_2,Producer.FANOUT_EXCHANGE,"");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){/*** @param consumerTag 消息者标签,在channel.basicConsume时候可以指定* @param envelope 消息包的内容,可以从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties 属性信息* @param body 消息* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:"+envelope.getRoutingKey());//交换机System.out.println("交换机为:"+envelope.getExchange());//消息idSystem.out.println("消息id为:"+envelope.getDeliveryTag());//收到的消息System.out.println("消费者2-接收到的消息为:"+new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是是否自动确认,设置为true为表示消息接收到自动想mq回复接收到了,*       mq接收到回复会删除消息,设置为false则需要手动确认。* 参数3:消息接收到后回调*/channel.basicConsume(Producer.FANOUT_QUEUE_2,false,consumer);}
}

3.3 测试

启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。

在执行完测试代码后,其实到RabbitMQ的管理后台找到 Exchanges 选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:

4. 总结

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

发布订阅模式与工作队列模式的区别

  1. 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
  2. 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
  3. 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机 。

【RabbitMQ】基础三:发布与订阅模式(Publish/Subscribe)相关推荐

  1. RabbitMQ 入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)

    发布/订阅 在上篇第二部分教程中,我们搭建了一个工作队列.每个任务之分发给一个工作者(worker).在本篇教程中,我们要做的之前完全不一样--分发一个消息给多个消费者(consumers).这种模式 ...

  2. RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)

    2019独角兽企业重金招聘Python工程师标准>>> 发布/订阅 在上篇教程中,我们搭建了一个工作队列.每个任务之分发给一个工作者(worker).在本篇教程中,我们要做的之前完全 ...

  3. AKKA 集群中的发布与订阅Distributed Publish Subscribe in Cluster

    Distributed Publish Subscribe in Cluster 基本定义 在单机环境下订阅与发布是很常用的,然而在集群环境是比较麻烦和不好实现的: AKKA已经提供了相应的实现,集群 ...

  4. 知方可补不足~SQL2008中的发布与订阅模式~续

    上一回介绍了如何在sql2008中建立一个数据库的发布者,今天来说一下如何建立一个订阅者,其实订阅者也是一个数据库,而这个数据库是和发布者的数据结构相同的库,它们之间通过SQL代理进行数据上的同步. ...

  5. RabbitMQ工作模式Publish/Subscribe发布订阅,test测试代码

    RabbitMQ有以下几种工作模式 : 1.Work queues  工作队列 2.Publish/Subscribe 发布订阅 3.Routing      路由 4.Topics        通 ...

  6. RabbitMQ入门:发布/订阅(Publish/Subscribe)

    在前面的两篇博客中 RabbitMQ入门:Hello RabbitMQ 代码实例 RabbitMQ入门:工作队列(Work Queue) 遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息 ...

  7. Publish/Subscribe 发布与订阅模式

    Publish/Subscribe 发布与订阅: 通过交换机来实现,一个生产者可以让不同队列的消费者同时得到消息 生产者: package Fanout; import com.rabbitmq.cl ...

  8. RabbitMQ消息队列:发布/订阅(Publish/Subscribe)

    2019独角兽企业重金招聘Python工程师标准>>> 前面我们把每个Message都是deliver到某个单一的Consumer.今天我们将了解如何把同一个Message deli ...

  9. redis基础教程 --发布与订阅

    redis 发布订阅 redis发布 与订阅是一种信息通信模式,发送者(pub)发送信息,订阅者(sub)接收信息 客户端订阅消息 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传 ...

最新文章

  1. Android 懒加载
  2. column 'XXXX' in field list is ambiguous
  3. python语音程序设计基础篇_【笔记】python自学笔记(基础篇)——字典操作
  4. python怎么导入时间-Python的import导入与时间
  5. 计算机语言低下限高上限,原神双雷阵容厉不厉害
  6. 大规模markpoint特效
  7. JSP的文件上传处理
  8. 服务器维护 测试化验加工费,测试化验加工费.PPT
  9. 明晚直播预告丨Oracle 19c避雷经验分享
  10. 计算机复试考研专业课,2018计算机考研专业课复试复习攻略
  11. oracle rac防护,Oracle RAC日常基本维护命令
  12. 成功解决 AttributeError: module ‘neat’ has no attribute ‘Config’解决方式
  13. tp3.2 访问地址url大小写及控制器多个单词组成时url
  14. WPF 框架prism代码笔记
  15. 一键免费下载外文文献的方式
  16. ibm刀片服务器虚拟化,刀片服务器内置虚拟化 IBM升级服务器
  17. 转盘抽奖小程序java_大转盘抽奖小程序版 转盘抽奖网页版
  18. 【5G手机漏接电话问题解决方式】
  19. css3 匀速运动的圆
  20. Istio的授权策略

热门文章

  1. php工程师需要掌握的知识体系
  2. finereport字段显示设置_FineReport基本用法
  3. H.264编码相关概念
  4. Android Binder 原理,android实战项目pdf
  5. 2023 全球人工智能开发者先锋大会—AI 人才学习赛rank1方案分享
  6. 对话软件大师Martin Fowler:进化型设计
  7. 用python制作日历
  8. 爬虫:b站(bilibili)电影《鹰猎长空》短评
  9. linux删除文件text命令行,使用 Linux 文件恢复工具
  10. windows环境安装BehaviorTree.CPP【基于vscode】