SpringBoot集成原生rocketmq-client
RocketMq提供了原生maven包:rocketmq-client,SpringBoot利用注入的方式可以很好的集成Rocket原生maven包。本文通过自定义jar包的形式对rocketmq-client进行封装,这样便于后期对消息队列的统一控制,包括client升级,幂等处理,日志打印,灰度处理等等。
1. 生产者包装
消息队列生产消息主要包括两部分:不同类型生产者(包括defaultMQProducer 或transactionMQProducer)和消息发送方式(同步消息,异步消息,顺序消息),因此我们考虑封RocketProducer用于集成不同类型生产者,produceMessage用于发送不同类型消息。
maven引用:
<dependencies><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.1.5.RELEASE</version><scope>provided</scope></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.4</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.6</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version></dependency><dependency><groupId>io.zipkin.brave</groupId><artifactId>brave</artifactId><version>5.6.1</version><scope>provided</scope></dependency></dependencies>
Producer包装,内部实现方法:createTransactionProcucer和createDefaultMqProducer,用户可以根据自身需要选择对应的生产者类型。
public class RocketMqProducer implements Closeable {private DefaultMQProducer defaultMQProducer;private TransactionMQProducer transactionMQProducer;private AtomicInteger transactionIndex = new AtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();private String namesrvAddr;private String produceGroup;@Overridepublic void close() throws IOException {if(this.defaultMQProducer!=null){this.defaultMQProducer.shutdown();}}public RocketMqProducer(){}public TransactionMQProducer getTransactionMQProducer() {return transactionMQProducer;}public DefaultMQProducer getDefaultMQProducer(){return this.defaultMQProducer;}public DefaultMQProducer initDefaultMqProducer() throws MQClientException {this.defaultMQProducer=createDefaultMqProducer(new DefaultMQProducer(this.produceGroup));this.defaultMQProducer.start();return this.defaultMQProducer;}public TransactionMQProducer initTransactionMQProducer() throws MQClientException {this.transactionMQProducer=createTransactionProcucer(new TransactionMQProducer("tx"+this.produceGroup));this.transactionMQProducer.start();return this.transactionMQProducer;}public DefaultMQProducer createDefaultMqProducer(DefaultMQProducer defaultMQProducer){defaultMQProducer.setNamesrvAddr(this.namesrvAddr);defaultMQProducer.setInstanceName(this.produceGroup);//客户端回调线程数,默认:Runtime.getRuntime.availableProcessors,当前cpu核心数defaultMQProducer.setClientCallbackExecutorThreads(4);//持久化消费者时间间隔defaultMQProducer.setPersistConsumerOffsetInterval(5000);defaultMQProducer.setSendMsgTimeout(10000);//生产发送消息超时时间defaultMQProducer.setCompressMsgBodyOverHowmuch(4096); //消费体容量上限,默认是4mdefaultMQProducer.setRetryAnotherBrokerWhenNotStoreOK(false); //是否在内部发送失败时重试另一个brokerdefaultMQProducer.setRetryTimesWhenSendFailed(2);//同步模式下重试次数限制defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2); //异步模式下重试次数限制return defaultMQProducer;}public TransactionMQProducer createTransactionProcucer(TransactionMQProducer transactionMQProducer){transactionMQProducer.setNamesrvAddr(this.namesrvAddr);ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});transactionMQProducer.setExecutorService(executorService);transactionMQProducer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object o) {int value = transactionIndex.getAndIncrement();int status = value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status = localTrans.get(msg.getTransactionId());if (null != status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}});return transactionMQProducer;}public void setNamesrvAddr(String namesrvAddr){this.namesrvAddr=namesrvAddr;}public void setProduceGroup(String produceGroup){this.produceGroup=produceGroup;}public static void main(String[] args) {RocketMqProducer rocketMqProducer=new RocketMqProducer();try {rocketMqProducer.setProduceGroup("whp-rokectmq");rocketMqProducer.setNamesrvAddr("192.168.1.10:9876");rocketMqProducer.initDefaultMqProducer();} catch (MQClientException e) {}ProduceMessage produceMessage=new ProduceMessage();produceMessage.setDefaultMQProducer(rocketMqProducer.getDefaultMQProducer());try {produceMessage.sendMessage("whp-test","hello wolrd");} catch (Exception e) {throw new RuntimeException(e);}}
}
ProduceMessage封装,用于发送不同类型消息。
public class ProduceMessage {private DefaultMQProducer defaultMQProducer;private TransactionMQProducer transactionMQProducer;public void setTransactionMQProducer(TransactionMQProducer transactionMQProducer) {this.transactionMQProducer = transactionMQProducer;}public void setDefaultMQProducer(DefaultMQProducer defaultMQProducer) {this.defaultMQProducer = defaultMQProducer;}public ProduceMessage(){}public void sendMessage(String topic,String msg) throws Exception{this.sendMessage(topic,"",msg);}public void sendMessage(String topic,String tags,String msg) throws Exception{this.sendMessage(topic,tags,"",msg);}public void sendMessage(String topic,String tags,String keys,String msg) throws Exception{Message message=new Message(topic,tags,keys,msg.getBytes("utf-8"));this.defaultMQProducer.send(message);}public void sendAsyncMessage(String topic, String msg, SendCallback sendCallback) throws Exception {this.sendAsyncMessage(topic,"",msg,sendCallback);}public void sendAsyncMessage(String topic,String tags,String msg,SendCallback sendCallback) throws Exception{this.sendAsyncMessage(topic,tags,"",msg,sendCallback);}/**** @param topic* @param tags* @param keys 索引建,空格分隔,快速检索到消息* @param msg* @param sendCallback* @throws Exception*/public void sendAsyncMessage(String topic,String tags,String keys,String msg,SendCallback sendCallback) throws Exception{Message message=new Message(topic,tags,keys,msg.getBytes("utf-8"));message.isWaitStoreMsgOK();this.defaultMQProducer.send(message,sendCallback);}public void sendDelayMessage(String topic,String msg,int delayLevel) throws Exception{Message message=new Message(topic,msg.getBytes("utf-8"));message.setDelayTimeLevel(delayLevel);this.defaultMQProducer.send(message);}public void sendOrderMessage(String topic,String msg,String key) throws Exception{Message message=new Message(topic,msg.getBytes("utf-8"));this.defaultMQProducer.send(message,new SelectMessageQueueByHash(),key);}public void sendOrderDelayMessage(String topic,String msg,int delayLevel,String key) throws Exception{Message message=new Message(topic,msg.getBytes("utf-8"));message.setDelayTimeLevel(delayLevel);this.defaultMQProducer.send(message,new SelectMessageQueueByHash(),key);}public void sendTransactionMessage(String topic,String msg) throws Exception{Message message=new Message(topic,msg.getBytes("utf-8"));this.transactionMQProducer.sendMessageInTransaction(message,null);}
}
SpringBoot引入后,只要实例化两个实体:【RocketMqProducer】 配置RocketMq地址信息,同时创建需要的生产者类型;【ProduceMessage】用于发送消息。
@Slf4j
@Configuration
public class RocketMqConfig {@Value("${spring.application.name}")private String group;@Value("${rocketmq.server:}")private String namesServerAddress;@Autowiredprivate Map<String, IRocketConsumer> consumers;@Bean(name="rocketMqProducer",initMethod = "initDefaultMqProducer",destroyMethod = "close")public RocketMqProducer rocketMqProducer(){RocketMqProducer rocketMqProducer=new RocketMqProducer();rocketMqProducer.setProduceGroup(group);rocketMqProducer.setNamesrvAddr(namesServerAddress);return rocketMqProducer;}@Beanpublic ProduceMessage produceMessage(@Qualifier("rocketMqProducer") RocketMqProducer rocketMqProducer){ProduceMessage produceMessage=new ProduceMessage();produceMessage.setDefaultMQProducer(rocketMqProducer.getDefaultMQProducer());return produceMessage;}
}@Service
@Slf4j
public class RocketMqService {@Autowiredprivate ProduceMessage produceMessage;private static final String TEST_TOPIC="whp-test";public Boolean produceTest(String message) {try {produceMessage.sendMessage(TEST_TOPIC,message);} catch (Exception e) {return false;}return true;}
}
2. 消费者包装
每个服务当中可能涉及多个Topic的消费,为了避免每次消费时创建对应消费者,可以通过在方法上使用注解的方式建立消费者,具体执行则应用java反射技术。
(1)注解定义:
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqClient {String groupName() default "";//默认组名String topic() default ""; //topicString tag() default "*";//tagint minThread() default 2;//最小线程int maxThread() default 5;//最大线程int batchSize() default 32; //批量大小32ConsumeMode consumeMode() default ConsumeMode.PUSH; //消费方式,默认推boolean isOrder() default false; //是否是顺序的
}public enum ConsumeMode {PULL,PUSH;
}
(2)方便注解的获取,定义消费者公共接口:
/*** 所有消费者实现该方法*/
public interface IRocketConsumer {
}
(3)消费者封装实现:
public class RoketMqConsumer implements Closeable {private Logger logger= LoggerFactory.getLogger(RoketMqConsumer.class);private String nameServerAddress;private String instanceName;private String consumerGroup;/*** 记录所有的消费者*/private Map<String,IRocketConsumer> rocketConsumerMap;private Map<String,Thread> threadMap;public void setInstanceName(String instanceName) {this.instanceName = instanceName;}public void setConsumerGroup(String consumerGroup) {this.consumerGroup = consumerGroup;}public void setRocketConsumerMap(Map<String, IRocketConsumer> rocketConsumerMap) {this.rocketConsumerMap = rocketConsumerMap;}public void setNameServerAddress(String nameServerAddress) {this.nameServerAddress = nameServerAddress;}public void initRocketMqConsumer(){threadMap=new HashMap<>();rocketConsumerMap.forEach((className,consumer)->{this.excuteTask(consumer);});}public RoketMqConsumer(){}private void excuteTask(IRocketConsumer consumer) {Method[] methods = consumer.getClass().getMethods();for(int i=0;i<methods.length;i++){Method method = methods[i];MqClient annotation = (MqClient)method.getAnnotation(MqClient.class);if(annotation!=null){String threadName=consumer.getClass().getSimpleName().concat(method.getName());Thread thread = new Thread(new Runnable() {//每个消费方法启动单独线程操作@Overridepublic void run() {try {RoketMqConsumer.this.createDefaultMqPushConsumer(annotation,consumer,method);} catch (MQClientException e) {logger.error("task执行失败,message:{}",e.getMessage());}}},threadName);thread.start();threadMap.put(threadName,thread);}}}public void createDefaultMqPushConsumer(MqClient mqClient, IRocketConsumer rocketConsumer, Method method) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(mqClient.groupName());consumer.setNamesrvAddr(this.nameServerAddress);if(this.instanceName==null){consumer.setInstanceName(this.consumerGroup);}else{consumer.setInstanceName(this.instanceName);}consumer.setConsumeThreadMin(mqClient.minThread());consumer.setConsumeThreadMax(mqClient.maxThread());//新的group创建时从哪里开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.subscribe(mqClient.topic(),mqClient.tag());if(mqClient.isOrder()){consumer.registerMessageListener(new MessageListenerOrderly(){@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {try {method.invoke(rocketConsumer,list);} catch (RocketMqException e) {throw new RuntimeException(e);} catch (Exception ex) {logger.info("消费异常,e:{}",ex.getMessage()); //稍后重试return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}});}else{consumer.registerMessageListener(new MessageListenerConcurrently(){@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {logger.info("consumer recevie message:{}",new String(list.get(0).getBody(), StandardCharsets.UTF_8));method.invoke(rocketConsumer,list);} catch (RocketMqException e) {throw new RuntimeException(e);} catch (Exception ex) {logger.info("消费异常,e:{}",ex.getMessage()); //出现异常后面重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});}consumer.start();}@Overridepublic void close() throws IOException {threadMap.forEach((n,thread)->{thread.stop();});}
}
3. 总结
为方便功能扩展,Spring boot针对原生客户端封装则是一个友好的方式,本文使用注解+反射的方法对客户端进行了简易封装,让RocketMq的引入变得更加简单。
SpringBoot集成原生rocketmq-client相关推荐
- RocketMQ有哪些消息类型?springboot如何整合rocketMQ
文章目录 1. rocketMQ的消息类型 1.1 消息的发送和接收方式 1.2 顺序消息 1.3 广播消息 1.4 延时消息 1.5 批量消息 1.6 过滤消息 1.7 事务消息 2. ACL权限控 ...
- rocketmq client端源码分析(1)-consumer实现
rocketmq客户端实现如果集成了spring-boot则写一个监听就可以实现业务逻辑.这个流程是这样的呢. 首先我们实现了监听接口RocketMQListener或者RocketMQReplyLi ...
- springboot集成es7.2自定义注解创建索引
各位好,我们把之前的坑填一下,我在上上篇文章中写了springboot集成es7 的方法,并且集成了es原生客户端 High Level Rest Client, 也说明了原因, 我用的版本较高, ...
- kafka(组件分析 整合springboot集成 实战)
kafka 组件 搭建 springboot集成 实战 kafka 组件 搭建 springboot集成 实战 1.应用场景 1.1 kafka场景 1.2 kafka特性 1.3 消息对比 1.4 ...
- 【SpringBoot高级篇】SpringBoot集成Elasticsearch搜索引擎
[SpringBoot高级篇]SpringBoot集成Elasticsearch搜索引擎 1. 什么是Elasticsearch? 2. 安装并运行Elasticsearch 2.1 拉取镜像 2.2 ...
- java常用日志框架日志门面及实现 SLF4J 、Jboss-logging 、JCL、Log4j、Logback、Log4j2、JUL,springboot集成 log4j、log4j2
java常用日志框架日志门面SLF4J .Jboss-logging .JCL.Log4j及实现 Logback.Log4j2.JUL,springboot集成 log4j.log4j2 .logba ...
- springboot集成elasticsearch7实现全文检索及分页
springboot集成elasticsearch7实现全文检索及分页 elasticsearch系列文章前面已经更新过三篇(https://blog.csdn.net/lsqingfeng/cate ...
- 一文搞定:SpringBoot 集成 Apollo 配置中心
公众号后台回复"面试",获取精品学习资料 扫描下方海报了解专栏详情 本文来源: http://www.mydlq.club/article/42/ <Java工程师面试突击( ...
- springboot集成mqtt
文章目录 一.MQTT说明 1.1.mqtt文档 1.2.MQTT消息服务质量 1.1.1.归纳 二.MQTT环境搭建 三.boot集成原生mqtt 1.1.项目结构 1.2.依赖 1.3.appli ...
最新文章
- 皮一皮:千万别得罪一个文科生...
- c语言 static 关键字的作用
- 分布式文档系统-document id的手动指定与自动生成两种方式解析(来自学习笔记:龙果学院ES课程)
- java 中 阻塞队列 非阻塞队列 和普通队列的区别
- 关于Linux下的umask
- 一文带你看懂物联网开源操作系统
- VC++6.0中内存泄漏检测 转
- SAP License:客户特别总帐统驭科目某天余额取数逻辑
- JDBC Driver
- 帖子如何实现显示浏览次数_我是如何一步步的在并行编程中将lock锁次数降到最低实现无锁编程...
- 推荐第三方SQL查询工具
- Spring MVC 3.0 深入
- iOS App 签名的原理 App 重签名(二)
- SAP License:ERP实施方案包括哪些内容?
- 用思维导图描绘5G场景
- 03_Linux ARM架构-安装elasticsearch 7.16-银河麒麟V10操作系统
- 3dsMax Biped骨骼缩放
- Python Flask基础教程(入门)
- idea 如何导入和导出自己设置好的主题背景
- 屡遭欧盟天价罚单,欧洲国家为何总看谷歌、苹果、Facebook不顺眼?