1.下载

https://www.rabbitmq.com/community-plugins.html

wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

下载后解压,并将其拷贝至(使用Linux Debian/RPM部署)rabbitmq服务器目录:/usr/local/rabbitmq/plugins中( windows安装目录\rabbitmq_server-version\plugins ).

/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.12/plugins

2.解压

unzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

3.启用插件

使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchang

输出如下:

The following plugins have been enabled:rabbitmq_delayed_message_exchange

通过rabbitmq-plugins list查看已安装列表,如下:

Configured: E = explicitly enabled; e = implicitly enabled| Status:   * = running on rabbit@n1|/
[e*] amqp_client                       3.6.12
[e*] cowboy                            1.0.4
[e*] cowlib                            1.0.2
[  ] rabbitmq_amqp1_0                  3.6.12
[  ] rabbitmq_auth_backend_ldap        3.6.12
[  ] rabbitmq_auth_mechanism_ssl       3.6.12
[  ] rabbitmq_consistent_hash_exchange 3.6.12
[E*] rabbitmq_delayed_message_exchange 20171215-3.6.x
[  ] rabbitmq_event_exchange           3.6.12
[  ] rabbitmq_federation               3.6.12
[  ] rabbitmq_federation_management    3.6.12
[  ] rabbitmq_jms_topic_exchange       3.6.12
[E*] rabbitmq_management               3.6.12
[e*] rabbitmq_management_agent         3.6.12
[  ] rabbitmq_management_visualiser    3.6.12
[  ] rabbitmq_mqtt                     3.6.12
[  ] rabbitmq_recent_history_exchange  3.6.12
[  ] rabbitmq_sharding                 3.6.12
[  ] rabbitmq_shovel                   3.6.12
[  ] rabbitmq_shovel_management        3.6.12
[  ] rabbitmq_stomp                    3.6.12
[  ] rabbitmq_top                      3.6.12
[E*] rabbitmq_tracing                  3.6.12
[  ] rabbitmq_trust_store              3.6.12
[e*] rabbitmq_web_dispatch             3.6.12
[  ] rabbitmq_web_mqtt                 3.6.12
[  ] rabbitmq_web_mqtt_examples        3.6.12
[  ] rabbitmq_web_stomp                3.6.12
[  ] rabbitmq_web_stomp_examples       3.6.12
[  ] sockjs                            0.3.4

测试代码:

Producer:

import java.util.Date;
import java.util.HashMap;
import java.util.Map;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setVirtualHost("/");connectionFactory.setHost("172.31.1.135");connectionFactory.setUsername("xx");connectionFactory.setPassword("xx");connectionFactory.setPort(5672);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "delay-exchange";String routingkey = "delay.delay";String queueName = "delay_queueName";//x-delayed-message 声明Map<String,Object> map =new HashMap<>();map.put("x-delayed-type", "direct");channel.exchangeDeclare(exchangeName, "x-delayed-message", true, false, map);//注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。channel.queueDeclare(queueName, true, false, false, map);channel.queueBind(queueName,exchangeName,routingkey);for (int i = 0; i < 3; i++) {// deliveryMode=2 持久化,expiration 消息有效时间String msg = "delayed payload".getBytes("UTF-8") +" "+new Date().getTime();byte[] messageBodyBytes = msg.getBytes();Map<String, Object> headers = new HashMap<String, Object>();headers.put("x-delay", 50000);AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);channel.basicPublish(exchangeName, routingkey, props.build(), messageBodyBytes);}}
}

Consumer:

import java.io.IOException;
import java.util.Date;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setVirtualHost("/");connectionFactory.setHost("172.31.1.135");connectionFactory.setPort(5672);connectionFactory.setUsername("xxx");connectionFactory.setPassword("xxx");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String queueName = "delay_queueName";channel.queueDeclare(queueName,true,false,false,null);channel.basicConsume(queueName, false, "myConsumer Tag", new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {String routingKey = envelope.getRoutingKey();String convernType = properties.getContentType();long deliveryTag = envelope.getDeliveryTag();System.out.println("routingKey:"+routingKey+",convernType:"+convernType+",deliveryTag:"+deliveryTag+",Msg body:"+new String(body)+ " "+new Date().getTime());channel.basicAck(deliveryTag, false);}});}
}

执行结果

routingKey:delay.delay,convernType:null,deliveryTag:13,Msg body:[B@387c703b 1575280086578 1575280136587
routingKey:delay.delay,convernType:null,deliveryTag:14,Msg body:[B@50cbc42f 1575280086582 1575280136587
routingKey:delay.delay,convernType:null,deliveryTag:15,Msg body:[B@75412c2f 1575280086582 1575280136587
routingKey:delay.delay,convernType:null,deliveryTag:16,Msg body:[B@387c703b 1575280078869 1575280178875
routingKey:delay.delay,convernType:null,deliveryTag:17,Msg body:[B@50cbc42f 1575280078870 1575280178875
routingKey:delay.delay,convernType:null,deliveryTag:18,Msg body:[B@75412c2f 1575280078871 1575280178875

Note:使用rabbitmq-delayed-message-exchange插件时发送到队列的消息数量在web管理界面可能不可见,不影响正常功能使用

Note :使用过程中发现,当一台启用了rabbitmq-delayed-message-exchange插件的RAM节点在重启的时候会无法启动,查看日志发现了一个Timeout异常,开发者解释说这是节点在启动过程会同步集群相关数据造成启动超时,并建议不要使用Ram节点

插件开发者: 
RAM nodes start blank and need a disk node to sync tables from. In this case it times out.

More importantly, you don’t need RAM nodes. If you’re not sure if you do, you certainly don’t, as don’t 99% of users.

报错信息如下

Error description:{could_not_start,rabbit,{{case_clause,{timeout,[rabbit_delayed_messagerabbit@n2,rabbit_delayed_messagerabbit@n2_index]}},[{rabbit_boot_steps,'-run_step/2-lc$^1/1-1-',1,[{file,"src/rabbit_boot_steps.erl"},{line,49}]},{rabbit_boot_steps,run_step,2,[{file,"src/rabbit_boot_steps.erl"},{line,49}]},{rabbit_boot_steps,'-run_boot_steps/1-lc$^0/1-0-',1,[{file,"src/rabbit_boot_steps.erl"},{line,26}]},{rabbit_boot_steps,run_boot_steps,1,[{file,"src/rabbit_boot_steps.erl"},{line,26}]},{rabbit,start,2,[{file,"src/rabbit.erl"},{line,733}]},{application_master,start_it_old,4,[{file,"application_master.erl"},{line,273}]}]}}Log files (may contain more information):/var/log/rabbitmq/rabbit@n2.log/var/log/rabbitmq/rabbit@n2-sasl.logError: {could_not_start,rabbit,{{case_clause,{timeout,[rabbit_delayed_messagerabbit@n2,rabbit_delayed_messagerabbit@n2_index]}},[{rabbit_boot_steps,'-run_step/2-lc$^1/1-1-',1,[{file,"src/rabbit_boot_steps.erl"},{line,49}]},{rabbit_boot_steps,run_step,2,[{file,"src/rabbit_boot_steps.erl"},{line,49}]},{rabbit_boot_steps,'-run_boot_steps/1-lc$^0/1-0-',1,[{file,"src/rabbit_boot_steps.erl"},{line,26}]},{rabbit_boot_steps,run_boot_steps,1,[{file,"src/rabbit_boot_steps.erl"},{line,26}]},{rabbit,start,2,[{file,"src/rabbit.erl"},{line,733}]},{application_master,start_it_old,4,[{file,"application_master.erl"},{line,273}]}]}}

RAM

rabbitmqctl join_cluster rabbit@n1 --ram

root@n2 plugins]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@n2
[root@n2 plugins]# rabbitmqctl reset
Resetting node rabbit@n2
[root@n2 plugins]# rabbitmqctl join_cluster rabbit@n1
Clustering node rabbit@n2 with rabbit@n1
[root@n2 plugins]# rabbitmqctl start_app
Starting node rabbit@n2
[root@n2 plugins]# 

## 提示,如果 安装插件,导致集群崩溃

一般是要将 RAM -DISK 的那个节点, reset 后,重新加入集群即可,但是这样,会造成消息丢失

节点2
1.rabbitmqctl stop_app
2.rabbitmqctl reset
3.节点1执行 rabbitmqctl forget_cluster_node rabbit@rbmq92
4.rabbitmqctl join_cluster rabbit@rbmq91
5.rabbitmqctl change_cluster_node_type disk
6.rabbitmqctl start_app

参考资料

1.https://www.cnblogs.com/-mrl/p/11114116.html

2.https://blog.csdn.net/youjin/article/details/82586888

3.https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/

4.https://blog.csdn.net/skiof007/article/details/80914318

5.https://www.rabbitmq.com/community-plugins.html

1. 进入docker容器内 docker exec  -t rabbit  bash
2. rabbitmq-plugins list 命令查看已安装插件
3. 在插件网址找到延迟插件的下载地址 http://www.rabbitmq.com/community-plugins.html
4. exit 退出容器到宿主机中,下载插件: wget  https://dl.bintray.com/rabbitmq/community-plugins/3.7.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171201-3.7.x.zip
5. 解压 unzip XXX.zip -d .
6. 拷贝至docker容器内: docker cp xxx.xz rabbit:/plugins
7. 再次进入docker容器内: 进入docker容器内 docker exec  -t rabbit  bash
8. 执行命令让插件生效: 启动延时插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange

rabbitmq-delayed-message-exchange相关推荐

  1. Delayed Message 插件实现 RabbitMQ 延迟队列

    延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行. DLX + TTL 方式存在的时序问题 对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有 ...

  2. RabbitMQ安装Delayed Message 插件

    安装Delayed Message 插件 在官网:https://www.rabbitmq.com/community-plugins.html 点击: 下载好之后就是一个解压好的文件: 然后在将这个 ...

  3. RabbitMQ - 4种Exchange类型

    在rabbitmq中,exchange有4个类型:direct,topic,fanout,header. direct exchange 此类型的exchange路由规则很简单: exchange在和 ...

  4. RabbitMQ 四种Exchange

    AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列.生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机.先由Exchange来接收,然后Exchang ...

  5. RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较

    一.Direct Exchange 任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue. 1.一般情况可以使用rabbitMQ自带的Exchange:&quo ...

  6. (转)RabbitMQ学习之exchange总结

    http://blog.csdn.net/zhu_tianwei/article/details/53969674 前面介绍了几类exchange的作用,这个总结一下:  direct:消息会被推送至 ...

  7. RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较(转)

    RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储 RabbitMQ提供了四种Exchange:fanout,direct, ...

  8. RabbitMQ之Federation Exchange、Federation Queue、Shovel

    文章目录 1.Federation Exchange(联邦交换机) 1.1 为什么使用联邦交换机 1.2 搭建步骤 1.2.1 需要保证每台节点单独运行 1.2.2 在每台机器上开启federatio ...

  9. Kafka 六战 RabbitMQ,这差距还不够明显吗?

    经常有人问我 有个 xx 需求,我应该用 Kafka 还是 RabbitMQ ? 这个问题很常见,而且很多人对二者的选择也把握不好. 所以我决定写篇文章来详细说一下:Kafka 和 RabbitMQ ...

  10. 懵了,Kafka、RabbitMQ到底选哪个?

    经常有人问我 有个 xx 需求,我应该用 Kafka 还是 RabbitMQ ? 这个问题很常见,而且很多人对二者的选择也把握不好. 所以我决定写篇文章来详细说一下:Kafka 和 RabbitMQ ...

最新文章

  1. Nginx如何限流?
  2. 网页连接不上java服务端,用Java插入IP时无法连接到服务器
  3. ustc小道消息20220122
  4. jvm(2)-java内存区域
  5. 计算机二级目录设置,word2设置标题格式,生成目录,奇偶页设置等等,适用考计算机二级办公软件,也适用于毕业论文格式设置...
  6. Redis基础-下载安装配置
  7. Mint-UI 移动首页开发 - header导航、banner轮播图
  8. 苹果手机打不开html文件,苹果手机描述文件打不开怎么办
  9. 演化博弈及Python实现
  10. 五个比SCI-Hub还牛的下载文献方法,教你全网免费下载外文文献。
  11. 如何用微信建立打卡小程序(做打卡签到小程序方法)
  12. Python给pdf制定权限加密
  13. python实现微信发红包
  14. 第一只python小爬虫
  15. 滴滴Logi-KafkaManager开源之路:一站式Kafka集群指标监控与运维管控平台
  16. 洛谷——P3906 Geodetic集合
  17. 计算机科学与技术有意义吗,各位大大,我是一名小二本的计算机科学与技术专业学生,我想问我有必要去考研吗?...
  18. Linux下xmind下载安装
  19. Ubuntu18.04安装中国版火狐
  20. C++1——控制语句章节

热门文章

  1. 请给我写信——教你使用简洁漂亮的网易邮件接口
  2. 常量池详解(含栈、堆、方法区简析)
  3. java中sleep方法_Java中sleep方法和wait的详细区别
  4. SVN下载,安装,汉化,卸载,及idea配置SVN环境
  5. 李飞飞高徒Andrej Karpathy为大家答疑解惑
  6. 中国人的区块链技术闯入国际学术顶会
  7. HTML DOM 事件 —— 鼠标事件 JS鼠标事件
  8. impress.js
  9. 把印章系统数据库注册到cwyy实例上的恢复目录
  10. 英雄联盟怎么窗口化 lol窗口模式怎么移动缩小