Python 操作 Rabbit MQ 发布/订阅 (五)

一、发布、订阅:

我们将一个消息分发给多个消费者,这种模式被称为发布/订阅

为了更好的理解这个模式,我们将构建一个日志系统,它包括两个程序:

  • 第一个程序,负责发送日志消息;
  • 第二个程序,负责获取消息并输出内容;

在日志系统中,所有正在运行的接收方程序都会接收消息;

  • 一个接受者,把日志写入硬盘中;
  • 另一个接受者,把日志输出到屏幕上;

最终,日志消息被广播给所有的接受者。

二、交换机(Exchanges):

概念:应用程序发送消息时,先把消息给交换机,由交换机投递给队列,而不是直接给队列。交换机可以由多个消息通道(Channel),用于投递消息。

简单概括下之前的知识

  • 发布者(Producer):是发布消息的应用程序。
  • 队列(Queue):用于消息存储的缓冲。
  • 消费者(Consumer):是接收消息的应用程序。

图解大体流程

  • P:代表是发布者;
  • X:是交换机;

详解图意:发布者(P )→交换机(X)→队列(Q)→消费者(C );

  • 交换机一边从发布者方接收消息,一边把消息推送到队列(Q)。交换机必须知道如何处理它接收到的消息,是推送到指定的队列、还是多个队列,或者是忽略消息。这些都是通过交换机类型(Exchange Type)来定义的。

交换机类型

1.直连交换机(Direct);

2.主题交换机(Topic);

3.头交换机(Headers);

4.扇形交换机(Fanout);

  • 主要说明—扇形交换,它把消息发送给它所知道的所有队列。

    channel.exchange_declare(exchange='fanout_logs',exchange_type='fanout')
    

参数讲解

  • exchange:就是交换机的名称,空字符串代表默认或者匿名交换机;

    channel.basic_publish(exchange='')
    
  • exchange_type:就是交换机的类型;

  • routing_key:分发到指定的队列;

  • body:发送的内容;

  • properties:使消息持久化;

查看交换器列表

命令:rabbitmqctl list_exchanges

Listing exchanges ...
amq.rabbitmq.log    topic
amq.direct  direct
amq.topic   topic
amq.headers headersdirect
amq.fanout  fanout
amq.rabbitmq.trace  topic
amq.match   headers

列表中以amq.*的开头的交换器,都是默认创建的,目前不需要管它们。

三、临时队列:

我们连接上Rabbit MQ的时候,需要一个全新的、空的队列(也就是说不使用之前提到的,routing_key参数指定的队列名),我们可以手动创建一个随机的队列名,或者让服务器为我们选择一个随机的队列名(推荐)。我们仅需要在调用queue_declare方法时,不提供queue参数即可:

# 在管道里, 不声明队列名称
result = channel.queue_declare()

可通过result.method.queue获取已经生成的随机队列名,大概的样子如下所示:

amq.gen-DIAODS2sDSAKJKS==

与消费者断开连接时,这个队列应被立即删除:

# 需要一个空的队列  exclusive=True 表示与消费者断开时, 队列立即删除
result = channel.queue_declare(exclusive=True)

四、绑定:

目前已经创建一个扇形交换机和一个队列。现在需要告诉交换机如果发送消息给队列。

交换机和队列之间的联系我们称为绑定(binding)

# 将fanount_logs交换机将会把消息添加到我们的队列中, 队列名服务器随机生成
channel.queue_bind(exchange='fanout_logs', queue=result.method.queue)

查看绑定列表

列出所有现存的绑定命令:rabbitmqctl list_bindings

五、整理本节最终代码:

图解最终流程

发布日志与之前的区别

1.我们把消息发送给fanout_logs交换机而不是匿名的交换机;

2.发送的时候需要提供routing_key参数,但它的值会被扇形交换机忽略;

以下是send.py

#!/usr/bin/python
# -*- coding: utf-8 -*-
import pika
import sysmessage = ' '.join(sys.argv[1:]) or "Hello World!"# 创建一个实例  本地访问IP地址可以为 localhost 后面5672是端口地址(可以不用指
# 定, 因为默认就是5672)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672))# 声明一个管道, 在管道里发送消息
channel = connection.channel()# 指定交换机的类型为fanout, 执行交换机名:fanout_logs
channel.exchange_declare(exchange='fanout_logs', exchange_type='fanout')# 投递消息 exchange='fancout_logs'交换机的名命; type='fanout':扇形交换机
channel.basic_publish(exchange='fanout_logs',routing_key='',body=message)print "[x] sent {}".format(message, )
# 队列关闭
connection.close()

若没有绑定队列的交换器,消息将会丢失。以下是receive.py

#!/usr/bin/python
# -*- coding: utf-8 -*-import pika# 创建实例
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))# 声明管道
channel = connection.channel()# 指定交换机名为 fanout_logs 类型为扇形
channel.exchange_declare(exchange='fanout_logs', exchange_type='fanout')# 表示与消费者断开连接, 队列立即删除
result = channel.queue_declare(queue='', exclusive=True)# 生成队列的名字
queue_name = result.method.queue# 绑定交换机和队列
channel.queue_bind(exchange='fanout_logs', queue=queue_name)def callback(ch, method, properties, body):print '[X] Received{}'.format(body,)# 消费消息
channel.basic_consume(queue=queue_name,  # 从指定的消息队列中接收消息on_message_callback=callback,  # 如果收到消息, 就调用callback函数来处理)print('=======正在等待消息========')
channel.start_consuming()  # 开始消费消息

3.如果想把日志保存到文件中,打开控制台输入:

python receive.py > logs_from_rabbit.log

4.在屏幕中查看日志,在打开一个新的终端运行:

python receive.py
=======正在等待消息========

5.发送消息:

python send.py 发送第一条消息

6.可以看到消费者接收到了消息,并且日志中也记录了这条消息。

cat logs_from_rabbit.log
=======正在等待消息========
[X] Received发送第一条消息

7.确认已经创建的队列绑定:

rabbitmqctl list_bindings
Listing bindings ...exchange    amq.gen-Di2rIkS1kQWcMODPxF5KuA  queue   amq.gen-Di2rIkS1kQWcMODPxF5KuA  []exchange  hello   queue   hello   []exchange  task_queue  queue   task_queue  []
fanout_logs exchange    amq.gen-Di2rIkS1kQWcMODPxF5KuA  queue   amq.gen-Di2rIkS1kQWcMODPxF5KuA  []

交换器fanout_logs把数据发送给两个系统名命的队列

Python 操作 Rabbit MQ 发布/订阅 (五)相关推荐

  1. Python操作Rabbit MQ的5种模式

    python版本:   2.7.14 一 消息生产者代码: 1 # -*- coding: utf-8 -*- 2 3 import json 4 import pika 5 import urlli ...

  2. Redis哨兵机制以及发布订阅

    Redis哨兵机制 1 哨兵Sentinel机制 2 哨兵架构原理 3 搭建哨兵架构 4 通过springboot操作哨兵 Redis发布订阅 1 哨兵Sentinel机制 Sentinel(哨兵)是 ...

  3. Redis-13Redis发布订阅

    文章目录 概述 消息多播 PubSub发布者订阅者模型 客户端操作 Spring配置发布订阅模式 pubsub不足之处 代码 概述 当使用银行卡消费的时候,银行往往会通过微信.短信或邮件通知用户这笔交 ...

  4. JavaScript的发布订阅模式

    这里要说明一下什么是发布-订阅模式. 发布-订阅模式里面包含了三个模块,发布者,订阅者和处理中心.这里处理中心相当于报刊办事大厅. 发布者相当与某个杂志负责人,他来中心这注册一个的杂志,而订阅者相当于 ...

  5. 基于MQTT的消息发布订阅python实现

    简介: MQTT 全称为 Message Queuing Telemetry Transport(消息队列遥测传输)是一种基于发布/订阅范式的"轻量级"消息协议.该协议构建于TCP ...

  6. java redis mq_redis之mq实现发布订阅模式

    概述 Redis不仅可作为缓存服务器,还可用作消息队列,本示例演示如何使用redis实现发布/订阅消息队列. 在Redis中,发布者没有将消息发送给特定订阅者的程序.相反,发布的消息被描述为通道,而不 ...

  7. redis之mq实现发布订阅模式

    https://github.com/smltq/spring-boot-demo/blob/master/mq-redis 概述 Redis不仅可作为缓存服务器,还可用作消息队列,本示例演示如何使用 ...

  8. mqtt server python_使用python实现mqtt的发布和订阅

    需要安装的python库 使用python编写程序进行测试MQTT的发布和订阅功能.首先要安装:pip install paho-mqtt 测试发布(pub) 我的MQTT部署在阿里云的服务器上面,所 ...

  9. 五、交换机 与 发布/订阅模式、路由模式、主题模式

    RabbitMQ目录 文章目录 交换机 与 发布/订阅模式.路由模式.主题模式 1.Exchanges(交换机) 1.1. Exchanges 概念 1.2. Exchanges 的类型 1.3. 无 ...

最新文章

  1. 春运渡劫!Python给我抢回家的火车票
  2. 本科-人工智能复习题
  3. SSL certificate problem: unable to get local issuer certificate
  4. 转译:Oracle 中 Object_iD 和 Data_Object_ID 的区别
  5. console linux 口 没输出_Console很飒,不止log
  6. Elasticsearch filter和query的不同
  7. 景安服务器域名解析不起作用的正确解析方式之一
  8. 教你一招最屌的阅读开源项目的姿势
  9. html5页面制作成品,dw网页设计成品代码
  10. shift键计算机功能,电脑shift键常用快捷键使用攻略
  11. android 读取本地超大图片
  12. wmp12功能打不开_使用Windows Media Player Plus向WMP添加新功能
  13. 申请美国大学计算机专业,美国TOP20大学计算机专业申请建议
  14. Vue命令式弹窗组件如何实现?我很好奇
  15. 同事间一些搞笑的事情
  16. bugku_crypto_这不是摩斯密码
  17. 云场景实践研究第40期:网聚宝
  18. 腾讯安全发布iOA SaaS版产品,为企业提供安全访问服务
  19. 按汉字首字母排序(sql语句) 只为收集,本人复制粘贴水寒冰局的
  20. 西邮linux兴趣小组2019-2021三年纳新试题浅析

热门文章

  1. java-php-python-ssm-预约健身私教网站-计算机毕业设计
  2. sql判断整除_判断整除
  3. 安卓手机浏览器html上传图片多选,安卓手机浏览器html上传多个图片
  4. 蚊帐上有两个破洞怎么办
  5. HTML5期末大作业:城市旅游网站设计——城市旅游-北京(5页) HTML+CSS+JavaScript 主题度假酒店 计划出行北京
  6. Win7下Android开发环境的搭建(更新于2015/3)
  7. 一篇文章搞定java中的垃圾回收机制面试题
  8. WordPress函数:query_posts(查询文章)
  9. selenium 常见面试题以及答案
  10. 微信小程序输入框键盘显示时,上推固定在底部的按钮