RabbitMQ work quene 模式
直连模式的缺点
当生产者生产消息过快,消费者消费过慢的情况下,会造成消息的大量堆积。因此这个时候就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列的消息一旦消费就不会存在,所以任务也是不会被重复执行的。
角色:
P: 生产者:任务的发布者
C1: 消费者1,领取任务并且完成任务,假设完成速度较慢
C2: 消费者2:领取任务并且完成任务,假设完成速度快。
创建提供者
package com.wang.rabbitmq.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wang.rabbitmq.utils.RabbitMQUtils;import java.io.IOException;public class Provider {public static void main(String[] args) throws IOException {// 获取连接对象Connection connection= RabbitMQUtils.getConnetion();
// 获取通道对象Channel channel=connection.createChannel();
// 通过通道声明队列/*queue是 队列名称durable 声明是否持久化exclusive是否独占autoDelete: 是否在队列为空的时候自动删除arguments: 设置的队列的参数*/channel.queueDeclare("work",true,false,false,null);// 生产消息for(int i=0;i<10;i++){channel.basicPublish("","work",null,(i+"hello work quenet").getBytes());}
// 关闭通道和连接RabbitMQUtils.closeConnectionAndChanel(channel,connection);}}
创建消费者1
package com.wang.rabbitmq.work;import com.rabbitmq.client.*;
import com.wang.rabbitmq.utils.RabbitMQUtils;import java.io.IOException;public class consumer1 {public static void main(String[] args) throws IOException {Connection connection= RabbitMQUtils.getConnetion();Channel channe=connection.createChannel();channe.queueDeclare("work",true,false,false,null);channe.basicConsume("work",true,new DefaultConsumer(channe){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者-1"+new String(body));}});}
}
创建消费者2
package com.wang.rabbitmq.work;import com.rabbitmq.client.*;
import com.wang.rabbitmq.utils.RabbitMQUtils;import java.io.IOException;public class consumer2 {public static void main(String[] args) throws IOException {Connection connection= RabbitMQUtils.getConnetion();Channel channe=connection.createChannel();channe.queueDeclare("work",true,false,false,null);channe.basicConsume("work",true,new DefaultConsumer(channe){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者-1"+new String(body));}});}
}
结果
上面的结果可以看出,消息会公平分发,但是这样也会存在问题,会造成资源的浪费,因此我们希望能者多劳,完成的快的要完成的多一些。所以要对原来的代码进行一定的更改 。
在原来的代码中
channe.basicConsume("work",true,new DefaultConsumer(channe)
设置true 表示监听队列自动返回完成的状态,而且在一开始默认是把所有的消息直接发送到消费者的通道里面。
因此需要设置
channe.basicQos(1);
表示一次只发一个消息到消费者。
具体更改代码如下:
public static void main(String[] args) throws IOException {Connection connection= RabbitMQUtils.getConnetion();final Channel channe=connection.createChannel();channe.basicQos(1);channe.queueDeclare("work",true,false,false,null);channe.basicConsume("work",false,new DefaultConsumer(channe){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者-1"+new String(body));channe.basicAck(envelope.getDeliveryTag(),false);}});}
Publish/Subscribe 模型
这个模型也被称为 广播
在广播模式下,消息的发送流程是下面这样的:
1.可以有多个消费者
2.每个消费者有自己的队列
3. 每个队列都要绑定到Exchange(交换机)
4. 生产者发送的消息,只能发送到交换机,由交换机来决定发给那个队列,生产者则没有办法决定
5. 交换机把消息发送给绑定过的所有队列
6. 队列的消费者都能拿到消息。可以实现一条消息被多个消费者消费
声明交换机
public class Provider {public static void main(String[] args) throws IOException {// 获取连接对象Connection connection= RabbitMQUtils.getConnetion();
// 创建通道Channel channel=connection.createChannel();
// 把通道声明指定的交换机 exchange 表示交换机名称 type表示交换机类型 channel.exchangeDeclare("register","fanout");channel.basicPublish("register","",null,"fanout type".getBytes());RabbitMQUtils.closeConnectionAndChanel(channel,connection);}
}
声明消费者
public class consumer1 {public static void main(String[] args) throws IOException {Connection connection= RabbitMQUtils.getConnetion();Channel channel=connection.createChannel();
// 通道绑定交换机channel.exchangeDeclare("register","fanout");
// 创建临时队列String queueName=channel.queueDeclare().getQueue();
// 通道绑定交换机和队列channel.queueBind(queueName,"register","");
// 消费消息channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1"+new String(body));}});}
}
第四种模式 (Routing)模式
Routing之订阅模型-Direct(直连)
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息会被不同的队列去消费。这个时候我们就需要用到Direct类型的Exchange。
在Direct模型下:
- 队列和交换机的绑定不能是任意绑定,而是要指定一个RoutingKey(路由Key)。
- 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey.
- Exchange不在吧消息交给每一个队列,而是根据Routing key进行判断,只有队列的RoutingKey与消息的Routing Key一致的时候,才会接收到消息
图解:
- P:生产者,向Exchange发送消息,发送消息的时候指定一个Routing key
- X :Exchange交换机,接收生产者的消息,然后把消息传递给与Routing key完全匹配的队列
- C1: 消费者,其所在队列指定了需要routing key为error的消息
- C2:消费者,其所在队列指定了需要routing key为info error warning的消息
代码
生产者
public static void main(String[] args) throws IOException {// 获取连接Connection connetion = RabbitMQUtils.getConnetion();
// 创建通道对象Channel channel = connetion.createChannel();
// 创建交换机channel.exchangeDeclare("logs_direct","direct");
// 发送的消息String routingkey="info";channel.basicPublish("logs_direct",routingkey,null,("这是direct模型发布的基于["+routingkey+"]发送的消息").getBytes());RabbitMQUtils.closeConnectionAndChanel(channel,connetion);}
消费者
package com.wang.rabbitmq.direct;import com.rabbitmq.client.*;
import com.wang.rabbitmq.utils.RabbitMQUtils;import java.io.IOException;public class consumer1 {public static void main(String[] args) throws IOException {//Connection connetion = RabbitMQUtils.getConnetion();Channel channel = connetion.createChannel();channel.exchangeDeclare("logs_direct","direct");
// 创建临时队列String quene=channel.queueDeclare().getQueue();// 然后基于route key 进行绑定队列和交换机channel.queueBind(quene,"logs_direct","error");channel.basicConsume(quene,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者"+new String(body));}});}
}
第五种 Topic模式
代码
提供者
public class Provider {public static void main(String[] args) throws IOException {// 获取连接Connection connetion = RabbitMQUtils.getConnetion();
// 创建通道对象Channel channel = connetion.createChannel();
// 创建交换机channel.exchangeDeclare("logs_topic","topic");
// 发送的消息String routingkey="message.info";channel.basicPublish("logs_topic",routingkey,null,("这是topic模型发布的基于["+routingkey+"]发送的消息").getBytes());RabbitMQUtils.closeConnectionAndChanel(channel,connetion);}
}
消费者1
public class Consumer {public static void main(String[] args) throws IOException {//Connection connetion = RabbitMQUtils.getConnetion();Channel channel = connetion.createChannel();channel.exchangeDeclare("logs_topic","topic");
// 创建临时队列String quene=channel.queueDeclare().getQueue();// 然后基于route key 进行绑定队列和交换机channel.queueBind(quene,"logs_topic","message.info");channel.basicConsume(quene,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者"+new String(body));}});}
}
消费者2
public class consumer1 {public static void main(String[] args) throws IOException {//Connection connetion = RabbitMQUtils.getConnetion();Channel channel = connetion.createChannel();channel.exchangeDeclare("logs_topic","topic");
// 创建临时队列String quene=channel.queueDeclare().getQueue();// 然后基于route key 进行绑定队列和交换机channel.queueBind(quene,"logs_topic","message.ww");channel.basicConsume(quene,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者"+new String(body));}});}
}
RabbitMQ work quene 模式相关推荐
- RabbitMQ六种队列模式-简单队列模式
前言 RabbitMQ六种队列模式-简单队列 [本文] RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...
- RabbitMQ六种队列模式-工作队列模式
前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 [本文] RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...
- RabbitMQ六种队列模式-发布订阅模式
前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 [本文] RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...
- RabbitMQ六种队列模式-主题模式
前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列模式-主题 ...
- 【转】RabbitMQ六种队列模式-5.主题模式
前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列模式-主题 ...
- 【转】RabbitMQ六种队列模式-4.路由模式
前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 [本文] RabbitMQ六种队列 ...
- 【转】RabbitMQ六种队列模式-3.发布订阅模式
前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 [本文] RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...
- 【转】RabbitMQ六种队列模式-2.工作队列模式
前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 [本文] RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...
- 【转】RabbitMQ六种队列模式-1.简单队列模式
前言 RabbitMQ六种队列模式-简单队列 [本文] RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...
最新文章
- 【Java 并发编程】线程池机制 ( ThreadPoolExecutor 线程池构造参数分析 | 核心线程数 | 最大线程数 | 非核心线程存活时间 | 任务阻塞队列 )
- 突破Outlook2003附件格式限制
- hibernate jpa_教程:Hibernate,JPA –第1部分
- NYOJ 202 红黑树 数组模拟中序遍历
- asp.net页面事件:顺序与回传
- mysql sql时间比较_mysql和sql时间 字段比较大小的问题
- C++Primer第5版学习笔记(三)
- 从数据到代码——通过代码生成机制实现强类型编程[上篇]
- 拓端tecdat|R语言近似贝叶斯计算MCMC(ABC-MCMC)轨迹图和边缘图可视化
- 离线bootstrap_css下载
- 使用GSON解析JSON数据
- 计算机三级网络技术考过指南 【历年考点汇总】
- win10无限重启_让迷你掌上电脑更具生产力,GPD安装 Win10+Ubuntu双系统
- 百度盈利模式的弱点在哪里
- python01 初识 bmi测量
- HashMap 扩容阈值为什么是0.75
- 批量合并excel工作表
- maven的使用方法
- 网络游戏《丛林战争》开发与学习之(四):游戏客户器端的功能开发(上)
- css命名冲突怎么办?原理是啥?