ActiveMQ 原理与应用
在介绍activemq之前,先简单介绍JMS,它是J2EE的13个规范之一,提供的是消息中间件的规范。
JMS包括以下基本构件:
连接工厂,是客户用来创建连接的对象,ActiveMQ提供的是ActiveMQConnectionFactory;
连接connection;
会话session,是发送和接收消息的上下文,用于创建消息生产者,消息消费者,相比rocketMQ会话session是提供事务性的;
目的地destination,指定生产消息的目的地和消费消息的来源对象;
生产者、消费者,由会话创建的对象,顾名思义。
消息通信机制
点对点模式,每个消息只有1个消费者,它的目的地称为queue队列;
发布/订阅模式,每个消息可以有多个消费者,而且订阅一个主题的消费者,只能消费自它订阅之后发布的消息。
消息确认机制
Session.AUTO_ACKNOWLEDGE,直接使用receive方法。
Session.CLIENT_ACKNOWLEDGE,通过消息的acknowledge 方法确认消息。
Session.DUPS_ACKNOWLEDGE,该选择只是会话迟钝第确认消息的提交。如果JMS provider 失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider 必须把消息头的JMSRedelivered 字段设置为true。
package com.java.activemq;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;/*** 消息生产者* @author xing.liu**/
public class JMSProducer {private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址private static final int SENDNUM=10; // 发送的消息数量public static void main(String[] args) {ConnectionFactory connectionFactory; // 连接工厂Connection connection = null; // 连接Session session; // 会话 接受或者发送消息的线程Destination destination; // 消息的目的地MessageProducer messageProducer; // 消息生产者// 实例化连接工厂connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);try {connection=connectionFactory.createConnection(); // 通过连接工厂获取连接connection.start(); // 启动连接// 创建Session,第一个参数是开启事务true,第二个参数是消息确认机制属性session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); destination=session.createQueue("FirstQueue1"); // 创建消息队列messageProducer=session.createProducer(destination); // 创建消息生产者sendMessage(session, messageProducer); // 发送消息session.commit();} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();} finally{if(connection!=null){try {connection.close();} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}}/*** 发送消息* @param session* @param messageProducer* @throws Exception*/public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{for(int i=0;i<JMSProducer.SENDNUM;i++){TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);System.out.println("发送消息:"+"ActiveMQ 发送的消息"+i);messageProducer.send(message);}}
}
原理很多,但是把原理映射在实践中,就更容易懂了。一起看看下面的这个小例子,通过代码回想原理,最后运行显示到控制台上,验证结论。
新建一个普通Java Project,引进activemq的jar包,build到项目中。新建生产者producer、消费者consumer,如下:
package com.java.activemq;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;/*** 消息消费者* @author xing.liu**/
public class JMSConsumer {private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址public static void main(String[] args) {ConnectionFactory connectionFactory; // 连接工厂Connection connection = null; // 连接Session session; // 会话 接受或者发送消息的线程Destination destination; // 消息的目的地MessageConsumer messageConsumer; // 消息的消费者// 实例化连接工厂connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);try {connection=connectionFactory.createConnection(); // 通过连接工厂获取连接connection.start(); // 启动连接session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Sessiondestination=session.createQueue("FirstQueue1"); // 创建连接的消息队列messageConsumer=session.createConsumer(destination); // 创建消息消费者while(true){TextMessage textMessage=(TextMessage)messageConsumer.receive(100000);if(textMessage!=null){System.out.println("收到的消息:"+textMessage.getText());}else{break;}}} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();} }
}
这个demo是简单的点对点模式下,目的地对象是queue队列,代码中可以看出session.createQueue;从代码中可以联想到上边的原理,也是简而易懂。messageConsumer.receive(100000)这使用的是receive模式,这种模式让消费者consumer不断的问producer要消息,很乱的。
安装了ActiveMQ服务器,在浏览器打开后台监控,地址 http://127.0.0.1:8161/admin/
可以在Queues中看到消息未被消费的条数,已消费的条数。后台监控系统都是一样的,类似rocketMQ,我就不介绍了,多点点就能明白。
所以生产中我们常用的是监听listener模式,看看下边这个带监听的consumer:
package com.java.activemq;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;/*** 消息消费者* @author xing**/
public class JMSConsumer2 {private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址public static void main(String[] args) {ConnectionFactory connectionFactory; // 连接工厂Connection connection = null; // 连接Session session; // 会话 接受或者发送消息的线程Destination destination; // 消息的目的地MessageConsumer messageConsumer; // 消息的消费者// 实例化连接工厂connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);try {connection=connectionFactory.createConnection(); // 通过连接工厂获取连接connection.start(); // 启动连接session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Sessiondestination=session.createQueue("FirstQueue1"); // 创建连接的消息队列messageConsumer=session.createConsumer(destination); // 创建消息消费者messageConsumer.setMessageListener(new Listener()); // 注册消息监听} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();} }
}
package com.java.activemq;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;/*** 消息监听* @author xing**/
public class Listener implements MessageListener{@Overridepublic void onMessage(Message message) {try {System.out.println("收到的消息:"+((TextMessage)message).getText());} catch (JMSException e) {e.printStackTrace();}}
}
这种消息监听模式,只是多一个注册消息监听messageConsumer.setMessageListener(new Listener());
ActiveMQ 原理与应用相关推荐
- 分布式消息通信ActiveMQ原理-持久化策略-笔记
2019独角兽企业重金招聘Python工程师标准>>> 消息的持久化策略分析 消息持久性对于可靠消息传递来说是一种比较好的方法, 即时发送者和接受者不是同时在线或者消息中心在发送者发 ...
- activemq原理 java_面试题activemq的原理详解
在面试中经常会碰到activemq的问题,相信小伙伴们都挺烦恼的吧,这里小编精心整理了一些关于activemq的面试题,下面快一起来看看吧. 一.activemq是什么? activeMQ是一种开源的 ...
- ActiveMQ原理分析
持久化消息和非持久化消息的发送策略 消息同步发送和异步发送 ActiveMQ支持同步.异步两种发送模式将消息发送到broker上.同步发送过程中,发送者发送一条消息会阻塞直到broker反馈一个确认消 ...
- ActiveMQ原理教程
传输原理 ActiveMQ中Consumer特征详解与优化 主题:ActiveMQ源码架构解析第一节 架构设计:系统间通信(21)--ActiveMQ的安装与使用 架构设计:系统间通信(22)--提高 ...
- activemq原理_ActiveMQ(二)
上一篇文章对ActiveMQ有了初步认识,了解了其大致原理.接下来说说实战中ActiveMQ的应用. 幸运的金荷,公众号:梁同学CodingActiveMQ(一) 八.Broker *上一篇讲到,启动 ...
- activemq原理 java_分布式消息通信ActiveMQ原理-持久化策略-笔记
消息的持久化策略分析 消息持久性对于可靠消息传递来说是一种比较好的方法, 即时发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重启后仍然可以将消息发送出去. 消息持久性的原理 ...
- 从ActiveMQ原理到实例
在介绍activemq之前,先简单介绍JMS,它是J2EE的13个规范之一,提供的是消息中间件的规范. JMS包括以下基本构件: 连接工厂,是客户用来创建连接的对象,ActiveMQ提供的是Activ ...
- ActiveMQ 原理分析—消息持久化篇
消息持久化策略 背景 当消息发送者(provider)发送消息后消费者(consumer)没启动.故障, 或者消息中心在发送者发送消息后宕机了.ActiveMQ是如何保证消息不丢失,消费者能够正常的消 ...
- 分布式消息通信ActiveMQ原理 分析一
本章知识点: 1. 持久化消息和非持久化消息的发送策略2. 消息的持久化方案及实践3. 消费端消费消息的原理 持久化消息与非持久化消息的发送策略 消息同步发送和异步发送 同步发送过程中,发送者发送一条 ...
最新文章
- 用python打造一款文件搜索工具,所有功能自己定义!
- x264各个版本下载
- 汇编语言--call 指令
- Exception in thread AWT-EventQueue-0 java.lang.IllegalThreadStateException
- 双稳态电路的两个稳定状态是什么_干货|常见的脉冲电路到底有何用途和特点?终于了解了!|脉冲|晶体管|双稳|单稳|振荡器...
- 骗子收录网站源码PHP搭建
- 备忘录AIX主机下用SHELL脚本编写FTP传某个目录下的文件到LINUX主机
- ChaiNext:比特币再度考验5W关口
- bzoj 1016: [JSOI2008]最小生成树计数
- Android 修改屏幕亮度
- SEODONG Medical推出创新干眼症治疗仪器,深受海外消费者好评
- 流落在帝都的那些80后北漂IT人,你们的未来在哪儿
- LOJ#10064. 「一本通 3.1 例 1」黑暗城堡
- foo/bar/baz/qux
- STM32F103C8T6基础开发教程(HAL库)—点亮第一颗LED灯
- Javascript禁止鼠标和键盘
- Eclipse护眼背景及字体设置
- 支付宝小程序(支付)
- 面试题---C/C++与单片机
- 关于“无穷”的概念---数学笔记“无穷”