RabbitMQ安装教程及使用
目录标题
- 有哪些MQ
- 安装
- 1. 首先安装配置环境
- 2. 开启管理界面及配置
- 3. 启动
- rabbitMQ使用
- 1. rabbitMQ的简单模式
- 2. 工作者模式
- 3. 发布订阅模式
- 4. 路由模式
- 5. topic主体模式
- rabbitMQ高级特性
- 正文--rabbitMQ高级篇
- 1.消息可靠性投递
- 2. Consumer ACK
- 3. 消费端限流
- 4.TTL
- rabbitMQ高可用
- 正文
- 1. 死信队列
- 2. 延迟队列
- 3.消息幂等性保障
- 4. rabbitMQ集群---一台主机启动多个rabbitMQ 伪集群。
RabbitMQ安装教程及使用(全解析)
有哪些MQ
安装
1. 首先安装配置环境
yum install gcc socat openssl openssl-devel
然后与官网下载与之对应的Erlang版本
地址如下:
https://www.rabbitmq.com/which-erlang.html
下载后安装Erlang
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
去官网下载与Erlang版本对应的RabbitMQ
。。。
安装MQ
rpm -ivh rabbitmq-server-3.7.17-1.el7.noarch.rpm
2. 开启管理界面及配置
开启管理界面
rabbitmq-plugins enable rabbitmq_management
配置远程可使用guest登录mq
cd /usr/share/doc/rabbitmq-server-3.7.17
拷贝到改目录
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
修改配置文件,为了RabbitMQ的登录
vi /etc/rabbitmq/rabbitmq.config
3. 启动
centos7用这个命令启动:
systemctl start rabbitmq-server
RadditMQ访问
http://localhost:15672
注:
RadditMQ默认端口号为15672记得放行此端口号
rabbitMQ使用
测试
1. rabbitMQ的简单模式
一个生产者一个队列对应一个消费者
生产者
package com.aaa.hello;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Product {public static void main(String[] args)throws Exception {//创建连接工厂 --配置连接信息ConnectionFactory factory=new ConnectionFactory();//改成你rabbitMQ运行电脑的ipfactory.setHost("192.168.31.142");//创建连接对象ConnectionConnection connection=factory.newConnection();//创建信道Channel channel = connection.createChannel();//创建队列/*** String queue, 队列的名称* boolean durable, 是否该队列持久化 rabbitMQ服务重启后该存放是否存在。* boolean exclusive, 是否独占 false* boolean autoDelete, 是否自动删除 如果长时间没有发生消息 则自动删除* Map<String, Object> arguments 额外参数 先给null*/channel.queueDeclare("队列名",true,false,false,null);//发生消息/*** String exchange: 交换机的名称 如果没有则使用“” 它回自动采用默认* String routingKey, 路由key 如果没有交换机的绑定 使用队列的名称* BasicProperties props, 消息的一些额外配置 目前先不加 null* byte[] body 消息的内容*/String msg="发送到队列的任务";channel.basicPublish("","队列名",null,msg.getBytes());}
}
消费者
package com.aaa.hello;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {public static void main(String[] args) throws Exception{//创建连接工厂 --配置连接信息ConnectionFactory factory=new ConnectionFactory();//改成你rabbitMQ运行电脑的ipfactory.setHost("192.168.31.142");//创建连接对象ConnectionConnection connection=factory.newConnection();//创建信道Channel channel = connection.createChannel();//接受消息/*** (String queue, 队列的名称* boolean autoAck, 是否自动确认* Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。*/DefaultConsumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//body 接受的信息System.out.println("消息的内容:"+new String(body));}};channel.basicConsume("队列名",true,callback);}
}
2. 工作者模式
一个生产者对应一个队列对应多个消费者
这些消费者之间存在竞争关系
用处:
比如批量处理上. rabbitMQ里面积压了大量的消息。
生产者
package com.aaa.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Product {public static void main(String[] args)throws Exception {//创建连接工厂 --配置连接信息ConnectionFactory factory=new ConnectionFactory();//改成你rabbitMQ运行电脑的ipfactory.setHost("192.168.31.142");//创建连接对象ConnectionConnection connection=factory.newConnection();//创建信道Channel channel = connection.createChannel();//创建队列/*** String queue, 队列的名称* boolean durable, 是否该队列持久化 rabbitMQ服务重启后该存放是否存在。* boolean exclusive, 是否独占 false* boolean autoDelete, 是否自动删除 如果长时间没有发生消息 则自动删除* Map<String, Object> arguments 额外参数 先给null*/channel.queueDeclare("队列名",true,false,false,null);//发生消息/*** String exchange: 交换机的名称 如果没有则使用“” 它回自动采用默认* String routingKey, 路由key 如果没有交换机的绑定 使用队列的名称* BasicProperties props, 消息的一些额外配置 目前先不加 null* byte[] body 消息的内容*/for(int i=0;i<10;i++) {String msg = "发送到队列的任务"+i;channel.basicPublish("", "队列名", null, msg.getBytes());}//生产者这里可以管理资源 消费者不能关闭资源。channel.close();connection.close();}
}
消费者
可以创建多个消费者,这里就不一一创建了
package com.aaa.work;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer01 {public static void main(String[] args) throws Exception{//创建连接工厂 --配置连接信息ConnectionFactory factory=new ConnectionFactory();//改成你rabbitMQ运行电脑的ipfactory.setHost("192.168.31.142");//创建连接对象ConnectionConnection connection=factory.newConnection();//创建信道Channel channel = connection.createChannel();//接受消息/*** (String queue, 队列的名称* boolean autoAck, 是否自动确认* Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。*/DefaultConsumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//body 接受的信息try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者01:"+new String(body));}};channel.basicConsume("队列名",true,callback);}
}
3. 发布订阅模式
一个生产者对应一个交换机
一个交换机对应多个队列
每个队列对应多个消费者
package com.aaa.fanout;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Product {public static void main(String[] args)throws Exception {//创建连接工厂 --配置连接信息ConnectionFactory factory=new ConnectionFactory();//改成你rabbitMQ运行电脑的ipfactory.setHost("192.168.31.142");//创建连接对象ConnectionConnection connection=factory.newConnection();//创建信道Channel channel = connection.createChannel();//创建队列/*** String queue, 队列的名称* boolean durable, 是否该队列持久化 rabbitMQ服务重启后该存放是否存在。* boolean exclusive, 是否独占 false* boolean autoDelete, 是否自动删除 如果长时间没有发生消息 则自动删除* Map<String, Object> arguments 额外参数 先给null*/channel.queueDeclare("队列名01",true,false,false,null);channel.queueDeclare("队列名02",true,false,false,null);//创建交换机/*** String exchange,交换机的名称* BuiltinExchangeType type, 交换机的类型* boolean durable:是否持久化*/channel.exchangeDeclare("交换机名", BuiltinExchangeType.FANOUT,true);/*** String queue, 队列名* String exchange, 交换机的名称* String routingKey:路由key 如果交换机为fanout模式则不需要路由key*///绑定队列与交换机channel.queueBind("队列名01","交换机名","");channel.queueBind("队列名02","交换机名","");//发生消息/*** String exchange: 交换机的名称 如果没有则使用“” 它回自动采用默认* String routingKey, 路由key 如果没有交换机的绑定 使用队列的名称* BasicProperties props, 消息的一些额外配置 目前先不加 null* byte[] body 消息的内容*/for(int i=0;i<10;i++) {String msg = "发送到队列的任务"+i;channel.basicPublish("交换机名", "", null, msg.getBytes());}//生产者这里可以管理资源 消费者不能关闭资源。channel.close();connection.close();}
}
4. 路由模式
一个生产者对应一个交换机
一个交换机对应多个队列
多个队列对应多个消费者
交换机发送消息给队列只要路由key匹配就送达到该队列
只需改动生产者即可
package com.aaa.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Product {public static void main(String[] args)throws Exception {//创建连接工厂 --配置连接信息ConnectionFactory factory=new ConnectionFactory();//改成你rabbitMQ运行电脑的ipfactory.setHost("192.168.31.142");//创建连接对象ConnectionConnection connection=factory.newConnection();//创建信道Channel channel = connection.createChannel();//创建队列/*** String queue, 队列的名称* boolean durable, 是否该队列持久化 rabbitMQ服务重启后该存放是否存在。* boolean exclusive, 是否独占 false* boolean autoDelete, 是否自动删除 如果长时间没有发生消息 则自动删除* Map<String, Object> arguments 额外参数 先给null*/channel.queueDeclare("队列名01",true,false,false,null);channel.queueDeclare("队列名02",true,false,false,null);//创建交换机/*** String exchange,交换机的名称* BuiltinExchangeType type, 交换机的类型* boolean durable:是否持久化*/channel.exchangeDeclare("交换机名", BuiltinExchangeType.DIRECT,true);/*** String queue, 队列名* String exchange, 交换机的名称* String routingKey:路由key 如果交换机为fanout模式则不需要路由key*/channel.queueBind("队列名01","交换机名","error");channel.queueBind("队列名02","交换机名","info");channel.queueBind("队列名02","交换机名","error");channel.queueBind("队列名02","交换机名","warning");//发生消息/*** String exchange: 交换机的名称 如果没有则使用“” 它回自动采用默认* String routingKey, 路由key 如果没有交换机的绑定 使用队列的名称* BasicProperties props, 消息的一些额外配置 目前先不加 null* byte[] body 消息的内容*/for(int i=0;i<10;i++) {String msg = "发送消息的内容"+i;channel.basicPublish("交换机名", "error", null, msg.getBytes());}//生产者这里可以管理资源 消费者不能关闭资源。channel.close();connection.close();}
}
5. topic主体模式
绑定按照通配符的模式。*: 统配一个单词。#: 统配n个单词hello.orange.rabbitlazy.orange
只需修改生产者即可
package com.aaa.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Product {public static void main(String[] args)throws Exception {//创建连接工厂 --配置连接信息ConnectionFactory factory=new ConnectionFactory();//改成你rabbitMQ运行电脑的ipfactory.setHost("192.168.31.142");//创建连接对象ConnectionConnection connection=factory.newConnection();//创建信道Channel channel = connection.createChannel();//创建队列/*** String queue, 队列的名称* boolean durable, 是否该队列持久化 rabbitMQ服务重启后该存放是否存在。* boolean exclusive, 是否独占 false* boolean autoDelete, 是否自动删除 如果长时间没有发生消息 则自动删除* Map<String, Object> arguments 额外参数 先给null*/channel.queueDeclare("队列名01",true,false,false,null);channel.queueDeclare("队列名02",true,false,false,null);//创建交换机/*** String exchange,交换机的名称* BuiltinExchangeType type, 交换机的类型* boolean durable:是否持久化*/channel.exchangeDeclare("交换机名", BuiltinExchangeType.TOPIC,true);/*** String queue, 队列名* String exchange, 交换机的名称* String routingKey:路由key 如果交换机为fanout模式则不需要路由key*/channel.queueBind("队列名01","交换机名","*.orange.*");channel.queueBind("队列名02","交换机名","*.*.rabbit");channel.queueBind("队列名02","交换机名","lazy.#");//发生消息/*** String exchange: 交换机的名称 如果没有则使用“” 它回自动采用默认* String routingKey, 路由key 如果没有交换机的绑定 使用队列的名称* BasicProperties props, 消息的一些额外配置 目前先不加 null* byte[] body 消息的内容*/for(int i=0;i<10;i++) {String msg = "发送消息的内容"+i;channel.basicPublish("交换机名", "lazy.orange.rabbit", null, msg.getBytes());}//生产者这里可以管理资源 消费者不能关闭资源。channel.close();connection.close();}
}
rabbitMQ高级特性
正文–rabbitMQ高级篇
1.消息可靠性投递
2.Consumer ACK
3.消费端限流
4. TTL
5. 死信队列
6. 延迟队列
7. 消息的幂等性
1.消息可靠性投递
注意:在文中代码进行测试时建议将队列中的内容清除干净,否则很容易出现不是我们理论上的结果;
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。(可以类比发售快递过程,每一环节都可能发生丢失)
消息投递步骤:
1.生产者(channel)---->交换机------>队列中。
为了确保消息的可靠性投递,提供了如下两种方式
confirm 确认模式
return 退回模式
(1)确认模式(消息从channel到交换机)
必须开启确认模式
spring:rabbitmq:host: 192.168.31.195#开启rabbitMQ的生产方确认模式publisher-confirm-type: correlated
设置RabbitTemplate的确认回调函数
@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 保证发送方到交换机的可靠性。* 1.开启confirm模式,publisher-confirm-type: correlated* 2.设置rabbitTemplate的确认回调函数。如果消息到达交换机则返回true,如果消息没有到达交换机则返回一个false*/@Testpublic void testConfirm(){rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if(b==false){//消息没有到达交换机 根据业务需求。System.out.println("继续发现消息");//取消订单}}});rabbitTemplate.convertAndSend("exchange","","hello confirm");}
(2)退回模式(消息从交换机到队列失败)
(1)开启回退机制
server:port: 8001spring:rabbitmq:host: 192.168.31.195#开启rabbitMQ的生产方确认模式publisher-confirm-type: correlated# 开启发布者退回模式publisher-returns: true
(2)设置RabbitTemplate回调的函数
/*** 退回模式:* 1. 开启退回模式。* 2. 设置RabbitTemplate的退回回调函数。*/@Testpublic void testReturn(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {//只要交换机到队列失败时才会触发该方法。 可以继续发送也可以取消相应的业务功能。// returnedMessage.getReplyText()返回失败的原因System.out.println("消息从交换机到队列失败"+returnedMessage.getReplyText());}});rabbitTemplate.convertAndSend("exchange_direct","error2","hello confirm2");}
2. Consumer ACK
表示消费端收到消息后的确认方式。其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息队列中移除。《《《但是在实际业务处理中,很可能消息<接收到>,业务处理出现异常,那么该消息就会丢失。》》》如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。spring.rabbitmq.listener.type=simple: 容器类型.simple或direct
简单理解为一对一;direct理解为一对多个消费者
(1)消费端配置手动开启确认模式(默认自动模式)
spring:rabbitmq:host: 192.168.31.195listener:simple:#表示手动确认acknowledge-mode: manual# 表示自动确认模式# acknowledge-mode: none这里发送消息使用的是路由模式:具体代码参考本节内容第三部分第二小节的退回模式
@RabbitListener(queues = "queue_direct01")public void listener(Message message, Channel channel) throws Exception{long deliveryTag = message.getMessageProperties().getDeliveryTag();byte[] body = message.getBody();String msg=new String(body);System.out.println(msg);try {
// int c = 10 / 0;System.out.println("处理业务逻辑");//消费端手动确认消息//long deliveryTag, 表示的标识。// boolean multiple:是否允许多确认channel.basicAck(deliveryTag,true); //从队列中删除该消息。}catch (Exception e){//(long deliveryTag, boolean multiple, boolean requeue: 是否让队列再次发送该消息。对比上面的内容来确定参数的含义channel.basicNack(deliveryTag,true,true);}}
如何保证消息可靠性
从实际如:快递 出发来理解保证消息可靠性的每个环节的原因
1. 保证消息从发送者到交换机的可靠性: 使用Confirm确认机制。
2. 保证消息从交换机到队列的可靠性; 使用return回退机制(交换机到队列)。
3. 消息在队列中的可靠性。 设置队列和消息的持久化 Durable耐用的持久化的意思;可视化中Features属性值为D。
4. 保证消息从队列到消费者的可靠性。 使用消费端的手动确认机制。
3. 消费端限流
1. 必须为手动确认模式。(避免一次获得太多消息宕机,因为MQ认为消息你一收到就会在给你发,速度很快)
2. 必须配置限流的个数。
spring:rabbitmq:host: 192.168.31.195listener:simple:#表示手动确认acknowledge-mode: manual# 表示自动确认模式# acknowledge-mode: none# 设置每次消费的个数。prefetch: 100import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MyListener {@RabbitListener(queues = "queue_direct01")public void listener(Message message, Channel channel) throws Exception{long deliveryTag = message.getMessageProperties().getDeliveryTag();byte[] body = message.getBody();String msg=new String(body);System.out.println(msg);try {
// int c = 10 / 0;//System.out.println("处理业务逻辑");//消费端手动确认消息//long deliveryTag, 表示的标识。// boolean multiple:是否允许多确认channel.basicAck(deliveryTag,true); //从队列中删除该消息,若要。}catch (Exception e){//(long deliveryTag, boolean multiple, boolean requeue: 是否让队列再次发送该消息。channel.basicNack(deliveryTag,true,true);}}
}
4.TTL
1.设置队列过期;
2.设置消息的过期;该消息必须在队列的头部时才会被移除,所以才会在可视化中可以看到明明设置了消息过期时间但是没有见该消息消失,但是当拆包查看消息轮到被设定移除的的消息时你不能够看到,因为此时设定移除的消息在队列头部,会直接被移除,所以你不会查看到这样的信息。//为队列设置过期时间 相当于该队列里面的消息都有过期时间,不同时间发的消息,消息是一条一条的过期的,所以随着时间的推移会有不同的消息数目存在队列中,与上面设置消息的过期不冲突;@Testpublic void testSend(){rabbitTemplate.convertAndSend("myexchange","","hello xiaoxi");}//设置消息的过期时间 如果由设置了队列的过期时间 也设置了消息的过期时间 谁的过期时间短 以谁为准。//该消息必须在头部才能从队列中移除。@Testpublic void testSend02(){for(int i=0;i<10;i++) {if(i==3){MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("20000");//获得消息属性并赋值return message;}};//String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor这样为消息设置额外参数rabbitTemplate.convertAndSend("myexchange", "", "hello xiaoxi"+i, messagePostProcessor);}else {//String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessorrabbitTemplate.convertAndSend("myexchange", "", "hello xiaoxi"+i);}}}
TTL设定方式二:
通过代码创建队列和交换机以及绑定。
@Configuration
public class RabbitConfig {private final String exchange_name="myexchange";private final String queue_name="myqueue";//创建交换机对象@Beanpublic Exchange exchange(){Exchange exchange= ExchangeBuilder.fanoutExchange(exchange_name).durable(true).build();return exchange;//fanoutExchange(exchange_name).相关属性在其后面点出来就可以了,队列创建与此类似}//创建队列@Bean(value = "queue")public Queue queue(){Queue queue= QueueBuilder.durable(queue_name).withArgument("x-message-ttl",20000).build();return queue;}//绑定交换机和队列@Beanpublic Binding binding(Queue queue,Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("").noargs();} //绑定多个队列就需要再一个一个写binding
}
配置类配置完使用test类进行发消息就可以了,因为spring容器会自动根据情况进行创建对象
@Configuration标注当前类是配置类, 并会将当前类内声明的⼀个或多个以@Bean 注解标记的⽅法的实例纳⼊到 spring 容器中 并且实例名就是⽅法名。
rabbitMQ高可用
正文
1. 死信队列
2. 延迟队列
3. 消息的幂等性消费
4. rabbitMQ集群的搭建。
1. 死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead
message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
消息成为死信的三种情况:(创建对应的队列关系参数参考下图下方代码)1. 队列消息长度到达限制;@Testpublic void testDeadQueue() {for (int i = 0; i < 13; i++) {rabbitTemplate.convertAndSend("exchange", "error", "下单成功。");}}
2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
注意开启手动模式-----------------------------------------------@RabbitListener(queues = "queue")public void listener(Message msg, Channel channel) throws Exception {System.out.println(new String(msg.getBody()));//(long deliveryTag, boolean multiple, boolean requeue: 是否让队列再次发送该消息。channel.basicNack(msg.getMessageProperties().getDeliveryTag(), true, false);}
3. 原队列存在消息过期设置,消息到达超时时间未被消费;
(第三种情况可参考延迟队列)
队列绑定死信交换机:
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
创建对应的队列关系
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {private final String EXCHANGE="exchange";private final String DEAD_EXCHANGE="dead_exchange";private final String QUEUE="queue";private final String DEAD_QUEUE="dead_queue";@Beanpublic Queue queue(){return QueueBuilder.durable(QUEUE)//是否持久化.withArgument("x-message-ttl",20000)//设定消息失效时间.withArgument("x-max-length",10)//设定一次可接受消息数量,超过部分直接放入死信队列.withArgument("x-dead-letter-exchange",DEAD_EXCHANGE)//发送给的交换机.withArgument("x-dead-letter-routing-key","error")//路由key.build();}@Beanpublic Queue dead_queue() {return QueueBuilder.durable(DEAD_QUEUE).build();}@Beanpublic Exchange exchange(){return ExchangeBuilder.directExchange(EXCHANGE).build();}@Beanpublic Exchange dead_exchange(){return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build();}@Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(exchange()).with("error").noargs();}@Beanpublic Binding dead_binding(){return BindingBuilder.bind(dead_queue()).to(dead_exchange()).with("error").noargs();}
}
注意:如果创建队列或者交换机的配置类内容发生修改,那么之前创建的交换机或者队列即使有内容也需要删去,否则程序运行会报错;
2. 延迟队列
//延迟队列查看死信队列在指定时间后是否收到信息@RabbitListener(queues = "dead_queue")public void listener1(Message msg, Channel channel) throws Exception {
//时间到了,消费者未消费,取消订单并回滚库存//延迟队列代码实现//业务代码try {channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true);}catch (Exception e){channel.basicNack(msg.getMessageProperties().getDeliveryTag(),true,true);}}
3.消息幂等性保障
消费者消费队列信息是单线程的;在增删改查中尤其增加和删除需要注意幂等性!
幂等性: 无论执行多少次,得到的结果和第一次都是相同的。(如一次购买多次请求不会有多次扣款)保证消息不被消费者重复消费。只要保证同一条消息再发过来消息不被消费
解决方案:
1.数据库中使用锁,加一个version字段从而保证不被同一条消息执行两次操作
2.使用redis缓存,不经过sql数据库,redis效率高!具体步骤如下:// @Autowiredprivate RedisTemplate redisTemplate;@RabbitListener(queues = "queue")public void listener(Message msg, Channel channel) throws Exception {//消息幂等性保障Object o = redisTemplate.opsForValue().get(msg.getMessageProperties().getDeliveryTag());if(o==null){//业务代码try {System.out.println("完成业务功能");redisTemplate.opsForValue().set(msg.getMessageProperties().getDeliveryTag(), "ykq");channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true);}catch (Exception e){channel.basicNack(msg.getMessageProperties().getDeliveryTag(),true,true);}}else{channel.basicAck(msg.getMessageProperties().getDeliveryTag(),true);}}
4. rabbitMQ集群—一台主机启动多个rabbitMQ 伪集群。
先停止rabbitMQ服务
service rabbitmq-server stop或者systemctl stop rabbitmq-server
(1)开启第一个节点
[root@super sbin]# RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start
【注意这里的5673只是Java代码(客户端)访问的端口号,不是可视化界面访问的端口号】RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.## ## Licensed under the MPL. See http://www.rabbitmq.com/## ############ Logs: /var/log/rabbitmq/rabbit1.log###### ## /var/log/rabbitmq/rabbit1-sasl.log##########Starting broker...completed with 3 plugins.
(2)开启第二个节点
[root@super ~]# RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server startRabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.## ## Licensed under the MPL. See http://www.rabbitmq.com/## ############ Logs: /var/log/rabbitmq/rabbit2.log###### ## /var/log/rabbitmq/rabbit2-sasl.log##########Starting broker...completed with 3 plugins.
如果可视化访问不了或者Java访问超时对应的端口号15674/3,请注意查看对应的端口号是否防火墙放行
设置主从关系
rabbit1操作作为主节点:
[root@super ~]# rabbitmqctl -n rabbit1 stop_app
Stopping node rabbit1@super ...
[root@super ~]# rabbitmqctl -n rabbit1 reset
Resetting node rabbit1@super ...
[root@super ~]# rabbitmqctl -n rabbit1 start_app
Starting node rabbit1@super ...
rabbit2操作为从节点:
[root@super ~]# rabbitmqctl -n rabbit2 stop_app
Stopping node rabbit2@super ...
[root@super ~]# rabbitmqctl -n rabbit2 reset
Resetting node rabbit2@super ...
[root@super ~]# rabbitmqctl -n rabbit2 join_cluster rabbit1@'localhost' ###''内是主机名换成自己的
Clustering node rabbit2@super with rabbit1@super ...
[root@super ~]# rabbitmqctl -n rabbit2 start_app
Starting node rabbit2@super ...
RabbitMQ镜像集群配置
上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。
镜像队列是基于普通的集群模式的,然后再添加一些策略,所以你还是得先配置普通集群,然后才能设置镜像队列,我们就以上面的集群接着做。
ha-sync-mode:分为两种模式,manual/automatic;手动模式(manual默认模式:表示新的队列镜像将不会接收已有消息,只会接收新消息;自动(automatic):加入新镜像后,队列将自动同步。需要注意的是,队列同步是一项阻塞操作。
将新节点加入已存在的镜像队列时,默认情况下ha-sync-mode取值为manual,镜像队列中的消息不会主动同步到新的slave中,除非显示调用同步命令。当调用同步命令后,队列开始阻塞,无法对其进行其他操作,直到同步完成。当ha-sync-mode设置为automatic时,新加入的slave会默认同步已知的镜像队列。由于同步过程的限制,所以不建议对生产环境中正在使用的队列进行操作
HaProxy负载均衡RabbitMQ
当有多个rabbitMq时HaProxy解决生产方与消费方不知从哪里送拿信息的问题!
tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
//进入目录、进行编译、安装
cd /usr/local/haproxy-1.6.5
make TARGET=linux31 PREFIX=/usr/local/haproxy
make install PREFIX=/usr/local/haproxy
mkdir /etc/haproxy
//创建haproxy配置文件
vi /etc/haproxy/haproxy.cfg#logging options
globallog 127.0.0.1 local0 infomaxconn 5120chroot /usr/local/haproxyuid 99gid 99daemonquietnbproc 20pidfile /var/run/haproxy.piddefaultslog globalmode tcpoption tcplogoption dontlognullretries 3option redispatchmaxconn 2000contimeout 5sclitimeout 60ssrvtimeout 15s
#front-end IP for consumers and producterslisten rabbitmq_cluster# haproxy暴漏的端口号bind 0.0.0.0:5672 ##################################该端口号交给ha代理mode tcp#balance url_param userid#balance url_param session_id check_post 64#balance hdr(User-Agent)#balance hdr(host)#balance hdr(Host) use_domain_only#balance rdp-cookie#balance leastconn#balance source //ipbalance roundrobin# haproxy代理的rabbit服务server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2listen stats# haproxy的图形化界面bind 192.168.213.181:8100###########################haproxy的图形化界面访问IPmode httpoption httplogstats enablestats uri /rabbitmq-statsstats refresh 5s
开启Haproxy
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg//查看haproxy进程状态
ps -ef | grep haproxy访问如下地址对mq节点进行监控
http://192.168.31.195:8100/rabbitmq-statss
RabbitMQ安装教程及使用相关推荐
- RabbitMQ 安装教程(CentOS版)
RabbitMQ 安装教程(CentOS版) 0.环境准备 1.准备一个干净的虚拟机Linux镜像(或者Linux服务器) 2.安装 lrzsz (文件拉取工具),方便上传本地文件,安装命令:yum ...
- 【超级详细】RabbitMQ安装教程
废话不多说,开干! 相关软件我已准备好,下载链接 链接: https://pan.baidu.com/s/1wO9vBK__7Zklo8qPxbIYqw 提取码: 9m8c 当然 你也可以自己在官网进 ...
- 【rabbitmq安装教程】centos7下安装rabbitMQ
命令: 1.先安装Erlang cd usr/local/src/ wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.c ...
- windows系统erlang和rabbitMQ安装教程(附网盘下载地址)
rabbitMQ依赖于erlang,所以安装rabbitMQ之前需要先安装erlang,而他们之间的版本是有对应的,不同版本的rabbitMQ需要对应不同版本的erlang,如下图 官方下载这两个包是 ...
- RabbitMQ安装教程
安装新版RabbitMQ3.9.10,不需要先安装Erlang了,安装mq时,会自动安装erlang的依赖项 Erlang和RabbitMQ版本对照:RabbitMQ Erlang Version R ...
- RabbitMQ安装教程(超鸡细)
目录 一.环境准备 1.1.RabbitMQ版本 和 Erlang 版本兼容性关系 1.2.官方安装包下载地址 1.3.安装包中说明,请下载对应的安装包 二.安装操作步骤 2.1 .传输 2.2.安装 ...
- linux库怎么安装路径设置,Linux libtins 库安装教程
因为工作原因需要用到libtins网络库, 所以今天去装一下. 很尴尬,由于本人对linux理解比较浅, 所以在中途遇到了一些问题. 虽然只是简单的安装步骤,但是阻挡不了自己菜啊. 一. 下载lib ...
- Docker系列之RabbitMQ安装部署教程
Docker系列之RabbitMQ安装部署教程 因为学习RabbitMQ需要,需要安装RabbitMQ,网上找资料,RabbitMQ官方提供了window版.Linux版.Docker版的管理页面,为 ...
- RabbitMQ Server简介和安装教程
引言 什么是AMQP? AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间 ...
最新文章
- mysql数据库中nchar_MySQL数据库中CHAR与VARCHAR之争
- Silverlight 3发布新版3.0.50106.0
- java架构师,必须掌握的几点技术?
- 恢复mysql数据库详细图解_binlog恢复mysql数据库超详细步骤
- EMS批量为用户分配邮箱
- Unix系统编程():分散输入和集中输出(Scatter-Gather IO):readv和writev
- Node JS Buffer使用理解
- 安装Office2010提示缺少MSXML版本6.10.1129.0的解决方法
- html5css重复径向渐变,CSS3怎么实现重复径向渐变效果
- JAVA常用http请求工具类封装
- java桌球小游戏使用图片_java桌球小游戏 小球任意角度碰撞
- Roxe:大涨时毅然销毁99% ROC 专注解决跨境汇款难题
- 也许这30句话会帮到你
- 支付宝通过招行网上银行付钱,最多每笔500块
- 关于机房精密空调监控系统,你想了解的都在这里!
- ubuntu20.04 server 无图形命令行安装
- [翻译] effective go 之 Formatting Commentary
- AWVS_扫描报告分析
- web前端学习笔记之JavaScript
- Maxthon 遨游浏览器找回“上次未关闭页面”
热门文章
- Photozoom图像放大的技术一二事
- php写入速度rabbit,PHP操作RabbitMQ简单Demo
- 【保研日记】本科统计学专业
- 产品经理:想爱没那么简单
- 和平精英显示模拟服务器已满,和平精英模拟器注册达到上限? 模拟器注册上限完美解决攻略...
- 【转】系统无法进入睡眠模式解决办法
- jQuery(入门选择器)
- 学习之响应式应用架构重构ReactiveX
- 一个关于高考的黑客故事:用2B铅笔注入阅卷系统
- Listener method 'public void com.config.mq.MsgReceiver.process(java.lang.String) throw