RocketMq提供了原生maven包:rocketmq-client,SpringBoot利用注入的方式可以很好的集成Rocket原生maven包。本文通过自定义jar包的形式对rocketmq-client进行封装,这样便于后期对消息队列的统一控制,包括client升级,幂等处理,日志打印,灰度处理等等。

1. 生产者包装

消息队列生产消息主要包括两部分:不同类型生产者(包括defaultMQProducer 或transactionMQProducer)和消息发送方式(同步消息,异步消息,顺序消息),因此我们考虑封RocketProducer用于集成不同类型生产者,produceMessage用于发送不同类型消息。

maven引用:

<dependencies><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.1.5.RELEASE</version><scope>provided</scope></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.4</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.6</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version></dependency><dependency><groupId>io.zipkin.brave</groupId><artifactId>brave</artifactId><version>5.6.1</version><scope>provided</scope></dependency></dependencies>

Producer包装,内部实现方法:createTransactionProcucer和createDefaultMqProducer,用户可以根据自身需要选择对应的生产者类型。

public class RocketMqProducer implements Closeable {private DefaultMQProducer defaultMQProducer;private TransactionMQProducer transactionMQProducer;private AtomicInteger transactionIndex = new AtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();private String namesrvAddr;private String produceGroup;@Overridepublic void close() throws IOException {if(this.defaultMQProducer!=null){this.defaultMQProducer.shutdown();}}public RocketMqProducer(){}public TransactionMQProducer getTransactionMQProducer() {return transactionMQProducer;}public DefaultMQProducer getDefaultMQProducer(){return this.defaultMQProducer;}public DefaultMQProducer initDefaultMqProducer() throws MQClientException {this.defaultMQProducer=createDefaultMqProducer(new DefaultMQProducer(this.produceGroup));this.defaultMQProducer.start();return this.defaultMQProducer;}public TransactionMQProducer initTransactionMQProducer() throws MQClientException {this.transactionMQProducer=createTransactionProcucer(new TransactionMQProducer("tx"+this.produceGroup));this.transactionMQProducer.start();return this.transactionMQProducer;}public DefaultMQProducer createDefaultMqProducer(DefaultMQProducer defaultMQProducer){defaultMQProducer.setNamesrvAddr(this.namesrvAddr);defaultMQProducer.setInstanceName(this.produceGroup);//客户端回调线程数,默认:Runtime.getRuntime.availableProcessors,当前cpu核心数defaultMQProducer.setClientCallbackExecutorThreads(4);//持久化消费者时间间隔defaultMQProducer.setPersistConsumerOffsetInterval(5000);defaultMQProducer.setSendMsgTimeout(10000);//生产发送消息超时时间defaultMQProducer.setCompressMsgBodyOverHowmuch(4096); //消费体容量上限,默认是4mdefaultMQProducer.setRetryAnotherBrokerWhenNotStoreOK(false); //是否在内部发送失败时重试另一个brokerdefaultMQProducer.setRetryTimesWhenSendFailed(2);//同步模式下重试次数限制defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2); //异步模式下重试次数限制return defaultMQProducer;}public TransactionMQProducer createTransactionProcucer(TransactionMQProducer transactionMQProducer){transactionMQProducer.setNamesrvAddr(this.namesrvAddr);ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});transactionMQProducer.setExecutorService(executorService);transactionMQProducer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object o) {int value = transactionIndex.getAndIncrement();int status = value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status = localTrans.get(msg.getTransactionId());if (null != status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}});return transactionMQProducer;}public void setNamesrvAddr(String namesrvAddr){this.namesrvAddr=namesrvAddr;}public void setProduceGroup(String produceGroup){this.produceGroup=produceGroup;}public static void main(String[] args) {RocketMqProducer rocketMqProducer=new RocketMqProducer();try {rocketMqProducer.setProduceGroup("whp-rokectmq");rocketMqProducer.setNamesrvAddr("192.168.1.10:9876");rocketMqProducer.initDefaultMqProducer();} catch (MQClientException e) {}ProduceMessage produceMessage=new ProduceMessage();produceMessage.setDefaultMQProducer(rocketMqProducer.getDefaultMQProducer());try {produceMessage.sendMessage("whp-test","hello wolrd");} catch (Exception e) {throw new RuntimeException(e);}}
}

ProduceMessage封装,用于发送不同类型消息。

public class ProduceMessage {private DefaultMQProducer defaultMQProducer;private TransactionMQProducer transactionMQProducer;public void setTransactionMQProducer(TransactionMQProducer transactionMQProducer) {this.transactionMQProducer = transactionMQProducer;}public void setDefaultMQProducer(DefaultMQProducer defaultMQProducer) {this.defaultMQProducer = defaultMQProducer;}public ProduceMessage(){}public void sendMessage(String topic,String msg) throws Exception{this.sendMessage(topic,"",msg);}public void sendMessage(String topic,String tags,String msg) throws Exception{this.sendMessage(topic,tags,"",msg);}public void sendMessage(String topic,String tags,String keys,String msg) throws Exception{Message message=new Message(topic,tags,keys,msg.getBytes("utf-8"));this.defaultMQProducer.send(message);}public void sendAsyncMessage(String topic, String msg, SendCallback sendCallback) throws Exception {this.sendAsyncMessage(topic,"",msg,sendCallback);}public void sendAsyncMessage(String topic,String tags,String msg,SendCallback sendCallback) throws Exception{this.sendAsyncMessage(topic,tags,"",msg,sendCallback);}/**** @param topic* @param tags* @param keys 索引建,空格分隔,快速检索到消息* @param msg* @param sendCallback* @throws Exception*/public void sendAsyncMessage(String topic,String tags,String keys,String msg,SendCallback sendCallback) throws Exception{Message message=new Message(topic,tags,keys,msg.getBytes("utf-8"));message.isWaitStoreMsgOK();this.defaultMQProducer.send(message,sendCallback);}public void sendDelayMessage(String topic,String msg,int delayLevel) throws Exception{Message message=new Message(topic,msg.getBytes("utf-8"));message.setDelayTimeLevel(delayLevel);this.defaultMQProducer.send(message);}public void sendOrderMessage(String topic,String msg,String key) throws Exception{Message message=new Message(topic,msg.getBytes("utf-8"));this.defaultMQProducer.send(message,new SelectMessageQueueByHash(),key);}public void sendOrderDelayMessage(String topic,String msg,int delayLevel,String key) throws Exception{Message message=new Message(topic,msg.getBytes("utf-8"));message.setDelayTimeLevel(delayLevel);this.defaultMQProducer.send(message,new SelectMessageQueueByHash(),key);}public void sendTransactionMessage(String topic,String msg) throws Exception{Message message=new Message(topic,msg.getBytes("utf-8"));this.transactionMQProducer.sendMessageInTransaction(message,null);}
}

SpringBoot引入后,只要实例化两个实体:【RocketMqProducer】 配置RocketMq地址信息,同时创建需要的生产者类型;【ProduceMessage】用于发送消息。

@Slf4j
@Configuration
public class RocketMqConfig {@Value("${spring.application.name}")private String group;@Value("${rocketmq.server:}")private String namesServerAddress;@Autowiredprivate Map<String, IRocketConsumer> consumers;@Bean(name="rocketMqProducer",initMethod = "initDefaultMqProducer",destroyMethod = "close")public RocketMqProducer rocketMqProducer(){RocketMqProducer rocketMqProducer=new RocketMqProducer();rocketMqProducer.setProduceGroup(group);rocketMqProducer.setNamesrvAddr(namesServerAddress);return rocketMqProducer;}@Beanpublic ProduceMessage produceMessage(@Qualifier("rocketMqProducer") RocketMqProducer rocketMqProducer){ProduceMessage produceMessage=new ProduceMessage();produceMessage.setDefaultMQProducer(rocketMqProducer.getDefaultMQProducer());return produceMessage;}
}@Service
@Slf4j
public class RocketMqService {@Autowiredprivate ProduceMessage produceMessage;private static final String TEST_TOPIC="whp-test";public Boolean produceTest(String message) {try {produceMessage.sendMessage(TEST_TOPIC,message);} catch (Exception e) {return false;}return true;}
}

2. 消费者包装

每个服务当中可能涉及多个Topic的消费,为了避免每次消费时创建对应消费者,可以通过在方法上使用注解的方式建立消费者,具体执行则应用java反射技术。

(1)注解定义:

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqClient {String groupName() default "";//默认组名String topic() default ""; //topicString tag() default "*";//tagint minThread() default 2;//最小线程int maxThread() default 5;//最大线程int batchSize() default 32; //批量大小32ConsumeMode consumeMode() default ConsumeMode.PUSH; //消费方式,默认推boolean isOrder() default false; //是否是顺序的
}public enum ConsumeMode {PULL,PUSH;
}

(2)方便注解的获取,定义消费者公共接口:

/*** 所有消费者实现该方法*/
public interface IRocketConsumer {
}

(3)消费者封装实现:

public class RoketMqConsumer implements Closeable {private Logger logger= LoggerFactory.getLogger(RoketMqConsumer.class);private String nameServerAddress;private String instanceName;private String consumerGroup;/*** 记录所有的消费者*/private Map<String,IRocketConsumer> rocketConsumerMap;private Map<String,Thread> threadMap;public void setInstanceName(String instanceName) {this.instanceName = instanceName;}public void setConsumerGroup(String consumerGroup) {this.consumerGroup = consumerGroup;}public void setRocketConsumerMap(Map<String, IRocketConsumer> rocketConsumerMap) {this.rocketConsumerMap = rocketConsumerMap;}public void setNameServerAddress(String nameServerAddress) {this.nameServerAddress = nameServerAddress;}public void initRocketMqConsumer(){threadMap=new HashMap<>();rocketConsumerMap.forEach((className,consumer)->{this.excuteTask(consumer);});}public RoketMqConsumer(){}private void excuteTask(IRocketConsumer consumer) {Method[] methods = consumer.getClass().getMethods();for(int i=0;i<methods.length;i++){Method method = methods[i];MqClient annotation = (MqClient)method.getAnnotation(MqClient.class);if(annotation!=null){String threadName=consumer.getClass().getSimpleName().concat(method.getName());Thread thread = new Thread(new Runnable() {//每个消费方法启动单独线程操作@Overridepublic void run() {try {RoketMqConsumer.this.createDefaultMqPushConsumer(annotation,consumer,method);} catch (MQClientException e) {logger.error("task执行失败,message:{}",e.getMessage());}}},threadName);thread.start();threadMap.put(threadName,thread);}}}public void createDefaultMqPushConsumer(MqClient mqClient, IRocketConsumer rocketConsumer, Method method) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(mqClient.groupName());consumer.setNamesrvAddr(this.nameServerAddress);if(this.instanceName==null){consumer.setInstanceName(this.consumerGroup);}else{consumer.setInstanceName(this.instanceName);}consumer.setConsumeThreadMin(mqClient.minThread());consumer.setConsumeThreadMax(mqClient.maxThread());//新的group创建时从哪里开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.subscribe(mqClient.topic(),mqClient.tag());if(mqClient.isOrder()){consumer.registerMessageListener(new MessageListenerOrderly(){@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {try {method.invoke(rocketConsumer,list);} catch (RocketMqException e) {throw new RuntimeException(e);} catch (Exception ex) {logger.info("消费异常,e:{}",ex.getMessage());  //稍后重试return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}});}else{consumer.registerMessageListener(new MessageListenerConcurrently(){@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {logger.info("consumer recevie message:{}",new String(list.get(0).getBody(), StandardCharsets.UTF_8));method.invoke(rocketConsumer,list);} catch (RocketMqException e) {throw new RuntimeException(e);} catch (Exception ex) {logger.info("消费异常,e:{}",ex.getMessage());  //出现异常后面重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});}consumer.start();}@Overridepublic void close() throws IOException {threadMap.forEach((n,thread)->{thread.stop();});}
}

3. 总结

为方便功能扩展,Spring boot针对原生客户端封装则是一个友好的方式,本文使用注解+反射的方法对客户端进行了简易封装,让RocketMq的引入变得更加简单。

SpringBoot集成原生rocketmq-client相关推荐

  1. RocketMQ有哪些消息类型?springboot如何整合rocketMQ

    文章目录 1. rocketMQ的消息类型 1.1 消息的发送和接收方式 1.2 顺序消息 1.3 广播消息 1.4 延时消息 1.5 批量消息 1.6 过滤消息 1.7 事务消息 2. ACL权限控 ...

  2. rocketmq client端源码分析(1)-consumer实现

    rocketmq客户端实现如果集成了spring-boot则写一个监听就可以实现业务逻辑.这个流程是这样的呢. 首先我们实现了监听接口RocketMQListener或者RocketMQReplyLi ...

  3. springboot集成es7.2自定义注解创建索引

    各位好,我们把之前的坑填一下,我在上上篇文章中写了springboot集成es7 的方法,并且集成了es原生客户端  High  Level Rest Client, 也说明了原因, 我用的版本较高, ...

  4. kafka(组件分析 整合springboot集成 实战)

    kafka 组件 搭建 springboot集成 实战 kafka 组件 搭建 springboot集成 实战 1.应用场景 1.1 kafka场景 1.2 kafka特性 1.3 消息对比 1.4 ...

  5. 【SpringBoot高级篇】SpringBoot集成Elasticsearch搜索引擎

    [SpringBoot高级篇]SpringBoot集成Elasticsearch搜索引擎 1. 什么是Elasticsearch? 2. 安装并运行Elasticsearch 2.1 拉取镜像 2.2 ...

  6. java常用日志框架日志门面及实现 SLF4J 、Jboss-logging 、JCL、Log4j、Logback、Log4j2、JUL,springboot集成 log4j、log4j2

    java常用日志框架日志门面SLF4J .Jboss-logging .JCL.Log4j及实现 Logback.Log4j2.JUL,springboot集成 log4j.log4j2 .logba ...

  7. springboot集成elasticsearch7实现全文检索及分页

    springboot集成elasticsearch7实现全文检索及分页 elasticsearch系列文章前面已经更新过三篇(https://blog.csdn.net/lsqingfeng/cate ...

  8. 一文搞定:SpringBoot 集成 Apollo 配置中心

    公众号后台回复"面试",获取精品学习资料 扫描下方海报了解专栏详情 本文来源: http://www.mydlq.club/article/42/ <Java工程师面试突击( ...

  9. springboot集成mqtt

    文章目录 一.MQTT说明 1.1.mqtt文档 1.2.MQTT消息服务质量 1.1.1.归纳 二.MQTT环境搭建 三.boot集成原生mqtt 1.1.项目结构 1.2.依赖 1.3.appli ...

最新文章

  1. 皮一皮:千万别得罪一个文科生...
  2. c语言 static 关键字的作用
  3. 分布式文档系统-document id的手动指定与自动生成两种方式解析(来自学习笔记:龙果学院ES课程)
  4. java 中 阻塞队列 非阻塞队列 和普通队列的区别
  5. 关于Linux下的umask
  6. 一文带你看懂物联网开源操作系统
  7. VC++6.0中内存泄漏检测 转
  8. SAP License:客户特别总帐统驭科目某天余额取数逻辑
  9. JDBC Driver
  10. 帖子如何实现显示浏览次数_我是如何一步步的在并行编程中将lock锁次数降到最低实现无锁编程...
  11. 推荐第三方SQL查询工具
  12. Spring MVC 3.0 深入
  13. iOS App 签名的原理 App 重签名(二)
  14. SAP License:ERP实施方案包括哪些内容?
  15. 用思维导图描绘5G场景
  16. 03_Linux ARM架构-安装elasticsearch 7.16-银河麒麟V10操作系统
  17. 3dsMax Biped骨骼缩放
  18. Python Flask基础教程(入门)
  19. idea 如何导入和导出自己设置好的主题背景
  20. 屡遭欧盟天价罚单,欧洲国家为何总看谷歌、苹果、Facebook不顺眼?

热门文章

  1. 手机如何将Word文档转换为PDF扫描文件
  2. throw inside finally block
  3. 原生JS无缝轮播图(左右切换、导航跟随)
  4. 测试结果类型为: ESTJ
  5. java录制声音(采集声卡音频数据)
  6. 东野圭吾梦幻花读后感_让梦幻花绽放在心灵深处——读东野圭吾《梦幻花》有感...
  7. Java读取单个字符
  8. java微信开发者模式开发_微信开发之启用开发者模式(三)
  9. 长三角24城市绿色全要素生产率、产业协同面板数据(2011-2019年)
  10. 计算机网络隧道工程,隧道工程