目录

开发步骤

引入client

生产者

消费者

应用场景

简单队列

工作队列

发布/订阅

路由模式

topic模式

rpc模式

发布确认


开发步骤

引入client

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.2.0</version>
</dependency>

生产者

1、引入类

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

2、创建Connection

 ConnectionFactory factory = new ConnectionFactory();// 设置服务地址factory.setHost("127.0.0.1");// 端口factory.setPort(5672);// vhostfactory.setVirtualHost("/vhost_test");// 用户名factory.setUsername("admin");// 密码factory.setPassword("123456");Connection connection = factory.newConnection();

3、创建Channel

Channel channel = connection.createChannel()

channel设置:

4、声明交换器、队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

exchange声明:

DeclareOk exchangeDeclare​(String exchange, String type) throws IOException;
DeclareOk exchangeDeclare​(String exchange, BuiltinExchangeType type) throws IOException;
DeclareOk exchangeDeclare​(String exchange, String type, boolean durable) throws IOException;
DeclareOk exchangeDeclare​(String exchange, BuiltinExchangeType type, boolean durable) throws IOException;
DeclareOk exchangeDeclare​(String exchange, String type, boolean durable, boolean autoDelete, Map<String,​Object> arguments) throws IOException;
DeclareOk exchangeDeclare​(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String,​Object> arguments) throws IOException;
DeclareOk exchangeDeclare​(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String,​Object> arguments) throws IOException;
DeclareOk exchangeDeclare​(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String,​Object> arguments) throws IOException;

参数说明:

  • exchange:exchange名称

  • type:exchange类型,BuiltinExchangeType 枚举类型包括:fanout,direct,topic,header。

  • durable:是否持久化

  • internal:是否内部exchange,生产者不能直接发布到内部exchange。

  • arguments:其他参数,用于构造exchange。

队列声明:

DeclareOk queueDeclare() throws IOException;
DeclareOk queueDeclare​(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,​Object> arguments) throws IOException;

参数说明:

  • queue:队列名称
  • durable:是否持久化
  • exclusive:是否私有,仅当前程序可访问。
  • autoDelete:当最后一个消费者取消订阅后,自动删除。
  • arguments:其他参数,创建queue时使用。

5、发送消息

String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
void basicPublish​(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;
void basicPublish​(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body) throws IOException;
void basicPublish​(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) throws IOException;

参数说明:

  • exchange:
  • routingKey:
  • props:消息属性
  • body:消息体的byte[]格式。
  • mandatory:当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。
  • immediate:当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。

mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。

在RabbitMQ3.0以后的版本里,去掉了immediate参数的支持,发送带immediate标记的publish会返回如下错误:
“{amqp_error,not_implemented,“immediate=true”,‘basic.publish’}”,immediate标记会影响镜像队列性能,增加代码复杂性,并建议采用“TTL”和“DLX”等方式替代。

消费者

1、引入类

同生产者

2、创建Connection

同生产者

3、创建Channel

同生产者

4、声明交换器,队列

同生产者

5、构造Consumer

// 创建消费者Consumer consumer = new DefaultConsumer(channel) {// 获取消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException {String msg = new String(body, "utf-8");System.out.println("接收到消息——" + msg);}};

6、接收消息并处理

     // 监听队列channel.basicConsume(QUEUE, true, consumer);
String basicConsume​(String queue, Consumer callback) throws IOException
String basicConsume​(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException
String basicConsume​(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume​(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume​(String queue, boolean autoAck, Consumer callback) throws IOException
String basicConsume​(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException
String basicConsume​(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume​(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume​(String queue, boolean autoAck, Map<String,​Object> arguments, Consumer callback) throws IOException
String basicConsume​(String queue, boolean autoAck, Map<String,​Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException
String basicConsume​(String queue, boolean autoAck, Map<String,​Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume​(String queue, boolean autoAck, Map<String,​Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume​(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException
String basicConsume​(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException
String basicConsume​(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume​(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume​(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,​Object> arguments, Consumer callback) throws IOException
String basicConsume​(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,​Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException
String basicConsume​(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,​Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume​(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,​Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException

参数说明:

  • queue:队列名称。
  • autoAck:服务器收到消息后是否自动应答。
  • consumerTag:消费者标签,用来区分多个消费者
  • noLocal:设置为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
  • exclusive:是否排他
  • arguments:消费者的参数
  • callback:消费者 DefaultConsumer,用于消费消息,需要重写其中的方法
  • 其他callback
public interface Consumer {//Called when the consumer is cancelled for reasons other than by a call to Channel.basicCancel(java.lang.String).void    handleCancel​(String consumerTag)// Called when the consumer is cancelled by a call to Channel.basicCancel(java.lang.String).   void    handleCancelOk​(String consumerTag)//Called when the consumer is registered by a call to any of the Channel.basicConsume(java.lang.String, com.rabbitmq.client.Consumer) methods.void   handleConsumeOk​(String consumerTag)    //Called when a basic.deliver is received for this consumer.void    handleDelivery​(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)    //Called when a basic.recover-ok is received in reply to a basic.recover.void   handleRecoverOk​(String consumerTag)    //  Called when either the channel or the underlying connection has been shut down.void handleShutdownSignal​(String consumerTag, ShutdownSignalException sig)
}
  • handleCancel:除了调用basicCancel的其他原因导致消息被取消时调用。
  • handleCancelOk:basicCancel调用导致的订阅取消时被调用。
  • handleConsumeOk:任意basicComsume调用导致消费者被注册时调用。
  • handleDelivery:消息接收时被调用。
  • handleRecoverOk:basic.recover-ok被接收时调用
  • handleShutdownSignal:当Channel与Conenction关闭的时候会调用。

应用场景

各场景比较

应用场景 exchange 队列 生产者端 消费者端
简单队列 单个队列 发送到指定队列 自动应答
工作队列 多个队列 发送到指定队列 自动应答/    手动应答,公平分发
发布/订阅 fanout 多个队列 发送到指定exchange,不设置routing key 消费指定队列。
路由模式 direct 多个队列 发送到指定exchange,设置routing key 消费指定队列。指定1个或者多个binging key
topic模式 topic 多个队列 发送到指定exchange,设置routing key。key中包含点号(.)。 消费指定队列。指定1个或者多个binging key,key中包含点号(.)。
rpc        
发布确认        

简单队列

生产者直接发送消息到队列,消费者直接消费队列消息。

/*生产者代码  */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
//直接发送消息到队列。exchange参数为""
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
/* 消费者代码 */
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);//定义消息消费
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");
};
//开始消费
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

工作队列

一般在实际应用中,生产者发送消息耗时较少,反应较快,反而是消费者因为要处理业务逻辑,处理时间可能会很慢,这样队列中会积压很多消息,所以需要多个消费者分摊压力,这个时候可以使用工作队列。

生产者消费者代码与简单队列一样,差别为运行多个消费者代码实例。

消费者默认采用Round-robin方式轮询分发,每个消费者接收到的消息数基本一样。如果消费者处理消息速度不一致,会导致一个空闲,一个繁忙。

可以采用公平模式,如果消费者未处理完消息,则队列不会再发送消息给此消费者,只到上一条消息处理完。

修改:

  • 添加channel.basicQos(1):保证一次只分发一条消息。
int prefetchCount = 1;
//设置每次仅接收1条消息。
channel.basicQos(prefetchCount);
  • channel.basicAck(envelope.getDeliveryTag(), false):手动确认消息。false表示确认接收消息,true表示拒绝接收消息。
  try {doWork(message);} finally {System.out.println(" [x] Done");//手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
  • channel.basicConsume(QUEUE, false, consumer):设置自动应答为false。
 channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });

发布/订阅

生产者没有把消费发送给队列,而是发送给exchange,由exchange进行路由到绑定的队列,消费者仅消费对应队列,例如事件通知通过邮件和短信进行通知。

exchange的类型为fanout

private static final String EXCHANGE_NAME = "logs";//声明一个exchange:logs
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//发送消息到exchange:logs
channel.basicPublish( "logs", "", null, message.getBytes());
    private static final String EXCHANGE_NAME = "logs";//声明exchange:logschannel.exchangeDeclare(EXCHANGE_NAME, "fanout");//声明一个随机名称的临时队列String queueName = channel.queueDeclare().getQueue();//绑定队列与exchangechannel.queueBind(queueName, EXCHANGE_NAME, "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};//开始消费临时队列:channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

路由模式

exchange类型为direct

private static final String EXCHANGE_NAME = "direct_logs";//指定exchange为direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");String severity = getSeverity(argv);
String message = getMessage(argv);
//指定routing key:severity
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
    //声明exchange:类型为:directchannel.exchangeDeclare(EXCHANGE_NAME, "direct");String queueName = channel.queueDeclare().getQueue();//绑定队列与exchange,routing key:severityfor (String severity : argv) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};//消费消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}

topic模式

exchange类型为topic

topic模式与direct模式代码一样,区别为binding key与routing key 必须包含点号(.)。

channel.exchangeDeclare(EXCHANGE_NAME, "topic");String routingKey = getRouting(argv);
String message = getMessage(argv);channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
    channel.exchangeDeclare(EXCHANGE_NAME, "topic");String queueName = channel.queueDeclare().getQueue();for (String bindingKey : argv) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

rpc模式

发布确认

RabbitMQ开发详解相关推荐

  1. java框架魔乐_16 魔乐科技 SpringBoot框架开发详解

    资源内容: 16 魔乐科技 SpringBoot框架开发详解|____springboot开发代码.rar|____第一章:SpringBoot入门          |____2. SpringBo ...

  2. 物联网数据传输协议MQTT介绍与应用开发详解

    本文首发微信公众号:码上观世界 Part 1 物联网概述 1. 物联网概念 物联网是指通过各种信息传感器.射频识别技术.全球定位系统.红外感应器.激光扫描器等各种装置与技术,实时采集任何需要监控. 连 ...

  3. 【OpenCV 4开发详解】分割图像——分水岭法

    本文首发于"小白学视觉"微信公众号,欢迎关注公众号 本文作者为小白,版权归人民邮电出版社发行所有,禁止转载,侵权必究! 经过几个月的努力,小白终于完成了市面上第一本OpenCV 4 ...

  4. 【OpenCV 4开发详解】QR二维码检测

    本文首发于"小白学视觉"微信公众号,欢迎关注公众号 本文作者为小白,版权归人民邮电出版社发行所有,禁止转载,侵权必究! 经过几个月的努力,小白终于完成了市面上第一本OpenCV 4 ...

  5. 【OpenCV 4开发详解】深度神经网络应用实例

    本文首发于"小白学视觉"微信公众号,欢迎关注公众号 本文作者为小白,版权归人民邮电出版社发行所有,禁止转载,侵权必究! 经过几个月的努力,小白终于完成了市面上第一本OpenCV 4 ...

  6. 【OpenCV 4开发详解】图像修复

    本文首发于"小白学视觉"微信公众号,欢迎关注公众号 本文作者为小白,版权归人民邮电出版社发行所有,禁止转载,侵权必究! 经过几个月的努力,小白终于完成了市面上第一本OpenCV 4 ...

  7. 【OpenCV 4开发详解】分割图像——Mean-Shift分割算法

    本文首发于"小白学视觉"微信公众号,欢迎关注公众号 本文作者为小白,版权归人民邮电出版社发行所有,禁止转载,侵权必究! 经过几个月的努力,小白终于完成了市面上第一本OpenCV 4 ...

  8. 【OpenCV 4开发详解】分割图像——Grabcut图像分割

    本文首发于"小白学视觉"微信公众号,欢迎关注公众号 本文作者为小白,版权归人民邮电出版社发行所有,禁止转载,侵权必究! 经过几个月的努力,小白终于完成了市面上第一本OpenCV 4 ...

  9. 【OpenCV 4开发详解】漫水填充法

    本文首发于"小白学视觉"微信公众号,欢迎关注公众号 本文作者为小白,版权归人民邮电出版社发行所有,禁止转载,侵权必究! 经过几个月的努力,小白终于完成了市面上第一本OpenCV 4 ...

最新文章

  1. Dialog 去白色边框及透明
  2. python返回序列中的最小元素_python实现获取序列中最小的几个元素
  3. 基本数据类型与表达式5 - 零基础入门学习Delphi06
  4. 猴子管理法则(网络文摘)
  5. 机器学习经典必读书,李航《统计学习方法》出视频课了!
  6. vnc连接linux颜色灰色,VNC 灰色的屏幕解决方法
  7. 如何运用并行编程Parallel提升任务执行效率
  8. linux下共享文件夹(windows可访问,linux也可访问)
  9. 7-63 情人节 (15 分)(c++stl)
  10. Echarts4+EchartsGL 3D迁徙图(附源码)
  11. Cardano链上首个流动性解决方案商Occam宣布与Changelly合作孵化新项目
  12. 职场上有3种类型的人,最后一种类型老板最喜欢,你是哪一类?
  13. python自动测试m_python自动化测试实例解析
  14. JavaScript实现秒杀倒计时效果(附源码)
  15. 柯尼卡美能达c353改语言,柯尼卡美能达bizhub c353c253c203维修手册中文部分2.pdf
  16. plsql导出文件转mysql_PLSQL Developer导入导出数据库
  17. 自然人税收管理系统服务器,【轻松学个税申报】自然人税收管理系统客户端操作...
  18. PyTorch框架中使用早停止Early Stopping(含详细代码)
  19. 全球数字电视标准制式
  20. 已写完的二十本最经典原创小说巨作!你都看过吗?

热门文章

  1. 美国教育---一切为了学生的成才
  2. 解决四个字节的字符无法存入数据库
  3. array_merge与array+array的区别
  4. MySQL主从复制原理应用基础
  5. 集存款(复利单利)贷款为一体的计算器(最新版)
  6. Android ---- Context
  7. 配置Exchange 2010邮箱和邮件大小限制
  8. centos 7 mysql图形界面_centos7-vnstat图形界面搭建
  9. 设备管理器中的计算机有什么用,为什么计算机设备管理器中有两个图形卡?
  10. 矩形波的傅里叶变换_冲激信号、门信号、方波、矩形波的傅里叶变换总结