RabbitMQ开发详解
目录
开发步骤
引入client
生产者
消费者
应用场景
简单队列
工作队列
发布/订阅
路由模式
topic模式
rpc模式
发布确认
开发步骤
引入client
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.2.0</version>
</dependency>
生产者
1、引入类
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
2、创建Connection
ConnectionFactory factory = new ConnectionFactory();// 设置服务地址factory.setHost("127.0.0.1");// 端口factory.setPort(5672);// vhostfactory.setVirtualHost("/vhost_test");// 用户名factory.setUsername("admin");// 密码factory.setPassword("123456");Connection connection = factory.newConnection();
3、创建Channel
Channel channel = connection.createChannel()
channel设置:
4、声明交换器、队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
exchange声明:
DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;
DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException;
DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String,Object> arguments) throws IOException;
DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String,Object> arguments) throws IOException;
DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String,Object> arguments) throws IOException;
DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String,Object> arguments) throws IOException;
参数说明:
exchange:exchange名称
type:exchange类型,BuiltinExchangeType 枚举类型包括:fanout,direct,topic,header。
durable:是否持久化
internal:是否内部exchange,生产者不能直接发布到内部exchange。
arguments:其他参数,用于构造exchange。
队列声明:
DeclareOk queueDeclare() throws IOException;
DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments) throws IOException;
参数说明:
- queue:队列名称
- durable:是否持久化
- exclusive:是否私有,仅当前程序可访问。
- autoDelete:当最后一个消费者取消订阅后,自动删除。
- arguments:其他参数,创建queue时使用。
5、发送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) throws IOException;
参数说明:
- exchange:
- routingKey:
- props:消息属性
- body:消息体的byte[]格式。
- mandatory:当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。
- immediate:当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。
在RabbitMQ3.0以后的版本里,去掉了immediate参数的支持,发送带immediate标记的publish会返回如下错误:
“{amqp_error,not_implemented,“immediate=true”,‘basic.publish’}”,immediate标记会影响镜像队列性能,增加代码复杂性,并建议采用“TTL”和“DLX”等方式替代。
消费者
1、引入类
同生产者
2、创建Connection
同生产者
3、创建Channel
同生产者
4、声明交换器,队列
同生产者
5、构造Consumer
// 创建消费者Consumer consumer = new DefaultConsumer(channel) {// 获取消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException {String msg = new String(body, "utf-8");System.out.println("接收到消息——" + msg);}};
6、接收消息并处理
// 监听队列channel.basicConsume(QUEUE, true, consumer);
String basicConsume(String queue, Consumer callback) throws IOException
String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException
String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume(String queue, boolean autoAck, Map<String,Object> arguments, Consumer callback) throws IOException
String basicConsume(String queue, boolean autoAck, Map<String,Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException
String basicConsume(String queue, boolean autoAck, Map<String,Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume(String queue, boolean autoAck, Map<String,Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,Object> arguments, Consumer callback) throws IOException
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException
参数说明:
- queue:队列名称。
- autoAck:服务器收到消息后是否自动应答。
- consumerTag:消费者标签,用来区分多个消费者
- noLocal:设置为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
- exclusive:是否排他
- arguments:消费者的参数
- callback:消费者 DefaultConsumer,用于消费消息,需要重写其中的方法
- 其他callback
public interface Consumer {//Called when the consumer is cancelled for reasons other than by a call to Channel.basicCancel(java.lang.String).void handleCancel(String consumerTag)// Called when the consumer is cancelled by a call to Channel.basicCancel(java.lang.String). void handleCancelOk(String consumerTag)//Called when the consumer is registered by a call to any of the Channel.basicConsume(java.lang.String, com.rabbitmq.client.Consumer) methods.void handleConsumeOk(String consumerTag) //Called when a basic.deliver is received for this consumer.void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) //Called when a basic.recover-ok is received in reply to a basic.recover.void handleRecoverOk(String consumerTag) // Called when either the channel or the underlying connection has been shut down.void handleShutdownSignal(String consumerTag, ShutdownSignalException sig)
}
- handleCancel:除了调用basicCancel的其他原因导致消息被取消时调用。
- handleCancelOk:basicCancel调用导致的订阅取消时被调用。
- handleConsumeOk:任意basicComsume调用导致消费者被注册时调用。
- handleDelivery:消息接收时被调用。
- handleRecoverOk:basic.recover-ok被接收时调用
- handleShutdownSignal:当Channel与Conenction关闭的时候会调用。
应用场景
各场景比较
应用场景 | exchange | 队列 | 生产者端 | 消费者端 |
简单队列 | 无 | 单个队列 | 发送到指定队列 | 自动应答 |
工作队列 | 无 | 多个队列 | 发送到指定队列 | 自动应答/ 手动应答,公平分发 |
发布/订阅 | fanout | 多个队列 | 发送到指定exchange,不设置routing key | 消费指定队列。 |
路由模式 | direct | 多个队列 | 发送到指定exchange,设置routing key | 消费指定队列。指定1个或者多个binging key |
topic模式 | topic | 多个队列 | 发送到指定exchange,设置routing key。key中包含点号(.)。 | 消费指定队列。指定1个或者多个binging key,key中包含点号(.)。 |
rpc | ||||
发布确认 |
简单队列
生产者直接发送消息到队列,消费者直接消费队列消息。
/*生产者代码 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
//直接发送消息到队列。exchange参数为""
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
/* 消费者代码 */
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);//定义消息消费
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");
};
//开始消费
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
工作队列
一般在实际应用中,生产者发送消息耗时较少,反应较快,反而是消费者因为要处理业务逻辑,处理时间可能会很慢,这样队列中会积压很多消息,所以需要多个消费者分摊压力,这个时候可以使用工作队列。
生产者消费者代码与简单队列一样,差别为运行多个消费者代码实例。
消费者默认采用Round-robin方式轮询分发,每个消费者接收到的消息数基本一样。如果消费者处理消息速度不一致,会导致一个空闲,一个繁忙。
可以采用公平模式,如果消费者未处理完消息,则队列不会再发送消息给此消费者,只到上一条消息处理完。
修改:
- 添加channel.basicQos(1):保证一次只分发一条消息。
int prefetchCount = 1;
//设置每次仅接收1条消息。
channel.basicQos(prefetchCount);
- channel.basicAck(envelope.getDeliveryTag(), false):手动确认消息。false表示确认接收消息,true表示拒绝接收消息。
try {doWork(message);} finally {System.out.println(" [x] Done");//手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
- channel.basicConsume(QUEUE, false, consumer):设置自动应答为false。
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
发布/订阅
生产者没有把消费发送给队列,而是发送给exchange,由exchange进行路由到绑定的队列,消费者仅消费对应队列,例如事件通知通过邮件和短信进行通知。
exchange的类型为fanout。
private static final String EXCHANGE_NAME = "logs";//声明一个exchange:logs
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//发送消息到exchange:logs
channel.basicPublish( "logs", "", null, message.getBytes());
private static final String EXCHANGE_NAME = "logs";//声明exchange:logschannel.exchangeDeclare(EXCHANGE_NAME, "fanout");//声明一个随机名称的临时队列String queueName = channel.queueDeclare().getQueue();//绑定队列与exchangechannel.queueBind(queueName, EXCHANGE_NAME, "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};//开始消费临时队列:channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
路由模式
exchange类型为direct。
private static final String EXCHANGE_NAME = "direct_logs";//指定exchange为direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");String severity = getSeverity(argv);
String message = getMessage(argv);
//指定routing key:severity
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
//声明exchange:类型为:directchannel.exchangeDeclare(EXCHANGE_NAME, "direct");String queueName = channel.queueDeclare().getQueue();//绑定队列与exchange,routing key:severityfor (String severity : argv) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};//消费消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
topic模式
exchange类型为topic。
topic模式与direct模式代码一样,区别为binding key与routing key 必须包含点号(.)。
channel.exchangeDeclare(EXCHANGE_NAME, "topic");String routingKey = getRouting(argv);
String message = getMessage(argv);channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
channel.exchangeDeclare(EXCHANGE_NAME, "topic");String queueName = channel.queueDeclare().getQueue();for (String bindingKey : argv) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
rpc模式
发布确认
RabbitMQ开发详解相关推荐
- java框架魔乐_16 魔乐科技 SpringBoot框架开发详解
资源内容: 16 魔乐科技 SpringBoot框架开发详解|____springboot开发代码.rar|____第一章:SpringBoot入门 |____2. SpringBo ...
- 物联网数据传输协议MQTT介绍与应用开发详解
本文首发微信公众号:码上观世界 Part 1 物联网概述 1. 物联网概念 物联网是指通过各种信息传感器.射频识别技术.全球定位系统.红外感应器.激光扫描器等各种装置与技术,实时采集任何需要监控. 连 ...
- 【OpenCV 4开发详解】分割图像——分水岭法
本文首发于"小白学视觉"微信公众号,欢迎关注公众号 本文作者为小白,版权归人民邮电出版社发行所有,禁止转载,侵权必究! 经过几个月的努力,小白终于完成了市面上第一本OpenCV 4 ...
- 【OpenCV 4开发详解】QR二维码检测
本文首发于"小白学视觉"微信公众号,欢迎关注公众号 本文作者为小白,版权归人民邮电出版社发行所有,禁止转载,侵权必究! 经过几个月的努力,小白终于完成了市面上第一本OpenCV 4 ...
- 【OpenCV 4开发详解】深度神经网络应用实例
本文首发于"小白学视觉"微信公众号,欢迎关注公众号 本文作者为小白,版权归人民邮电出版社发行所有,禁止转载,侵权必究! 经过几个月的努力,小白终于完成了市面上第一本OpenCV 4 ...
- 【OpenCV 4开发详解】图像修复
本文首发于"小白学视觉"微信公众号,欢迎关注公众号 本文作者为小白,版权归人民邮电出版社发行所有,禁止转载,侵权必究! 经过几个月的努力,小白终于完成了市面上第一本OpenCV 4 ...
- 【OpenCV 4开发详解】分割图像——Mean-Shift分割算法
本文首发于"小白学视觉"微信公众号,欢迎关注公众号 本文作者为小白,版权归人民邮电出版社发行所有,禁止转载,侵权必究! 经过几个月的努力,小白终于完成了市面上第一本OpenCV 4 ...
- 【OpenCV 4开发详解】分割图像——Grabcut图像分割
本文首发于"小白学视觉"微信公众号,欢迎关注公众号 本文作者为小白,版权归人民邮电出版社发行所有,禁止转载,侵权必究! 经过几个月的努力,小白终于完成了市面上第一本OpenCV 4 ...
- 【OpenCV 4开发详解】漫水填充法
本文首发于"小白学视觉"微信公众号,欢迎关注公众号 本文作者为小白,版权归人民邮电出版社发行所有,禁止转载,侵权必究! 经过几个月的努力,小白终于完成了市面上第一本OpenCV 4 ...
最新文章
- Dialog 去白色边框及透明
- python返回序列中的最小元素_python实现获取序列中最小的几个元素
- 基本数据类型与表达式5 - 零基础入门学习Delphi06
- 猴子管理法则(网络文摘)
- 机器学习经典必读书,李航《统计学习方法》出视频课了!
- vnc连接linux颜色灰色,VNC 灰色的屏幕解决方法
- 如何运用并行编程Parallel提升任务执行效率
- linux下共享文件夹(windows可访问,linux也可访问)
- 7-63 情人节 (15 分)(c++stl)
- Echarts4+EchartsGL 3D迁徙图(附源码)
- Cardano链上首个流动性解决方案商Occam宣布与Changelly合作孵化新项目
- 职场上有3种类型的人,最后一种类型老板最喜欢,你是哪一类?
- python自动测试m_python自动化测试实例解析
- JavaScript实现秒杀倒计时效果(附源码)
- 柯尼卡美能达c353改语言,柯尼卡美能达bizhub c353c253c203维修手册中文部分2.pdf
- plsql导出文件转mysql_PLSQL Developer导入导出数据库
- 自然人税收管理系统服务器,【轻松学个税申报】自然人税收管理系统客户端操作...
- PyTorch框架中使用早停止Early Stopping(含详细代码)
- 全球数字电视标准制式
- 已写完的二十本最经典原创小说巨作!你都看过吗?
热门文章
- 美国教育---一切为了学生的成才
- 解决四个字节的字符无法存入数据库
- array_merge与array+array的区别
- MySQL主从复制原理应用基础
- 集存款(复利单利)贷款为一体的计算器(最新版)
- Android ---- Context
- 配置Exchange 2010邮箱和邮件大小限制
- centos 7 mysql图形界面_centos7-vnstat图形界面搭建
- 设备管理器中的计算机有什么用,为什么计算机设备管理器中有两个图形卡?
- 矩形波的傅里叶变换_冲激信号、门信号、方波、矩形波的傅里叶变换总结