<===  RabbitMQ消息队列(三):任务分发机制

上篇文章中,我们把每个Message都是deliver到某个Consumer。在这篇文章中,我们将会将同一个Message deliver到多个Consumer中。这个模式也被成为 "publish / subscribe"。
    这篇文章中,我们将创建一个日志系统,它包含两个部分:第一个部分是发出log(Producer),第二个部分接收到并打印(Consumer)。 我们将构建两个Consumer,第一个将log写到物理磁盘上;第二个将log输出的屏幕。

1. Exchanges

关于exchange的概念在《RabbitMQ消息队列(一): Detailed Introduction 详细介绍》中有详细介绍。现在做一下简单的回顾。

RabbitMQ 的Messaging Model就是Producer并不会直接发送Message到queue。实际上,Producer并不知道它发送的Message是否已经到达queue。

Producer发送的Message实际上是发到了Exchange中。它的功能也很简单:从Producer接收Message,然后投递到queue中。Exchange需要知道如何处理Message,是把它放到那个queue中,还是放到多个queue中?这个rule是通过Exchange 的类型定义的。

我们知道有三种类型的Exchange:direct, topic 和fanout。fanout就是广播模式,会将所有的Message都放到它所知道的queue中。创建一个名字为logs,类型为fanout的Exchange:

channel.exchange_declare(exchange='logs',type='fanout')

Listing exchanges

通过rabbitmqctl可以列出当前所有的Exchange:

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs      fanout
amq.direct      direct
amq.topic       topic
amq.fanout      fanout
amq.headers     headers
...done.

注意 amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默认创建的。

现在我们可以通过exchange,而不是routing_key来publish Message了:

channel.basic_publish(exchange='logs',routing_key='',body=message)

2. Temporary queues

截至现在,我们用的queue都是有名字的:第一个是hello,第二个是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成为可能。

但是对于我们将要构建的日志系统,并不需要有名字的queue。我们希望得到所有的log,而不是它们中间的一部分。而且我们只对当前的log感兴趣。为了实现这个目标,我们需要两件事情:
    1) 每当Consumer连接时,我们需要一个新的,空的queue。因为我们不对老的log感兴趣。幸运的是,如果在声明queue时不指定名字,那么RabbitMQ会随机为我们选择这个名字。方法:

result = channel.queue_declare()

通过 result.method.queue 可以取得queue的名字。基本上都是这个样子: amq.gen-JzTY20BRgKO-HjmUJj0wLg。
    2)当Consumer关闭连接时,这个queue要被deleted。可以加个 exclusive的参数。方法:

result = channel.queue_declare(exclusive=True)

3. Bindings绑定

现在我们已经创建了fanout类型的exchange和没有名字的queue(实际上是RabbitMQ帮我们取了名字)。那exchange怎么样知道它的Message发送到哪个queue呢?答案就是通过bindings:绑定。

方法:

channel.queue_bind(exchange='logs',queue=result.method.queue)

现在logs的exchange就将它的Message附加到我们创建的queue了。 Listing bindings

使用命令rabbitmqctl list_bindings。

4. 最终版本

我们最终实现的数据流图如下:

Producer,在这里就是产生log的program,基本上和前几个都差不多。最主要的区别就是publish通过了exchange而不是routing_key。

emit_log.py script:

#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='logs',type='fanout')message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',routing_key='',body=message)
print " [x] Sent %r" % (message,)
connection.close()

还有一点要注意的是我们声明了exchange。publish到一个不存在的exchange是被禁止的。如果没有queue bindings exchange的话,log是被丢弃的。
Consumer:receive_logs.py:

#!/usr/bin/env python
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='logs',type='fanout')result = channel.queue_declare(exclusive=True)
queue_name = result.method.queuechannel.queue_bind(exchange='logs',queue=queue_name)print ' [*] Waiting for logs. To exit press CTRL+C'def callback(ch, method, properties, body):print " [x] %r" % (body,)channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()

我们开始不是说需要两个Consumer吗?一个负责记录到文件;一个负责打印到屏幕?
其实用重定向就可以了,当然你想修改callback自己写文件也行。我们使用重定向的方法:
We're done. If you want to save logs to a file, just open a console and type:

$ python receive_logs.py > logs_from_rabbit.log

Consumer2:打印到屏幕:

$ python receive_logs.py

接下来,Producer:

$ python emit_log.py

使用命令rabbitmqctl list_bindings你可以看我们创建的queue。
一个output:

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

这个结果还是很好理解的。

尊重原创,转载请注明出处 anzhsoft: http://blog.csdn.net/anzhsoft/article/details/19617305

参考资料:

1. http://www.rabbitmq.com/tutorials/tutorial-three-python.html

RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)相关推荐

  1. php中rabbitmq消息乱码,PHP实现RabbitMQ消息队列(转)

    本篇文章给大家带来的内容是关于PHP和RabbitMQ实现消息队列的完整代码,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. 先安装PHP对应的RabbitMQ,这里用的是 php_a ...

  2. (四)RabbitMQ消息队列-服务详细配置与日常监控管理

    (四)RabbitMQ消息队列-服务详细配置与日常监控管理 原文:(四)RabbitMQ消息队列-服务详细配置与日常监控管理 RabbitMQ服务管理 启动服务:rabbitmq-server -de ...

  3. 消息队列——RabbitMQ消息队列集群

    RabbitMQ消息队列集群 消息队列/中间件 RabbitMQ详解 RabbitMQ单机部署 RabbitMQ集群部署 消息队列/中间件 一.前言 在我们秒杀抢购商品的时候,系统会提醒我们稍等排队中 ...

  4. RabbitMQ消息队列常见面试题总结

    1.什么是消息队列: 1.1.消息队列的优点: (1)解耦:将系统按照不同的业务功能拆分出来,消息生产者只管把消息发布到 MQ 中而不用管谁来取,消息消费者只管从 MQ 中取消息而不管是谁发布的.消息 ...

  5. RabbitMQ消息队列(一)《Java-2021面试谈资系列》

    RabbitMQ RabbitMQ消息队列 一.中间件 1.什么是中间件 2.中间件技术及架构概述 3.消息中间件 1.消息中间件的分布式架构 2.消息中间件使用场景 3.常见的消息中间件 4.消息中 ...

  6. 大数据互联网架构阶段 QuartZ定时任务+RabbitMQ消息队列

    QuartZ定时任务+RabbitMQ消息队列 一 .QuartZ定时任务解决订单系统遗留问题 情景分析: 在电商项目中 , 订单生成后 , 数据库商品数量-1 , 但是用户迟迟不进行支付操作 , 这 ...

  7. SpringBoot整合RabbitMQ消息队列

    RabbitMQ 一.RabbitMQ介绍 1.1 现存问题 服务调用:两个服务调用时,我们可以通过传统的HTTP方式,让服务A直接去调用服务B的接口,但是这种方式是同步的方式,虽然可以采用Sprin ...

  8. RabbitMQ学习系列二:.net 环境下 C#代码使用 RabbitMQ 消息队列

    上一篇已经讲了Rabbitmq如何在Windows平台安装,不懂请移步:RabbitMQ学习系列一:windows下安装RabbitMQ服务 一.理论: .net环境下,C#代码调用RabbitMQ消 ...

  9. RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)

    在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会 ...

最新文章

  1. ssl2331OJ1373-鱼塘钓鱼 之3【dp】
  2. 使用anconada 的conda更换环境
  3. pt100温度传感器c语言,pt100测温程序-LCD1602
  4. vscode的eslint无效_VSCode配置eslint
  5. 近一周学习之-----npm换源工具之nrm
  6. WIFI安全测试之WPS(PIN)加密暴力破解
  7. 服务器网卡无法开启lldp协议,lldp支持-VMware vSphere - 思科华为论坛
  8. 打卡国潮新风尚@东莞隐贤山庄#玩转主题乐园#
  9. rk3399 android 9.0 skype强制切换到前置摄像头
  10. 工厂模式,java描述
  11. 【博客之星】坚持,是一种信仰
  12. Python_pandas读取数据
  13. js处理时间戳为各种格式/js判断公历/农历/周历节日和节气
  14. OnCreate()与PreCreateWindow()
  15. 计算机科学四个名校,计算机科学CS四大名校来啦
  16. ceph(ceph是什么意思)
  17. springboot丽江市旅游服务网站的设计与实现.rar(论文+项目源码)
  18. Sketch:无损放大像素画
  19. 混合像元分解研究综述——线性混合像元分解算法
  20. CIO要顶住词汇爆炸的冲击(转)

热门文章

  1. python小游戏“植物大战僵尸”
  2. 项目练习:构建读写分离的数据库集群
  3. 宏杉科技针对传统RAID缺陷推CRAID 3.0
  4. C# Chart折线图使用鼠标滚轮放大、缩小和平移曲线方式
  5. 如何让图片无损放大,保持清晰度
  6. cesium之家总目录
  7. c语言天气预报系统设计,基于Android的手机天气预报系统(毕业论文).doc
  8. (续)关于文科思维和理科思维
  9. 网络信息通信的安全问题以及解决方法
  10. ssm+jsp计算机毕业设计超市Pos收银管理系统1y6h3(程序+LW+源码+远程部署)