MetaQ (全称 Metamorphosis )是一个高性能、高可用、可扩展的分布式消息中间件 ,MetaQ 具有消息存储顺序写、吞吐量大和支持本地和XA 事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景, MetaQ在阿里巴巴各个子公司被广泛应用,每天转发 250 亿 + 条消息。主要应用于异步解耦, Mysql 数据复制,收集日志等场景 。。
我是做移动互联网广告系统的,在工作中有很多场景使用到了MetaQ,例如:广告的存储、效果数据的上报,多机房扣费等都需要依赖MetaQ,由于公司已经使用MeatQ作为消息中间件的时间很久了,已经有了模板,所以很多的时候就是直接拿来使用,对里面为什么做这样那样的封装没有去深入的了解,刚好这段时间有空就去看了看源码,给自己总结沉淀一下,做到不仅知道怎么用,还要知道为什么这样做。
一、生产者
发送消息是由生产者MessageProduce触发,MessageProduce从MessageSessionFactory中创建出来具体实现如下:

 MetaClientConfig metaClientConfig = new MetaClientConfig();ZKConfig zkConfig = new ZkUtils.ZKConfig()zkConfig.zkConnect = "127.0.0.1:2181";metaClientConfig.setZkConfig(zkConfig)MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig );// create producer,强烈建议使用单例MessageProducer producer = sessionFactory.createProducer();// publish topicfinal String topic = "meta-test";producer.publish(topic);SendResult sendResult = producer.sendMessage(new Message(topic, "xxxx".getBytes()));

我们可以看出MessageProduce 是通过工厂创建的 ,MetaMessageSessionFactory需要一个参数就MetaClientConfig这个类,MetaClientConfig里面是什么了?MetaClientConfig中有个工具类ZkUtils,通过名字就知道是和zk交互的类,
和zk交互我们知道创建一个客户端需要几个参数:
1. zkConnect (zk的ip地址)
2. zkSessionTimeoutMs(zk的会话超时时间)
3. zkConnectionTimeoutMs(zk的连接超时时间)
4. zkSyncTimeMs(zk心跳时间)

我们知道Sping IOC容器就是用来创建发现维护类与类之间的关系的,MetaQ团队当然也想到了这个,那他是这么实现的呢?
在com.taobao.metamorphosis.client.extension.spring 中有如下几个类:
1. AbstractMetaqMessageSessionFactory
2. DefaultMessageListener
3. JavaSerializationMessageBodyConverter
4. MessageBodyConverter
5. MessageBuilder
6. MessageListenerContainer
7. MetaqMessage
8. MetaqMessageSessionFactoryBean
9. MetaqTemplate
10. MetaqTopic
11. XAMetaqMessageSessionFactoryBean

先来看看MetaqTemplate这个类,这个类提供发送消息的方法,

    public SendResult send(MessageBuilder builder) throws InterruptedException {Message msg = builder.build(this.messageBodyConverter);final String topic = msg.getTopic();MessageProducer producer = this.getOrCreateProducer(topic);try {return producer.sendMessage(msg);}catch (MetaClientException e) {return new SendResult(false, null, -1, ExceptionUtils.getFullStackTrace(e));}}

我们发现使用send方法的时候还要MessageBodyConverter的类,这个类是用来做什么的呢?:

    /*** Convert a message object to byte array.* * @param body* @return* @throws MetaClientException*/public byte[] toByteArray(T body) throws MetaClientException;/*** Convert a byte array to message object.* * @param bs* @return* @throws MetaClientException*/public T fromByteArray(byte[] bs) throws MetaClientException;

可以看到这里定义了两个方法 用来把消息转换为二进制,以及从二进制中恢复消息,我们知道数据在网络上传输都是二进制的方式进行传输的,这个接口很方便我们做扩展,灵活的实现自己的转换规则,比如采用其他序列化协议,如protobufs,hessian等等,当然如果你不想实现自己的消息转换类,这里提供了一个实现类:JavaSerializationMessageBodyConverter

public class JavaSerializationMessageBodyConverter implements MessageBodyConverter<Serializable> {JavaSerializer serializer = new JavaSerializer();JavaDeserializer deserializer = new JavaDeserializer();@Overridepublic byte[] toByteArray(Serializable body) throws MetaClientException {try {return this.serializer.encodeObject(body);}catch (IOException e) {throw new MetaClientException(e);}}@Overridepublic Serializable fromByteArray(byte[] bs) throws MetaClientException {try {return (Serializable) this.deserializer.decodeObject(bs);}catch (IOException e) {throw new MetaClientException(e);}}}

JavaSerializationMessageBodyConverter 实现了MessageBodyConverter ,对消息体进行序列化和反序列化。
send方法中还调用了getOrCreateProducer我们来看看这个方法:

 public MessageProducer getOrCreateProducer(final String topic) {if (!this.shareProducer) {FutureTask<MessageProducer> task = this.producers.get(topic);if (task == null) {task = new FutureTask<MessageProducer>(new Callable<MessageProducer>() {@Overridepublic MessageProducer call() throws Exception {MessageProducer producer = MetaqTemplate.this.messageSessionFactory.createProducer();producer.publish(topic);if (!StringUtils.isBlank(MetaqTemplate.this.defaultTopic)) {producer.setDefaultTopic(MetaqTemplate.this.defaultTopic);}return producer;}});FutureTask<MessageProducer> oldTask = this.producers.putIfAbsent(topic, task);if (oldTask != null) {task = oldTask;}else {task.run();}}try {MessageProducer producer = task.get();return producer;}catch (ExecutionException e) {throw ThreadUtils.launderThrowable(e.getCause());}catch (InterruptedException e) {Thread.currentThread().interrupt();}}else {if (this.sharedProducer == null) {synchronized (this) {if (this.sharedProducer == null) {this.sharedProducer = this.messageSessionFactory.createProducer();if (!StringUtils.isBlank(this.defaultTopic)) {this.sharedProducer.setDefaultTopic(this.defaultTopic);}}}}this.sharedProducer.publish(topic);return this.sharedProducer;}throw new IllegalStateException("Could not create producer for topic '" + topic + "'");}

看到熟悉的 messageSessionFactory ,创建生产者的时候就需要这个工厂类来创建,我们在回过头来看看MetaqTemplate这个类:

    private MessageSessionFactory messageSessionFactory;private String defaultTopic;private MessageBodyConverter<?> messageBodyConverter;private boolean shareProducer = false;private volatile MessageProducer sharedProducer;

有好几个属性,我们只要传入一个MessageSessionFactory 及MessageBodyConverter对象即可:
到此我们就可以创建MetaqTemplate这个类了:

先来创建MessageSessionFactory ,这里使用MetaqMessageSessionFactoryBean这个实现类:

<!--  message session factory -->  <bean id="sessionFactory" class="com.taobao.metamorphosis.client.extension.spring.MetaqMessageSessionFactoryBean">  <property name="zkConnect" value="127.0.0.1:2181"/>  <property name="zkSessionTimeoutMs" value="30000"/>  <property name="zkConnectionTimeoutMs" value="30000"/>  <property name="zkSyncTimeMs" value="5000"/>
</bean>  

这样我们就创建了一个工厂类了,然后我们需要创建一个消息转换类,这里使用默认实现类:
JavaSerializationMessageBodyConverter

<!--  message body converter using java serialization. -->  <bean id="messageBodyConverter"    class="com.taobao.metamorphosis.client.extension.spring.JavaSerializationMessageBodyConverter"/>  

我们需要创建MetaqTemplate元素都准备好了,可以创建MetaqTemplate类了:

<!--  template to send messages. -->  <bean id ="metaqTemplate" class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate">      <property name="messageSessionFactory" ref="sessionFactory"/>  <property name="messageBodyConverter" ref="messageBodyConverter"/>  </bean>  

可以发送消息了:

final String topic = "date";
final SendResult sendResult =
template.send(MessageBuilder.withTopic(topic).withBody(new Date()); 

二、消费者
看完生产者我在来看看消费者,接受消息是由消费者MessageConsume触发,MessageConsume从MessageSessionFactory中创建出来具体实现如下:

   MetaClientConfig metaClientConfig = new MetaClientConfig();ZKConfig zkConfig = new ZkUtils.ZKConfig()zkConfig.zkConnect = "127.0.0.1:2181";metaClientConfig.setZkConfig(zkConfig)MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig );// subscribed topicfinal String topic = "meta-test";// consumer groupfinal String group = "meta-example";// create consumer,强烈建议使用单例MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));// subscribe topicconsumer.subscribe(topic, 1024 * 1024, new MessageListener() {public void recieveMessages(Message message) {System.out.println("Receive message " + new String(message.getData()));}public Executor getExecutor() {// Thread pool to process messages,maybe null.return null;}});// complete subscribeconsumer.completeSubscribe();}

消费者也是通过MetaMessageSessionFactory 去创建的,然后调用subscribe 实现消息的订阅接受及处理,我们来看看这个类MessageListenerContainer:

@Overridepublic void afterPropertiesSet() throws Exception {log.info("Start to initialize message listener container.");if (this.subscribers != null) {Set<MessageConsumer> consumers = new HashSet<MessageConsumer>();for (Map.Entry<MetaqTopic, ? extends DefaultMessageListener<?>> entry : this.subscribers.entrySet()) {final MetaqTopic topic = entry.getKey();final DefaultMessageListener<?> listener = entry.getValue();if (topic == null) {throw new IllegalArgumentException("Topic is null");}if (StringUtils.isBlank(topic.getTopic())) {throw new IllegalArgumentException("Blank topic");}MessageConsumer consumer = this.getMessageConsumer(topic);if (consumer == null) {throw new IllegalStateException("Get or create consumer failed");}log.info("Subscribe topic=" + topic.getTopic() + " with group=" + topic.getGroup());if (listener.getMessageBodyConverter() == null) {listener.setMessageBodyConverter(this.messageBodyConverter);}consumer.subscribe(topic.getTopic(), topic.getMaxBufferSize(), listener);consumers.add(consumer);}for (MessageConsumer consumer : consumers) {consumer.completeSubscribe();}}log.info("Initialize message listener container successfully.");}

可以看到这个类在初始完成后会创建一个消费者,然后调用消费者的subscribe方法订阅和处理消息,创建这个类需要下面这个几个类:MetaqTopic 、DefaultMessageListener或者其子类,下面我来分别看看这个两个类:

MetaqTopic 主要有如下几个属性:

    private ConsumerConfig consumerConfig = new ConsumerConfig();private String topic;private int maxBufferSize = 1024 * 1024;

我们知道创建消费者的时候需要指定topic及每次消费的大小,MetaqTopic 这个就是用来指定这些属性值的

再来看看 DefaultMessageListener:

    @Overridepublic void recieveMessages(Message message) throws InterruptedException {if (this.messageBodyConverter != null) {try {T body = (T) this.messageBodyConverter.fromByteArray(message.getData());this.onReceiveMessages(new MetaqMessage<T>(message, body));}catch (Exception e) {log.error("Convert message body from byte array failed,msg id is " + message.getId() + " and topic is "+ message.getTopic(), e);message.setRollbackOnly();}}else {this.onReceiveMessages(new MetaqMessage<T>(message, null));}}public abstract void onReceiveMessages(MetaqMessage<T> msg);

这个类实现了recieveMessages处理消息的方法,在方法中我们调用了MessageBodyConverter 这个类转换消息,然后调用了onReceiveMessages 这个方法,需要我们自己来实现真正的消息处理,也就是我们需要实现DefaultMessageListener这个类中onReceiveMessages 方法来处理消息就可以了。

这里简单进行一个实现:

import com.taobao.metamorphosis.client.extension.spring.DefaultMessageListener;
import com.taobao.metamorphosis.client.extension.spring.MetaqMessage;
import java.util.Date;  /** * Process date messages listener. *  * @author dennis *  */
public class DateMessageListener extends DefaultMessageListener<Date> {  @Override  public void onReceiveMessages(MetaqMessage<Date> msg) {  Date date = msg.getBody();  System.out.println("receive date message:" + date);  }  }  

这样我们所需的要素就都有了,现在看看怎么用spring来配置:

<!--  topics to be subscribed. -->  <bean id = "dateTopic" class="com.taobao.metamorphosis.client.extension.spring.MetaqTopic">  <!-- consumer group -->  <property name="group" value="testGroup"/>  <!--  topic -->  <property name="topic" value="date"/>  <!--  max buffer size to fetch messages -->  <property name="maxBufferSize" value="16384"/>  </bean>  
 <!--  message listener -->  <bean id= "messageListener" class="com.taobao.metamorphosis.example.spring.DateMessageListener">  <!--  threads to process these messages. -->  <property name="processThreads" value="10"/>  </bean>  
<!--  listener container to subscribe topics -->  <bean id ="listenerContainer" class="com.taobao.metamorphosis.client.extension.spring.MessageListenerContainer">   <property name="messageSessionFactory" ref="sessionFactory"/>  <property name="messageBodyConverter" ref="messageBodyConverter"/>  <property name="subscribers">  <map>  <entry key-ref="dateTopic" value-ref="messageListener"/>  </map>  </property>  </bean>  

只要配置好这些后就可以通过我们实现的监听器DateMessageListener 自动处理消息了。

写到这差不多就整理完了,代码比较多,只找了几个关键的地方进行分析,着重点落在了这么结合Spring使用。由于能力有限制,写到不到位的地方多多见谅。。

我是一只小蜗牛,虽然速度慢,但我一直在努力向前爬。。

MetaQ源码阅读及与Spring结合使用相关推荐

  1. 【源码阅读】看Spring Boot如何自动装配ActiveMQ收发组件

    源于好奇,我研究了一下Spring Boot中ActiveMQ相关组件是如何自动装配的.记录如下. 源码路径 本文以Spring Boot 1.5.10.RELEASE版本为例. 在spring-bo ...

  2. Spring源码阅读 源码环境搭建(一)

    ring 源码阅读的搭建(一) 一 下载spring源码 进入官方网页:https://spring.io/projects/spring-framework 进入相关的github位置,下载zip包 ...

  3. spring源码阅读(3)-- 容器启动之BeanFactoryPostProcessor

    接着上文<spring源码阅读(2)-- 容器启动之加载BeanDefinition>,当spring加载完所有BeanDefinition时,并不会马上去创建bean,而是先配置bean ...

  4. 【Spring 源码阅读】Spring IoC、AOP 原理小总结

    Spring IoC.AOP 原理小总结 前言 版本约定 正文 Spring BeanFactory 容器初始化过程 IoC 的过程 bean 完整的创建流程如下 AOP 的过程 Annotation ...

  5. Spring源码阅读(一)——整体结构

    Spring 总共大约有20个模块,由1300多个不同的文件构成. Spring源码阅读可以分为三个路线:IOC,AOP,外部组件. 个人主页:tuzhenyu's page 原文地址:Spring源 ...

  6. Spring源码阅读之bean对象的创建过程

    Spring源码阅读之bean对象的创建过程 ​ Spring是通过IOC容器来管理对象的,该容器不仅仅只是帮我们创建了对象那么简单,它负责了对象的整个生命周期-创建.装配.销毁.这种方式成为控制反转 ...

  7. Spring Boot 2.0系列文章(四):Spring Boot 2.0 源码阅读环境搭建

    前提 前几天面试的时候,被问过 Spring Boot 的自动配置源码怎么实现的,没看过源码的我只能投降��了. 这不,赶紧来补补了,所以才有了这篇文章的出现,Spring Boot 2. 0 源码阅 ...

  8. Spring框架源码阅读读后感

    Spring框架源码阅读读后感 spring的bean生命周期,从上到下依次完成,本人在阅读源码时总结得出此步骤,当然,spring是一个强大的框架,其对bean的生命周期管理只是其中的一部分,本人也 ...

  9. spring 源码阅读笔记-从浅到深的解析

    目录 第一章 源码安装 文章目录 目录 前言 一.spring源码下载 二.构建源码及使用 1.源码构建 2.使用构建源码 总结 前言 由于spring的源码常常以语言和高深莫测的地位存在,而源码解析 ...

最新文章

  1. Win8/Win8.1常见错误代码的解决方法汇总
  2. Unity Dotween官方案例学习
  3. openssh for windows
  4. JAVA NIO学习一:NIO简介、NIOIO的主要区别
  5. 如何做好Code Review
  6. SecureCRT报错ImportError: No module named itertools(解决方案无法复现)
  7. 理请求时出现未知错误.服务器返回的状态码为: 500,react-native
  8. flink和kafka区别_Apache Flink和Kafka入门
  9. 越老越值钱,除了程序员!
  10. 蚂蚁三面题目(java开发岗):Java锁机制+JVM+线程池+事务+中间件
  11. JimStoneAjax如何跟DWR竞争?
  12. 解决dpdk中出现IOMMU not found的问题
  13. matlab定义变量var,设置变量数据类型 - MATLAB setvartype - MathWorks 中国
  14. 应用时间序列案例-基于R语言
  15. 深圳大学物计算机黄yilin,中国科学引文数据库(CSCD)收录本校教师论文情况.doc...
  16. 基于opencv与android的手机远程监控
  17. 软件开发一般学什么?
  18. 赤裸裸的统计学:除去大数据的枯燥外衣,呈现真实的数字之美 - 电子书下载 -(百度网盘 高清版PDF格式)...
  19. TI用2000万小时给出使用氮化镓的理由
  20. VMware Workstation之MacOS系统安装

热门文章

  1. 有用的URL,大量的干货!!!!!!!!!!!!
  2. 使用 Petalinux 定制 Linux 系统
  3. 通过Nginx实现直播软件源码的推流和拉流
  4. 阿里云acp考试多少分考过?阿里云acp考试怎么查询?
  5. 合肥工业大学正版MATLAB,合工大Matlab上机题目4
  6. AI绘画-Midjourney基础1-突破想象的界限:掌握文本引导的图像生成技巧
  7. 17_2Representation Learning and Generative Learning Deep Convolutional_Progressive Growing Style GAN
  8. i lost my chenyi
  9. 【水文模型】08 水文研究热点
  10. ios 侧滑返回停顿_如何使用 iOS 系统自带侧滑返回功能