直连模式的缺点

当生产者生产消息过快,消费者消费过慢的情况下,会造成消息的大量堆积。因此这个时候就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列的消息一旦消费就不会存在,所以任务也是不会被重复执行的。

角色:
P: 生产者:任务的发布者
C1: 消费者1,领取任务并且完成任务,假设完成速度较慢
C2: 消费者2:领取任务并且完成任务,假设完成速度快。

创建提供者

package com.wang.rabbitmq.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wang.rabbitmq.utils.RabbitMQUtils;import java.io.IOException;public class Provider {public static void main(String[] args) throws IOException {//         获取连接对象Connection connection= RabbitMQUtils.getConnetion();
//        获取通道对象Channel channel=connection.createChannel();
//         通过通道声明队列/*queue是 队列名称durable 声明是否持久化exclusive是否独占autoDelete: 是否在队列为空的时候自动删除arguments: 设置的队列的参数*/channel.queueDeclare("work",true,false,false,null);//         生产消息for(int i=0;i<10;i++){channel.basicPublish("","work",null,(i+"hello work quenet").getBytes());}
//      关闭通道和连接RabbitMQUtils.closeConnectionAndChanel(channel,connection);}}

创建消费者1

package com.wang.rabbitmq.work;import com.rabbitmq.client.*;
import com.wang.rabbitmq.utils.RabbitMQUtils;import java.io.IOException;public class consumer1 {public static void main(String[] args) throws IOException {Connection connection= RabbitMQUtils.getConnetion();Channel channe=connection.createChannel();channe.queueDeclare("work",true,false,false,null);channe.basicConsume("work",true,new DefaultConsumer(channe){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者-1"+new String(body));}});}
}

创建消费者2

package com.wang.rabbitmq.work;import com.rabbitmq.client.*;
import com.wang.rabbitmq.utils.RabbitMQUtils;import java.io.IOException;public class consumer2 {public static void main(String[] args) throws IOException {Connection connection= RabbitMQUtils.getConnetion();Channel channe=connection.createChannel();channe.queueDeclare("work",true,false,false,null);channe.basicConsume("work",true,new DefaultConsumer(channe){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者-1"+new String(body));}});}
}

结果


上面的结果可以看出,消息会公平分发,但是这样也会存在问题,会造成资源的浪费,因此我们希望能者多劳,完成的快的要完成的多一些。所以要对原来的代码进行一定的更改 。

在原来的代码中

  channe.basicConsume("work",true,new DefaultConsumer(channe)

设置true 表示监听队列自动返回完成的状态,而且在一开始默认是把所有的消息直接发送到消费者的通道里面。

因此需要设置

 channe.basicQos(1);

表示一次只发一个消息到消费者。

具体更改代码如下:

    public static void main(String[] args) throws IOException {Connection connection= RabbitMQUtils.getConnetion();final Channel channe=connection.createChannel();channe.basicQos(1);channe.queueDeclare("work",true,false,false,null);channe.basicConsume("work",false,new DefaultConsumer(channe){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者-1"+new String(body));channe.basicAck(envelope.getDeliveryTag(),false);}});}

Publish/Subscribe 模型

这个模型也被称为 广播

在广播模式下,消息的发送流程是下面这样的:
1.可以有多个消费者
2.每个消费者有自己的队列
3. 每个队列都要绑定到Exchange(交换机)
4. 生产者发送的消息,只能发送到交换机,由交换机来决定发给那个队列,生产者则没有办法决定
5. 交换机把消息发送给绑定过的所有队列
6. 队列的消费者都能拿到消息。可以实现一条消息被多个消费者消费

声明交换机

public class Provider {public static void main(String[] args) throws IOException {//         获取连接对象Connection connection= RabbitMQUtils.getConnetion();
//       创建通道Channel channel=connection.createChannel();
//        把通道声明指定的交换机  exchange 表示交换机名称  type表示交换机类型 channel.exchangeDeclare("register","fanout");channel.basicPublish("register","",null,"fanout type".getBytes());RabbitMQUtils.closeConnectionAndChanel(channel,connection);}
}

声明消费者

public class consumer1 {public static void main(String[] args) throws IOException {Connection connection= RabbitMQUtils.getConnetion();Channel channel=connection.createChannel();
//        通道绑定交换机channel.exchangeDeclare("register","fanout");
//         创建临时队列String queueName=channel.queueDeclare().getQueue();
//        通道绑定交换机和队列channel.queueBind(queueName,"register","");
//      消费消息channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1"+new String(body));}});}
}

第四种模式 (Routing)模式

Routing之订阅模型-Direct(直连)

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息会被不同的队列去消费。这个时候我们就需要用到Direct类型的Exchange。

在Direct模型下:

  • 队列和交换机的绑定不能是任意绑定,而是要指定一个RoutingKey(路由Key)。
  • 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey.
  • Exchange不在吧消息交给每一个队列,而是根据Routing key进行判断,只有队列的RoutingKey与消息的Routing Key一致的时候,才会接收到消息


图解:

  • P:生产者,向Exchange发送消息,发送消息的时候指定一个Routing key
  • X :Exchange交换机,接收生产者的消息,然后把消息传递给与Routing key完全匹配的队列
  • C1: 消费者,其所在队列指定了需要routing key为error的消息
  • C2:消费者,其所在队列指定了需要routing key为info error warning的消息

代码

生产者

    public static void main(String[] args) throws IOException {//        获取连接Connection connetion = RabbitMQUtils.getConnetion();
//        创建通道对象Channel channel = connetion.createChannel();
//        创建交换机channel.exchangeDeclare("logs_direct","direct");
//     发送的消息String routingkey="info";channel.basicPublish("logs_direct",routingkey,null,("这是direct模型发布的基于["+routingkey+"]发送的消息").getBytes());RabbitMQUtils.closeConnectionAndChanel(channel,connetion);}

消费者

package com.wang.rabbitmq.direct;import com.rabbitmq.client.*;
import com.wang.rabbitmq.utils.RabbitMQUtils;import java.io.IOException;public class consumer1 {public static void main(String[] args) throws IOException {//Connection connetion = RabbitMQUtils.getConnetion();Channel channel = connetion.createChannel();channel.exchangeDeclare("logs_direct","direct");
//        创建临时队列String quene=channel.queueDeclare().getQueue();//        然后基于route key 进行绑定队列和交换机channel.queueBind(quene,"logs_direct","error");channel.basicConsume(quene,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者"+new String(body));}});}
}

第五种 Topic模式

代码

提供者

public class Provider {public static void main(String[] args) throws IOException {//        获取连接Connection connetion = RabbitMQUtils.getConnetion();
//        创建通道对象Channel channel = connetion.createChannel();
//        创建交换机channel.exchangeDeclare("logs_topic","topic");
//     发送的消息String routingkey="message.info";channel.basicPublish("logs_topic",routingkey,null,("这是topic模型发布的基于["+routingkey+"]发送的消息").getBytes());RabbitMQUtils.closeConnectionAndChanel(channel,connetion);}
}

消费者1

public class Consumer {public static void main(String[] args) throws IOException {//Connection connetion = RabbitMQUtils.getConnetion();Channel channel = connetion.createChannel();channel.exchangeDeclare("logs_topic","topic");
//        创建临时队列String quene=channel.queueDeclare().getQueue();//        然后基于route key 进行绑定队列和交换机channel.queueBind(quene,"logs_topic","message.info");channel.basicConsume(quene,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者"+new String(body));}});}
}

消费者2

public class consumer1 {public static void main(String[] args) throws IOException {//Connection connetion = RabbitMQUtils.getConnetion();Channel channel = connetion.createChannel();channel.exchangeDeclare("logs_topic","topic");
//        创建临时队列String quene=channel.queueDeclare().getQueue();//        然后基于route key 进行绑定队列和交换机channel.queueBind(quene,"logs_topic","message.ww");channel.basicConsume(quene,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者"+new String(body));}});}
}

RabbitMQ work quene 模式相关推荐

  1. RabbitMQ六种队列模式-简单队列模式

    前言 RabbitMQ六种队列模式-简单队列 [本文] RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...

  2. RabbitMQ六种队列模式-工作队列模式

    前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 [本文] RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...

  3. RabbitMQ六种队列模式-发布订阅模式

    前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 [本文] RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...

  4. RabbitMQ六种队列模式-主题模式

    前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列模式-主题 ...

  5. 【转】RabbitMQ六种队列模式-5.主题模式

    前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列模式-主题 ...

  6. 【转】RabbitMQ六种队列模式-4.路由模式

    前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 [本文] RabbitMQ六种队列 ...

  7. 【转】RabbitMQ六种队列模式-3.发布订阅模式

    前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 [本文] RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...

  8. 【转】RabbitMQ六种队列模式-2.工作队列模式

    前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 [本文] RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...

  9. 【转】RabbitMQ六种队列模式-1.简单队列模式

    前言 RabbitMQ六种队列模式-简单队列 [本文] RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...

最新文章

  1. 【Java 并发编程】线程池机制 ( ThreadPoolExecutor 线程池构造参数分析 | 核心线程数 | 最大线程数 | 非核心线程存活时间 | 任务阻塞队列 )
  2. 突破Outlook2003附件格式限制
  3. hibernate jpa_教程:Hibernate,JPA –第1部分
  4. NYOJ 202 红黑树 数组模拟中序遍历
  5. asp.net页面事件:顺序与回传
  6. mysql sql时间比较_mysql和sql时间 字段比较大小的问题
  7. C++Primer第5版学习笔记(三)
  8. 从数据到代码——通过代码生成机制实现强类型编程[上篇]
  9. 拓端tecdat|R语言近似贝叶斯计算MCMC(ABC-MCMC)轨迹图和边缘图可视化
  10. 离线bootstrap_css下载
  11. 使用GSON解析JSON数据
  12. 计算机三级网络技术考过指南 【历年考点汇总】
  13. win10无限重启_让迷你掌上电脑更具生产力,GPD安装 Win10+Ubuntu双系统
  14. 百度盈利模式的弱点在哪里
  15. python01 初识 bmi测量
  16. HashMap 扩容阈值为什么是0.75
  17. 批量合并excel工作表
  18. maven的使用方法
  19. 网络游戏《丛林战争》开发与学习之(四):游戏客户器端的功能开发(上)
  20. css命名冲突怎么办?原理是啥?

热门文章

  1. 3D Tiles介绍(一)
  2. win7 和 win10 上 cygwin 启用 sshd 服务
  3. 【JavaScript】身份证号码合规性校验(支持18位、15位)
  4. python爬虫经典段子_Python爬虫-爬取糗事百科段子
  5. 树梅派 moudbus
  6. 三维可视化的优势是什么?三维园区可视化,三维可视化展示
  7. 关于研究生论文的一些些指点(导师第一次讲话记录)
  8. c语言单精度比大小,c语言单精度和双精度的区别.pdf
  9. jsconfig.json 在ts跟js混合的项目失效
  10. ARM上电后第一条指令