1.主题模式下

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如:

"stock.usd.nyse","nyse.vmw"

"quick.orange.rabbit"等这些类型

当然这个单词列表最多不能超过 255 个字节

在这个规则列表中,其中有两个替换符是需要注意的,*(星号)可以代替一个单词,#(井号)可以替代零个或多个单词。

2.匹配案例

Q1-->绑定的是 中间带 orange 带 3 个单词的字符串(*.orange.*)

Q2-->绑定的是 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)或者第一个单词是 lazy 的多个单词(lazy.#)

quick.orange.rabbit         被队列 Q1Q2 接收到

lazy.orange.elephant      被队列 Q1Q2 接收到

quick.orange.fox             被队列 Q1 接收到

lazy.brown.fox                 被队列 Q2 接收到

lazy.pink.rabbit                虽然满足两个绑定但只被队列 Q2 接收一次

quick.brown.fox               不匹配任何绑定不会被任何队列接收到会被丢弃

quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃

lazy.orange.male.rabbit   是四个单词但匹配 Q2

3.演示案例

生产者,匹配不同的routingKey

package org.example.Topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;/*** 生产者:发消息*/
public class Producer {public static final String EXCHANGE_NAME="topic";public static void main(String[] args) throws Exception{// 创建一个连接工厂ConnectionFactory factory=new ConnectionFactory();// 工厂ipfactory.setHost("112.124.16.34");factory.setUsername("admin");factory.setPassword("123456");factory.setPort(5672);// 创建连接Connection connection=factory.newConnection();// 获取信道Channel channel=connection.createChannel();Map<String,String> bindingKeyMap=new HashMap<>();bindingKeyMap.put("quick.orange.rabbit","被队列1和2接收到");bindingKeyMap.put("lazy.orange.elephant","被队列1和2接收到");bindingKeyMap.put("quick.orange.fox","被队列1接收到");bindingKeyMap.put("lazy.brown.fox","被队列2接收到");bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列2接收一次");bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到,会被丢弃");bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定,会被丢弃");bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配队列2");/*** 发布消息* 1.exchange   发送到哪一个交换机,不填则选择默认交换机* 2.routingKey 路由对于默认交换机,路由键就是目标队列名称* 3.其它参数信息* 4.发送的消息*/for(Map.Entry<String,String>bindingKeyEntry:bindingKeyMap.entrySet()){String routingKey=bindingKeyEntry.getKey();String message=bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));System.out.println("生产者发出消息:"+message);}// 关闭通道和连接channel.close();// ***关闭连接***connection.close();}
}

队列1

package org.example.Topic;import com.rabbitmq.client.*;
/*** 消费者1*/
public class Consumer01 {// 交换机的名称// 队列名称public static final String EXCHANGE_NAME="topic";public static final String QUEUE_NAME="queue_01";public static void main(String[] args) throws Exception{ConnectionFactory factory=new ConnectionFactory();factory.setHost("112.124.16.34");factory.setUsername("admin");factory.setPassword("123456");factory.setPort(5672);Connection connection= factory.newConnection();Channel channel=connection.createChannel();// 声明交换机,交换机名称,类型// 声明队列// 主题交换机和队列绑定channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);channel.queueDeclare(QUEUE_NAME,false,false,false,null);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");/*** 消费者消费* 1.消费的队列* 2.消费成功之后是否自动应答* 3.deliverCallback* 4.cancelCallback*/// 接收和处理消息的回调对象DeliverCallback deliverCallback=(consumerTag,message)->{String mes=new String(message.getBody(),"UTF-8");System.out.println("queue1—接收到消息:"+mes);};// 消费者取消时的回调对象CancelCallback cancelCallback=(consumerTag)->{System.out.println("消费被中断");};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

队列2

package org.example.Topic;import com.rabbitmq.client.*;import java.io.IOException;/*** 消费者2*/
public class Consumer02 {// 交换机的名称// 队列名称public static final String EXCHANGE_NAME="topic";public static final String QUEUE_NAME="queue_02";public static void main(String[] args) throws Exception{ConnectionFactory factory=new ConnectionFactory();factory.setHost("112.124.16.34");factory.setUsername("admin");factory.setPassword("123456");Connection connection= factory.newConnection();Channel channel=connection.createChannel();// 声明交换机: 交换机名称,类型// 声明队列// 主题交换机和队列绑定channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);channel.queueDeclare(QUEUE_NAME,false,false,false,null);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");/*** 消费者消费* 1.消费的队列* 2.消费成功之后是否自动应答* 3.deliverCallback* 4.cancelCallback*/// 接收和处理消息的回调对象DeliverCallback deliverCallback=new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {String mes=new String(message.getBody(),"UTF-8");System.out.println("queue2—接收到消息:"+mes);}};// 消费者取消时的回调对象CancelCallback cancelCallback=new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {}};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);channel.close();// ***关闭连接***connection.close();}
}

RabbitMQ六种工作模式—主题模式相关推荐

  1. 聊一聊RabbitMQ六种工作模式与应用场景

    简介 今天我们来聊一聊 RabbitMQ 的工作模式与其对于的应用场景有哪些. 你可能会疑惑,作为 MQ 不就是生产者将消息发送到 MQ ,再讲消息发送到消费者哪里,任务不就完成了吗? 其实,不是这样 ...

  2. RabbitMQ六种队列模式-主题模式

    前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列模式-主题 ...

  3. RabbitMQ六种工作模式:simple work publish routing topic rpc

    simple简单模式 消息产生着§将消息放入队列 消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者 ...

  4. RabbitMQ六种工作模式

    Simple 模式 消息产生着将消息放入队列 消息的消费者 (consumer) 监听 (while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除 (隐患 消息可能没有被消费 ...

  5. RabbitMQ的三种模式-----主题模式(Topic)

    主题模式(Topic): 任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue 上 1:简述 如上图所示 此类交换器使得来自不同的源头的消息可以到达一 ...

  6. AMQ初级使用(队列模式+主题模式)

    队列模式 1.生产者代码: package com.zzf.jms.queue;import org.apache.activemq.ActiveMQConnectionFactory;import ...

  7. RabbitMQ六种工作模式01

    01: Work Queue工作队列模式 //接口所有的属性都是静态常量属性 public interface RabbitContent {//队列String QEUEU_HELLO = &quo ...

  8. 消息中间件的应用场景与 RabbitMQ的六种工作模式介绍

    消息中间件的应用场景与 RabbitMQ的六种工作模式介绍 消息中间件应用场景 异步处理 应用解耦 流量削峰 RabbitMQ的六种工作模式 简单模式 工作模式 发布订阅模式 路由模式 主题模式 PR ...

  9. 【转】RabbitMQ六种队列模式-5.主题模式

    前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列模式-主题 ...

最新文章

  1. jquery click 第一次没用_【通知】同济大学研究生会20202021学年第一次主席联席会...
  2. Apress水果大餐——移动开发
  3. execve函数的介绍与使用
  4. 【LeetCode】121.买卖股票的最佳时机
  5. python 培训-Python培训
  6. 用MySQL创建数据库和数据库表
  7. 12.11团队任务汇总
  8. 决策树 随机森林 xgboost_一文看懂随机森林-RandomForest(附4个构造步骤+4种实现方式评测+10个优缺点)...
  9. SQLAlchemy orm.query.Query
  10. linux迭代同步文件,Linux Shell——迭代循环
  11. linux 输出数据到csv,Linux-从外壳输出CSV文件
  12. python可以参加哪些竞赛_找出Python竞赛中可达到的分数的程序
  13. python下载官网-Python2.7.10
  14. 零基础必看的Html5+Css3+移动端前端教程(一)
  15. 基于LED或红外激光的可见光音频系统
  16. 你还在用截图工具,获取视频中的图片?
  17. DTD(文档类型定义)介绍
  18. Android FrameLayout的:layout_marginTop属性失效的问题
  19. Excel怎么实现图片查找功能?
  20. Android布局优化

热门文章

  1. JPA: Spring Data JPA @OneToMany 注解参数 orphanRemoval,一对多删除详解
  2. Random函数用法
  3. 浅谈Java与C++的关系和区别
  4. 【.NET】 DictionaryCloneable 字典类Dictionary复制
  5. 悦淘金:什么才是社交零售的真正逻辑?电商新逻辑
  6. Linux 命令(180)—— renice 命令
  7. Linux 7双网卡绑定
  8. 华为服务器怎么修改启动项,服务器启动项设置方法
  9. C# 無法載入 DLL ‘UsbDll.dll‘: 找不到指定的模組。 (發生例外狀況於 HRESULT: 0x8007007E)”
  10. Linux系统——Nginx反向代理与负载均衡