Rabbitmq

(一) markdown viewer安装

1.

https://chrome.google.com/webstore/detail/markdown-viewer/ckkdlimhmcjmikdlpkmbgfkaikojcbjk 访问将插件添加到google浏览器

2.

直接在chrome浏览器的网址栏输入chrome://extensions/ 查看已经下载的扩展程序,然后找到Markdown Viewer,点击详细信息,然后将允许访问文件网址勾选。这样以后就可以把想要查看的Markdown文件直接拖进浏览器里就可以查看编译过后好看的排版。

3.

把.md文件在google浏览器上访问,或者在vscode上安装.md插件预览即可

!!4.

推荐使用Typora软件,下载安装即可,方便书写

(二) windows rabbitmq安装

1. http://www.rabbitmq.com

2. 安装完成后,输入以下命令

rabbitmq-plugins enable rabbitmq_management
再在google浏览器上输入 localhost:15672,如果无响应,在自己的C:\Users\(用户名)\AppData\Roaming\RabbitMQ\db路径下的文件全部删除,再重新下载rabbitmq-server,再重复2.即可

3. 看到以下窗口即成功,账户密码皆为guest,此处涉及对rabbitmq的理解,可以理解为mysql数据库。

4. virtual hosts<->mysql中的数据库,一般以/开头,然后对用户进行授权,右边栏中有virtual hosts,进行授权,授权完毕,就可以用自己的账户密码了

(三) java演示

1. 此处是pom.xml的依赖,用maven自动导入,依赖在maven官网可以找到,一定要找groupId和artifactId相同的,且使用非红名测试版的最新版

<!--        rabbitmq--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version></dependency>
<!--        slf4j--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.26</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.26</version><scope>test</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency>

2. 获取MQ连接

写一个工具类util,像tp5里的database.php一样,这样就可以调用了

package com.myapp.demoesi.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*包是自动导入的,不用考虑那么多*/public class ConnectionUtils {/*获取MQ的连接,此连接只调用在局部static,并且返回值是一个MQ连接*/public static Connection getConnection() throws IOException, TimeoutException {/*定义一个连接工厂,相当于mysql中的$conn = mysqli_connect($servername, $username, $password);*/ConnectionFactory factory = new ConnectionFactory();//设置服务地址$servernamefactory.setHost("127.0.0.1");//AMQP 5672(http)factory.setPort(5672);//vhost(mysql中的数据库)factory.setVirtualHost("/vhost_mmr");//用户名$usernamefactory.setUsername("mmr");//密码$passwordfactory.setPassword("123456");return factory.newConnection();}
}

3.1 simple简单队列

P:发送;红色:队列;绿色:消费

新建一个simple包,在里面创建Send和Resv类

3.1.1 Send:

package com.myapp.demoesi.rabbitmq.simple;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {private static final String QUEUE_NAME = "test_simple_queue";
//私有,静态,不可变public static void main(String[] args) throws IOException, TimeoutException {//获取一个链接,在刚刚的工具类里Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道Channel channel = connection.createChannel();
//创建队列声明channel.queueDeclare(QUEUE_NAME, false, false, false, null);String msg = "Hello,Simple!";//发送在这个队列channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());System.out.println("--send msg:" + msg);
//信道和连接关了channel.close();connection.close();}
}

运行完毕进行验证,在你的rabbitmq里的Queues,进入你的/vhost_mmr的test_simple_queue里

观察是否可以接收到发送的消息,如果可以Send就成功啦!

3.1.2 Resv:

package com.myapp.demoesi.rabbitmq.simple;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Resv {private static final String QUEUE_NAME = "test_simple_queue";public static void main(String[] args) throws IOException, TimeoutException {//获取链接Connection connection = ConnectionUtils.getConnection();
//创建频道Channel channel = connection.createChannel();//队列声明,保险起见channel.queueDeclare(QUEUE_NAME, false, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {//获取到达的消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);String msg = new String(body, "UTF-8");System.out.println("new api resv:  " + msg);}};
//监听队列 androidchannel.basicConsume(QUEUE_NAME, true, consumer);}/*下面是老方法,此处供学习参考,redis也差不多这样,这个网址聊天这个挺不错的https://www.imooc.com/learn/758*/public void oldresv() {/*    //获取链接Connection connection = ConnectionUtils.getConnection();
//创建频道Channel channel = connection.createChannel();
//定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//监听队列
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){Delivery delivery = consumer.nextDelivery();
String msgString = new String(delivery.getBody());System.out.println("[resv]msg:"+msgString);
}*///老方法1.3}
}

先开Resv,再执行Send,就可以看到我们发送的Hello simple!了。

3.2.1 work工作队列(轮训分发)

3.2.1.1 Send: 在队列里攒了50个消息

package com.myapp.demoesi.rabbitmq.work;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {/*
*p---queue*/
private static final String QUEUE_NAME = "test_work_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取连接Connection connection = ConnectionUtils.getConnection();//获取channelChannel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);for (int i = 0;i<50;i++){String msg = "Hello,"+i;System.out.println("[work] send: "+msg);channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());Thread.sleep(i*20);}channel.close();connection.close();}
}

3.2.1.2 Resv1:消费者1

package com.myapp.demoesi.rabbitmq.work;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Resv1 {private static final String QUEUE_NAME = "test_work_queue";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = ConnectionUtils.getConnection();//获取channelChannel channel = connection.createChannel();//S声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//定义一个消费者Consumer consumer = new DefaultConsumer(channel) {//消息到达触发这个方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);String msg = new String(body, "UTF-8");System.out.println("[resv1] msg:" + msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[1] done");}}};boolean autoAck = true;channel.basicConsume(QUEUE_NAME,autoAck,consumer);}
}

3.2.2.3 Resv2:消费者2

package com.myapp.demoesi.rabbitmq.work;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Resv2 {private static final String QUEUE_NAME = "test_work_queue";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = ConnectionUtils.getConnection();//获取channelChannel channel = connection.createChannel();//S声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//定义一个消费者Consumer consumer = new DefaultConsumer(channel) {//消息到达触发这个方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);String msg = new String(body, "UTF-8");System.out.println("[resv1] msg:" + msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[2] done");}}};boolean autoAck = true;channel.basicConsume(QUEUE_NAME,autoAck,consumer);}
}

3.2.2 workfair工作队列(公平分发)

3.2.2.1 Send: 在队列里攒了50个消息

package com.myapp.demoesi.rabbitmq.workfair;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {/**p---queue*/private static final String QUEUE_NAME = "test_workfair_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取连接Connection connection = ConnectionUtils.getConnection();//获取channelChannel channel = connection.createChannel();//声明队列boolean durable = true;channel.queueDeclare(QUEUE_NAME, durable, false, false, null);/*消息持久化每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个消息
durable持久化,但在声明队列后不能更改参数,因为队列已经存在;但消息不一定持久化*/int prefetchCount = 1;channel.basicQos(prefetchCount);//限制发送给同一个消费者不得超过一个消息for (int i = 0; i < 50; i++) {String msg = "Hello," + i;System.out.println("[work] send: " + msg);channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());Thread.sleep(i * 5);}channel.close();connection.close();}
}

3.2.2.2 Resv1:消费者1

package com.myapp.demoesi.rabbitmq.workfair;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Resv1 {private static final String QUEUE_NAME = "test_workfair_queue";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = ConnectionUtils.getConnection();//获取channelChannel channel = connection.createChannel();//S声明队列boolean durable = true;channel.queueDeclare(QUEUE_NAME, durable, false, false, null);//消息持久化channel.basicQos(1);//定义一个消费者Consumer consumer = new DefaultConsumer(channel) {int ing = 1;//消息到达触发这个方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);String msg = new String(body, "UTF-8");System.out.println("[resv1] msg:" + msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[" + ing + "] done");ing++;channel.basicAck(envelope.getDeliveryTag(), false);}}};boolean autoAck = false;//自动应答channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

3.2.2.3 Resv2:消费者2

package com.myapp.demoesi.rabbitmq.workfair;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Resv2 {private static final String QUEUE_NAME = "test_workfair_queue";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = ConnectionUtils.getConnection();//获取channelChannel channel = connection.createChannel();//S声明队列boolean durable = true;channel.queueDeclare(QUEUE_NAME, durable, false, false, null);//消息持久化channel.basicQos(1);//定义一个消费者Consumer consumer = new DefaultConsumer(channel) {int ing = 1;//消息到达触发这个方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);String msg = new String(body, "UTF-8");System.out.println("[resv1] msg:" + msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[" + ing + "] done");ing++;//手动回执一个消息channel.basicAck(envelope.getDeliveryTag(), false);}}};boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);//自动应答}
}

3.3 publish_subscribe订阅模式 fanout(不处理路由键)

3.3.1 Send: 发送到交换机

package com.myapp.demoesi.rabbitmq.ps;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {private static final String EXCHANGE_NAME="test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel=connection.createChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME,"fanout");/*分发交换机没有存储消息的功能,队列才有一方面接收生产者的消息,另一方面是向对列推送消息匿名转发:""fanout(不处理路由键)direct(处理路由键)*///发送消息String msg="Hello ps !";channel.basicPublish(EXCHANGE_NAME,"" , MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());System.out.println("Send: "+msg);channel.close();connection.close();}
}

3.3.2 Resv1: 交换机绑定队列

package com.myapp.demoesi.rabbitmq.ps;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Resv1 {private static final String QUEUE_NAME = "test_queue_fanout_email";private static final String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = ConnectionUtils.getConnection();//获取channelChannel channel = connection.createChannel();//S声明队列boolean durable = true;channel.queueDeclare(QUEUE_NAME, durable, false, false, null);//消息持久化//绑定队列到交换机转发器channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");channel.basicQos(1);//定义一个消费者System.out.println("1 666");Consumer consumer = new DefaultConsumer(channel) {int ing = 1;//消息到达触发这个方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);String msg = new String(body, "UTF-8");System.out.println("[resv1] msg:" + msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[" + ing + "] done");ing++;channel.basicAck(envelope.getDeliveryTag(), false);}}};boolean autoAck = false;//自动应答channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

3.3.3 Resv2: 交换机绑定队列

package com.myapp.demoesi.rabbitmq.ps;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Resv2 {private static final String QUEUE_NAME = "test_queue_fanout_sms";private static final String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = ConnectionUtils.getConnection();//获取channelChannel channel = connection.createChannel();//S声明队列boolean durable = true;channel.queueDeclare(QUEUE_NAME, durable, false, false, null);//消息持久化//绑定队列到交换机转发器channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");channel.basicQos(1);//定义一个消费者System.out.println("2 666");Consumer consumer = new DefaultConsumer(channel) {int ing = 1;//消息到达触发这个方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);String msg = new String(body, "UTF-8");System.out.println("[resv2] msg:" + msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[" + ing + "] done");ing++;channel.basicAck(envelope.getDeliveryTag(), false);}}};boolean autoAck = false;//自动应答channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

3.4 路由模式 direct(处理路由键)

3.4.1 Send: 发送携带路由键的消息给交换机

package com.myapp.demoesi.rabbitmq.routing;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {private static final String EXCHANGE_NAME="test_exchange_direct";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();//exchangechannel.exchangeDeclare(EXCHANGE_NAME,"direct");String routingKey = "info";String msg = "Hello, "+routingKey+"! ";channel.basicPublish(EXCHANGE_NAME, msg, MessageProperties.PERSISTENT_TEXT_PLAIN, routingKey.getBytes());System.out.println("Send: "+msg);channel.close();connection.close();}
}

3.4.2 Resv1: 绑定"error"路由键和队列"test_queue_direct_1"到交换机上

package com.myapp.demoesi.rabbitmq.routing;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Resv1 {private static final String EXCHANGE_NAME = "test_exchange_direct";private static final String QUEUE_NAME = "test_queue_direct_1";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = ConnectionUtils.getConnection();//获取channelChannel channel = connection.createChannel();boolean durable = true;channel.queueDeclare(QUEUE_NAME, durable, false, false, null);//消息持久化channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel) {int ing = 1;//消息到达触发这个方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);String msg = new String(body, "UTF-8");System.out.println("[resv1] msg:" + msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[" + ing + "] done");ing++;channel.basicAck(envelope.getDeliveryTag(), false);}}};boolean autoAck = false;//自动应答channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

3.4.3 Resv2: 绑定"warning"“info”"error"路由键和队列"test_queue_direct_2"到交换机上

package com.myapp.demoesi.rabbitmq.routing;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Resv2 {private static final String EXCHANGE_NAME = "test_exchange_direct";private static final String QUEUE_NAME = "test_queue_direct_2";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = ConnectionUtils.getConnection();//获取channelChannel channel = connection.createChannel();boolean durable = true;channel.queueDeclare(QUEUE_NAME, durable, false, false, null);//消息持久化channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel) {int ing = 1;//消息到达触发这个方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);String msg = new String(body, "UTF-8");System.out.println("[resv2] msg:" + msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[" + ing + "] done");ing++;channel.basicAck(envelope.getDeliveryTag(), false);}}};boolean autoAck = false;//自动应答channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

3.5 topic主题模式 (通配符处理*、#)

3.5.1 Send: routingKey是"goods.delete",可以让goods.#和#.delete绑定的队列接收到

package com.myapp.demoesi.rabbitmq.topic;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {private static final String EXCHANGE_NAME = "test_exchange_topic";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();//exchangechannel.exchangeDeclare(EXCHANGE_NAME, "topic");String routingKey = "goods.delete";String msg = "商品.... " + routingKey + "! ";channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, routingKey.getBytes());System.out.println("Send: " + msg);channel.close();connection.close();}
}

3.5.2 Resv1: 绑定goods.add的"test_queue_topic_1"接收goods.add消息

package com.myapp.demoesi.rabbitmq.topic;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Resv1 {private static final String EXCHANGE_NAME = "test_exchange_topic";private static final String QUEUE_NAME = "test_queue_topic_1";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = ConnectionUtils.getConnection();//获取channelChannel channel = connection.createChannel();boolean durable = true;channel.queueDeclare(QUEUE_NAME, durable, false, false, null);//消息持久化channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel) {int ing = 1;//消息到达触发这个方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);String msg = new String(body, "UTF-8");System.out.println("[resv1] msg:" + msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[" + ing + "] done");ing++;channel.basicAck(envelope.getDeliveryTag(), false);}}};boolean autoAck = false;//自动应答channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

3.5.3 Resv2: 绑定goods.#的"test_queue_topic_2"接收goods.#的消息

package com.myapp.demoesi.rabbitmq.topic;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Resv2 {private static final String EXCHANGE_NAME = "test_exchange_topic";private static final String QUEUE_NAME = "test_queue_topic_2";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = ConnectionUtils.getConnection();//获取channelChannel channel = connection.createChannel();boolean durable = true;channel.queueDeclare(QUEUE_NAME, durable, false, false, null);//消息持久化channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.#");channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel) {int ing = 1;//消息到达触发这个方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);String msg = new String(body, "UTF-8");System.out.println("[resv2] msg:" + msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[" + ing + "] done");ing++;channel.basicAck(envelope.getDeliveryTag(), false);}}};boolean autoAck = false;//自动应答channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

(四) rabbitmq的消息确认机制(事务+confirm)

3.1 在rabbitmq中 我们可以通过持久化数据 解决rabbitmq服务器异常 的数据丢失问题

问题:生产者将消息发送出去之后,消息到底有没有到达 rabbitmq 服务器,默认的情况是不知道的;

两种方式:

​ AMQP实现了事务机制,

​ Confirm模式

3.2 事务机制

txSelect txCommit txRollback

txSelect: 用户将当前channel设置成transation模式

txCommit: 用于提交事务

txRollback: 回滚事务

3.2.1 TxSend:发送translation模式的事务

package com.myapp.demoesi.rabbitmq.tx;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class TxSend {private static final String QUEUE_NAME = "test_queue_tx";public static void main(String[] args) throws IOException, TimeoutException {Connection connection =  ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);String msg="Hello tx massage! ";try {channel.txSelect();channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());//int xx = 1/0;channel.txCommit();
}catch (Exception e){channel.txRollback();System.out.println("  send message rollback");
}channel.close();
connection.close();}
}

3.2.2 TxResv:正常接收

package com.myapp.demoesi.rabbitmq.tx;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class txResv {private static final String QUEUE_NAME="test_queue_tx";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);System.out.println("resv[tx]"+new String(body,"utf-8"));}});}}

此种模式还是很耗时的,采用这种方式,降低了rabbitmq的吞吐量

3.3 Confirm模式

Confirm模式最大的好处在于他是异步

Nack消息

开启confirm模式

channel.confirmSelect()

编程模式:

1.普通 发一条 waitForConfirms()

2.批量的 发一批 waitForConfirms()

3.异步confirm模式:提供一个回调(我并不会)

3.3.1 Confirm单条: Send1

package com.myapp.demoesi.rabbitmq.confirm;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*普通模式*/
public class Send1 {private static final String QUEUE_NAME = "test_queue_confirm1";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection =  ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);//生产者调用confirmSelect将channel设置为confirm模式  注意channel.confirmSelect();String msg="Hello confirm1 massage! ";channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());//int xx = 1/0;
if (!channel.waitForConfirms()){System.out.println("massage send failed");
}else{System.out.println("massage send ok");
}channel.close();connection.close();}
}

3.3.2 Confirm批量: Send2

package com.myapp.demoesi.rabbitmq.confirm;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*普通模式*/
public class Send2 {private static final String QUEUE_NAME = "test_queue_confirm1";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);//生产者调用confirmSelect将channel设置为confirm模式  注意channel.confirmSelect();String msg = "Hello confirm2 massage! ";for (int i = 0; i < 10; i++) {channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());}//int xx = 1/0;if (!channel.waitForConfirms()) {System.out.println("massage send failed");} else {System.out.println("massage send ok");}channel.close();connection.close();}
}

3.3.3 Resv:

package com.myapp.demoesi.rabbitmq.confirm;import com.myapp.demoesi.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Resv {private static final String QUEUE_NAME="test_queue_confirm1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);System.out.println("resv[confirm]"+new String(body,"utf-8"));}});}}

很简单吧?
你学会了吗?

rabbitmq教程前五种 来自牧马人老王相关推荐

  1. springBoot整合rabbitmq并测试五种常用模型

    之前我们记录了原生java代码使用rabbitmq的方法,很简单,类似于原生jdbc代码一样,将连接对象抽离出来作为工具类,生产者和消费者通过工具类获取连接对象,进而获取通道对象,再注册交换机或者是队 ...

  2. RabbitMQ介绍以及五种工作模式

    早期出现认证系统类似的提供认证服务; 出现了系统间的通信;并发的高需求 每个前端系统与认证系统的通信强耦合 传递消息,获取返回结果的过程,如果出现网络波动,整个传递数据,计算返回结果的流程重走一遍;需 ...

  3. rabbitmq常用的五种模型

    5种常用模型 一.基本消息模型 二丶work消息模型 三丶fanout广播模式/发布/订阅模式 四丶Routing路由模式(direct) 五丶Topics(主题模型) 第一种:简单模式 Simple ...

  4. RabbitMQ入门篇、介绍RabbitMQ常用的五种模式

    RabbitMQ 认识RabbitMQ AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为 ...

  5. 文件在另一个程序中打开,无法删除~【删除文件被占用问题】(保姆级教程,五种解决办法~)

    一,前言 当我们操作电脑要对一个文件进行删除的时候,很多时候都会提示你说你的文件被占用,不能够删除,这个时候我们就非常的苦恼了. 二,解决办法 方法一: 这说明这个文件被打开了,我们需要将这个文件关闭 ...

  6. html好看的按钮css样式大全,css3教程_五种漂亮样式,CSS3动画按钮效果

    CSS代码: .button01 { width: 200px; margin: 50px auto 20px auto; } .button01 a { display: block; height ...

  7. Kubernetes的五种最佳安全实践

    Kubernetes还不到6岁,但它已经是每个人最喜欢的容器编排程序了.云和基础设施监测公司Datadog发现Kubernetes主导着容器市场:"大约45%的运行容器的Datadog客户使 ...

  8. 五种windows密码设置及破解(转)

    在使用电脑的过程中,难免要与各类密码打交道,以下九种密码可能是大家用的最多的:BIOS密码.安装密码.用户密码.电源管理密码.屏保密码.开机密码.上网密码.分级审查密码和共享密码.今天,我们就谈谈这些 ...

  9. OpenCV学习系列教程第五篇:测试和提高代码的效率

    Opencv-Python学习系列教程第五篇 来自opencv-python官方学习文档,本人谨做翻译和注释,以及一些自己的理解 本文由作者翻译并进行代码验证,转载请注明出处~ 官方文档请参阅:htt ...

最新文章

  1. 这才是程序员的爱情观!?
  2. JVM内存管理------垃圾搜集器简介
  3. 06. 用css实现三角形
  4. linux ubuntu桌面进程,如何加快你的Ubuntu桌面性能
  5. UITableviewcell重用机制以及解决重绘出现的重叠现象
  6. 跟燕十八学习PHP-第十五天-php增删改查表数据
  7. 关于级联删除和级联修改
  8. SQL SERVER的SID和表的所有权问题
  9. SystemCenter2012SP1实践(0)本系列摘要目录
  10. 大数据分析的重要性体现在哪里
  11. TCP/IP协议分层模型以及数据的封装和分用
  12. Win7 蓝牙耳机无法使用
  13. 补码乘法、booth算法、Wallace树
  14. html5的指南针app,HTML5 App实战(五):指南针
  15. 【网站点击流数据分析】01-项目业务背景
  16. linux下RabbitMQ的配置和安装
  17. vue中使用antv/g6 绘制关系图、结构图
  18. [Luogu5042/UOJ #100][国家集训队互测2015]丢失的题面/ydc的题面
  19. mysql gh ost 对比_GitHub开源MySQL Online DDL工具gh-ost参数解析
  20. 汉明窗口Hamming Window

热门文章

  1. 图片去水印app-一键去除图片水印
  2. ROS2学习和使用SLAM算法(gmapping/cartographer/orb-slam等)
  3. 服务器监控功能(3种方案)
  4. PointWise网格划分软件简单使用
  5. AICPA CIMA四季度调查:超一半美国企业高管认为美国经济陷入衰退
  6. koa mysql_使用koa+mysql实现一个完整的项目
  7. ROS利用Python脚本实现多点自主导航
  8. 808协议服务器下发,基于部标JT/T 808协议及数据格式的GPS服务器
  9. android 删除字符串中的指定字符
  10. thinkphp漏洞总结