RabbitMQ是用于应用程序之间或者程序的不同组件之间的消息通信,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量,也就是生产-消费模型,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。

RabbitMQ   设置持久化, 如果生产端发送消息,消费端突然挂掉了,消息还存在队列,等消费端重启了,消费端能获取到消息。

RabbitMQ的两大核心组件是Exchange和Queue。

说明:

Exchange又称交换器,它接受消息和路由信息,然后将消息发送给消息队

列。

Queue是一个具名缓冲区,它们代表一组消费者应用程序保存消息。

接下来介绍Producer 和 Consumer 两种类型

1.生产者

第一步:实现消息类,主要是保存调用哪个路由key和交换器(也是走哪条线)、要传的数据

/** * 消息 * */
public class RabbitMessage implements Serializable {private static final long serialVersionUID = -6487839157908352120L;private Class<?>[] paramTypes;// 参数类型private String exchange;// 交换器private Object[] params;private String routeKey;// 路由keypublic RabbitMessage() {}public RabbitMessage(String exchange, String routeKey, Object... params) {this.params = params;this.exchange = exchange;this.routeKey = routeKey;}@SuppressWarnings("rawtypes")public RabbitMessage(String exchange, String routeKey, String methodName,Object... params) {this.params = params;this.exchange = exchange;this.routeKey = routeKey;int len = params.length;Class[] clazzArray = new Class[len];for (int i = 0; i < len; i++)clazzArray[i] = params[i].getClass();this.paramTypes = clazzArray;}public byte[] getSerialBytes() {byte[] res = new byte[0];ByteArrayOutputStream baos = new ByteArrayOutputStream();ObjectOutputStream oos;try {oos = new ObjectOutputStream(baos);oos.writeObject(this);oos.close();res = baos.toByteArray();} catch (IOException e) {e.printStackTrace();}return res;}public String getRouteKey() {return routeKey;}public String getExchange() {return exchange;}public void setExchange(String exchange) {this.exchange = exchange;}public void setRouteKey(String routeKey) {this.routeKey = routeKey;}public Class<?>[] getParamTypes() {return paramTypes;}public Object[] getParams() {return params;}
}

第二步:实现生产者前提,是要设置调用安装RabbitMQ的IP、端口、线程数、交换器类型等

配置一个global.properties文件

通过SpringMvc把global.properties文件读进来

<!-- 注入属性文件 --><bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"><property name="locations"><list><value>classpath:global.properties</value></list></property></bean>

第三步:实现生产者类,这里面主要用到的技术有java.util.concurrent.Executors(上一篇有介绍过)实现线程执行

1)实现连接管理

/** * 连接管理 * */
public class ConnectionManage {private volatile Connection connection;public ConnectionManage(String rmqServerIP, int rmqServerPort)throws IOException {ConnectionFactory cf = new ConnectionFactory();cf.setHost(rmqServerIP);cf.setPort(rmqServerPort);connection = cf.newConnection();}@SuppressWarnings("finally")public Channel createChannel() {Channel channel = null;try {channel = connection.createChannel();} catch (ShutdownSignalException e1) {} catch (IOException e) {}return channel;}public void shutdown() throws IOException {if (connection != null)connection.close();}
<pre name="code" class="java">

这边可以设置监听,是否连接断掉connection.addShutdownListener(shutdoenListner);//如果断掉,可以继续连接

2)实现生产者

在SpringMVC配置文件XML中加入,把global.properties文件读出来并设置值

 <bean id="rmqProducer" class="cn.test.rabbitmq.RmqProducer"><property name="rmqServerIP" value="${rmq.ip}" /><property name="rmqServerPort" value="${rmq.port}" /><property name="threadPoolNum" value="${rmq.producer.num}" /><property name="exchange" value="testExchange" /><property name="exchangeType" value="topic" /></bean>
/*** 生产着**/
public class RmqProducer implements InitializingBean,DisposableBean
{private String rmqServerIP;//ip地址private int rmqServerPort;//端口    private int threadPoolNum;//线程数private String exchangeType;//类型private String exchange;//交换器    private ConnectionManage connectManage;private Channel channel;             /*** 初始化*/@Overridepublic void afterPropertiesSet() throws Exception {//创建连接管理器connectManage=new ConnectionManage(rmqServerIP,rmqServerPort);boolean durable=true;//是否持久化boolean autoDelete=false;//是否自动删除Channel channel=connectManage.createChannel();channel.exchangeDeclare(exchange, exchangeType, durable,autoDelete,null);}/*** 发送信息* @param msg*/public void sendMessage(final RabbitMessage  msg){channel.basicPublish(msg.getExchange(),msg.getRouteKey(),MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getSerialBytes());}/*** * @throws Exception*/@Overridepublic void destroy() throws Exception {connectManage.shutdown();}public String getRmqServerIP() {return rmqServerIP;}public void setRmqServerIP(String rmqServerIP) {this.rmqServerIP = rmqServerIP;}public String getExchangeType() {return exchangeType;}public void setExchangeType(String exchangeType) {this.exchangeType = exchangeType;}public int getRmqServerPort() {return rmqServerPort;}public void setRmqServerPort(int rmqServerPort) {this.rmqServerPort = rmqServerPort;}public String getExchange() {return exchange;}public void setExchange(String exchange) {this.exchange = exchange;}}

说明:

1). channel.exchangeDeclare(exchange, exchangeType, durable,autoDelete,null);

exchange:交换机名字

exchangeType类型

durable是否持久化

autoDelete不使用时是否自动删除

2).channel.basicPublish(msg.getExchange(),msg.getRouteKey(),MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getSerialBytes());

exchange:交换机名字

routeKey:路由关键字

msg.getSerialBytes() :消息主体

Channel是线程好全的,但是最好是每个线程里用自己的Channel,因为在单个Channel里排队是有可能慢一些的。所以我们可以采用多线程处理,每个线程对应Channel,这样速度会比较快,具体实现:

java.util.concurrent.ExecutorService多线程的管理和实现,上一篇有介绍

ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术

//每个线程对应Channel

               //启动线程池channelManager=new ConcurrentHashMap<Thread, Channel>();threadPool=Executors.newFixedThreadPool(threadPoolNum, new ThreadFactory(){@Overridepublic Thread newThread(Runnable r) {Thread thread=new Thread(r);Channel channel = connectManage.createChannel();if(channel!=null)channelManager.put(thread, channel);//创建线程和channel对应起来return thread;}            });

//采用自己的Channel来发送消息

                 Runnable runnable=new Runnable() {@Overridepublic void run() {Thread thread=Thread.currentThread();Channel channel=channelManager.get(thread);if(channel!=null)channelManager.put(thread, channel);try {channel.basicPublish(msg.getExchange(),msg.getRouteKey(),MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getSerialBytes());} catch (IOException e) {e.printStackTrace();}}};threadPool.execute(runnable);

测试类:

 @Testpublic void test() throws IOException{String exchange="testExchange";交换器String routeKey="testQueue";//队列//参数Map<String,Object> param=new HashMap<String, Object>();param.put("data","hello");RabbitMessage  msg=new RabbitMessage(exchange,routeKey, param);//发送消息rmqProducer.sendMessage(msg);}

2.消费者

采用多线程进行处理消息,这样每个线程对应Channel,处理速度会比较快。

在SpringMVC配置文件XML中加入,把global.properties文件读出来并设置值

 <bean id="consumer" class="cn.test.rabbitmq.RmqConsumerSerial"><property name="rmqServerIp" value="${rmq.ip}"></property><property name="rmqServerPort" value="${rmq.port}"/><property name="exchange" value="testExchange"></property><property name="threadPoolNum" value="${rmq.producer.num}"/><property name="queueName" value="testQueue"></property><property name="exchangeType" value="topic"/><property name="qos" value="1"></property></bean>

实现消费者

@Overridepublic void afterPropertiesSet() throws Exception {start();}@Overridepublic void destroy() throws Exception {stop();}public void start() throws IOException{connectManage=new ConnectionManage(rmqServerIp,rmqServerPort,threadPoolNum); //向rmq创建exchange,queueboolean durable=true,exclusive=false,autoDelete=false;Channel channel=connectManage.createChannel();channel.exchangeDeclare(exchange, exchangeType, durable,autoDelete,null);channel.queueDeclare(queueName, durable, exclusive, autoDelete, null);channel.queueBind(queueName, exchange, routeKey);channel.close();//启动线程池channelManager=new HashMap<Thread, Channel>();threadPool=Executors.newFixedThreadPool(threadPoolNum, new ThreadFactory(){@Overridepublic Thread newThread(Runnable r) {Thread thread=new Thread(r);try {Channel channel = connectManage.createChannel();if(channel!=null){channelManager.put(thread, channel);channel.basicQos(qos);}} catch (IOException e) {logger.warn(e.getMessage());                  }return thread;}            });for(int i=0;i<threadPoolNum;i++)threadPool.execute(getRunable());}protected  Runnable getRunable(){return new Runnable() {@Overridepublic void run() {Thread thread=Thread.currentThread();final Channel channel=channelManager.get(thread);boolean autoAck=false;DefaultConsumer consumer =  new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{long deliveryTag = envelope.getDeliveryTag();boolean suc=false;ObjectInputStream ois=new ObjectInputStream(new ByteArrayInputStream(body));                           try {Object obj=ois.readObject();RabbitMessage rmqMsg = RabbitMessage.class.cast(obj);        Object[] objs=rmqMsg.getParams();System.out.println("rmqMsg.getParams()=="+rmqMsg.getParams()[0].toString());suc=true;} catch (Exception e) {}            if(suc)channel.basicAck(deliveryTag, false);elsechannel.basicNack(deliveryTag, false,true);}};         try {channel.basicConsume(queueName, autoAck, consumer);                }catch (IOException e) {logger.warn(e.getMessage());} }};}

说明:

1)channel.queueDeclare().getQueue() 得到的是一个随机queue,断开连接后即删除。

2)channel.basicQos(qos) 设置最大的投送字节数

3)channel.basicNack(deliveryTag, false,true);false代表失败,true是要重新发送,

结果:

可以通过反射机制进行调用具体的类,来根据不同的队列来处理不同的信息。

总结:

这边RabbitMQ与SpringMVC配置,没用到SpringMVC里的RabbitMQ,下一篇会介绍。

RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)方案一相关推荐

  1. 简单Rabbitmq 发送消息和接收消息

    简单Rabbitmq 发送消息和接收消息 1 先在Rabbitmq配置文件中预先创建好交换器,队列,路由等信息. 2 创建生产者发送消息 @Autowiredprivate RabbitTemplat ...

  2. 【Java从0到架构师】RocketMQ 使用 - 发送消息、接收消息、特殊消息

    RocketMQ 消息中间件 入门案例 NameServer 地址 发送消息 同步发送消息 异步发送消息 一次性发送消息 生产者组.消息封装 接收消息 消费方式:推式消费.拉式消费 消息方式:集群模式 ...

  3. 企业微信 接收消息服务器,接收消息与事件

    [TOC] 关于接收消息 为了能够让自建应用和企业微信进行双向通信,企业可以在应用的管理后台开启接收消息模式. 开启接收消息模式的企业,需要提供可用的接收消息服务器URL. 开启接收消息模式后,用户在 ...

  4. C# 企业微信:开启消息接受接收消息推送消息

    前言:微信吧!接触的人都会100%各种踩坑,就算同样东西去年做过,今年来一样踩坑,因为太多你稍微不记得一点点的细节就能让你研究N久.为此,我要把这个过程详细的记录下来. 一.开启消息接受 1.拿到企业 ...

  5. 浅析 postMessage 方法介绍、如何接收数据(监听message事件及其属性介绍)、使用postMessage的安全注意事项、具体使用方式(父子页面如何互发消息、接收消息)

    postMessage 是 html5 引入的API,postMessage()方法允许来自不同源的脚本采用异步方式进行有效的通信,可以实现跨文本文档.多窗口.跨域消息传递,多用于窗口间数据通信,这也 ...

  6. Jenkins配置邮件通知服务,完整教程(含发送成功无法接收邮件处理方案)

    前言:涂涂改改,查查找找,不知不觉又三点了,废话不多说直接上教程. 准备事项: 1.安装Email Extension Plugin插件 2.安装Date Parameter插件(一个可使用的时间函数 ...

  7. ActiveMQ 发送和接收消息

    一.添加 jar 包 <dependency><groupId>org.apache.activemq</groupId><artifactId>act ...

  8. Netty:实现同步发送并接收消息的一种方式

    Netty创建通信服务时使用Nio异步通信, 配置代码(bootstrap.channel(NioSocketChannel.class);),要怎样实现这样一个同步发送消息并接收消息功能,虽然这样做 ...

  9. 消费者接收消息过程?

    1.Producer先连接到Broker,建立连接Connection,开启一个信道(Channel). 2.向Broker请求消费响应的队列中消息,可能会设置响应的回调函数. 3.等待Broker回 ...

最新文章

  1. swift -charts框架雷达图
  2. 【Opencv实战】这个印章“神器”够牛,节省了时间提高了效率,厉害~(附完整源码)
  3. php测试框架,PHPUnit使用
  4. comsol分析时总位移代表什么_【仿真百科】什么是结构力学?
  5. 【EF学习笔记07】----------加载关联表的数据 贪婪加载
  6. Android 驱动(3)---Android驱动开发知识储备
  7. HTML 5--Grouping and Nesting Styles
  8. 邮件代理发送功能更新和Exchange 6月份补丁更新提示
  9. WPF开发时光之痕日记本(一)——富文本编辑器
  10. _beginthread 和 CreateThread 区别
  11. WIN7中如何卸载IE8或IE9
  12. 苹果wifi网速慢怎么办_三步解决家里网速慢的问题
  13. 多线程编程实例(使用CompletableFuture)
  14. win10 添加打印机
  15. 弘辽科技:淘宝新链接要怎么补流量?有没有提升新链接的办法?
  16. linux查看电源状态命令,Linux下查看电池损耗等信息
  17. 计算机大数据的前景方向_未来计算机大数据的发展方向
  18. SSM ==> 超市管理系统(mysql)
  19. [USACO Oct08] 挖水井题解
  20. 如何用代码实现iPhone手机软件注销和手机重启

热门文章

  1. 关于python计算生态的命名、哪个选项的描述是正确的_以下选项是 Python 计算生态检索主站的是( )。_成本管理会计答案_学小易找答案...
  2. 如何停止ping ip -t?
  3. 2022N1叉车司机考试题模拟考试题库及答案
  4. MySQL——homework03
  5. TensorFlow 2.0深度学习算法实战 第十章 卷积神经网络
  6. 如何改变网文大神们一睁眼就欠读者6000字的局面
  7. 微服务混合云部署实践
  8. html设置使图片自动移动,css怎么移动图片?
  9. nacos服务注册流程
  10. RFID让企业的固定资产实现了极速盘点