大家好,今天来聊一聊 RocketMQ 客户端消息消费失败,怎么办?

下面是 RocketMQ 推出模式的一段代码:

public static void main(String[] args) throws InterruptedException, MQClientException {
Tracer tracer = initTracer();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}

从这段代码可以看出,消费者消费完消息后会返回一个消费状态,那消费状态有哪些呢?参见类 ConsumeConcurrentlyStatus 中定义:

  • 消费成功,返回 CONSUME_SUCCESS;

  • 消费失败,返回 RECONSUME_LATER。

下面代码就是返回上面两个状态的逻辑,对于消费状态,如果返回 null,会给它赋值 RECONSUME_LATER,处理逻辑如下:

//ConsumeRequest 类
public void run() {
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
//省略部分逻辑
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
//省略部分逻辑
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {}
//省略部分逻辑
if (null == status) {
//省略日志
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
//省略部分逻辑
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {}}

这部分代码的 UML 类图如下:

上面代码中的 processConsumeResult 方法就是消费失败后客户端的处理逻辑:

public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
//ackIndex 初始值是 Integer.MAX_VALUE;
int ackIndex = context.getAckIndex();
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
//省略部分逻辑
break;
case RECONSUME_LATER:
ackIndex = -1;
//省略部分逻辑
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
//广播模式下这里只打印日志
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
//发送回 Broker 失败的消息,5s 后再次消费
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
//更新本地保存的偏移量
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}

1 消费成功

上面的代码逻辑中,如果消费成功,ackIndex 变量的值就是消息数量减少少 1,所以上面的 switch 逻辑是不会执行的,因为广播模式下,只是打印一段日志(没有其他逻辑),而集群模式下,for 循环的起始 i 变量已经等于消息数量,循环里面的代码不会执行

因此,如果消息消费成功,只会走最下面的逻辑,更新本地保存的消息偏移量。

2 消费失败

ackIndex 变量值等于 -1。

2.1 广播模式

在消费失败的情况下,广播模式的代码只是打印了一段日志,之后更新了本地保存的消息偏移量,因此我们知道广播模式消息消费失败后就不会重新消费了,相当于丢弃了消息

2.2 集群模式

从上面代码的 for 循环中,会把所有的消息都发送回去去 Broker,这样这批消息还能再次被拉取到进行消费。

对于发送给 Broker 失败的消息,会延迟 5s 后再次消费。代码如下:

private void submitConsumeRequestLater(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue
) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
}
}, 5000, TimeUnit.MILLISECONDS);}

更新本地保存的消息偏移量时,会从消息列表中发送回 Broker 失败的消息先删除掉。

注意:从上面逻辑可以看到,在拉取到一批消息进行消费时,只要有一条消息消费失败,这批消息都会进行重试,因此消费端做好幂等是必要的

下面再看一下发送失败的消息给 Broker 的代码是,发送消息时,请求的 code 码是 CONSUMER_SEND_MSG_BACK。根据这个请求码就能找 Broker 端的处理逻辑。

如果发送回 Broker 时抛出异常,需要重新发送一个新的消息,这里有四点需要注意:

  • 新消息的 Topic 变成【 %RETRY% + consumerGroup】;

  • 新消息的 RETRY_TOPIC 这个属性赋值为之前的 Topic;

  • 新消息的重试次数属性加 1;

  • 新消息的 DELAY 属性等于重试次数 + 3.

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
//Topic 变成 %RETRY% + consumerGroup
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
//RETRY_TOPIC 赋值
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
//重试次数+1
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
//最大重试次数
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
//DELAY = 重试次数 + 3
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}

2.3 Broker 处理

上面已经讲过,对于处理失败的消息,消费端会发送回 Broker,不过这里有一点需要注意,发送回 Broker 时,消息的 Topic 变成【"%RETRY%" + namespace + "%" + 原始 topic】,封装逻辑在源码 ClientConfig.withNamespace。

根据请求码 CONSUMER_SEND_MSG_BACK 可以定位到 Broker 的处理逻辑在类 SendMessageProcessor,方法 asyncConsumerSendMsgBack。

2.3.1 进死信队列

如果重试次数超过了最大重试次数(默认 16 次),或者 delayLevel 值小于 0,则消息进死信队列,死信队列的 Topic 为【"%DLQ%" + 消费组】,代码如下:

//asyncConsumerSendMsgBack 方法
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE | PermName.PERM_READ, 0);
msgExt.setDelayTimeLevel(0);
} 

2.3.2 发送 CommitLog

如果延迟级别(DELAY)等于 0,则延迟级别就等于重试次数加 3。

有个地方需要注意,发送到延迟队列的消息重新进行了封装,封装这个消息用的并不是客户端发来的那个消息,而是从 CommitLog 根据偏移量查找的,代码如下:

MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return CompletableFuture.completedFuture(response);
}

如果查询失败,就会给客户端返回系统错误。

这里有个重要的细节,这个消息写入 CommitLog 时,会判断 DELAY 是否大于 0,如果大于 0,就会修改 Topic。代码如下:

//CommitLog 类 asyncPutMessage 方法
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
//从源码看,这里最大值是18
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
//queueId = delayLevel - 1
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId)
}

这里把 Topic 修改为 SCHEDULE_TOPIC_XXXX,供延时队列来调度。进入延时队列后,延时队列会按照下面的时间进行调度:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

复制代码

上面代码可以看到,延时消息的调度有 18 个等级,最小的 1s,最大的 2h。而从下面的代码我们可以看到,调度使用第三个等级开始的:

if (0 == delayLevel) { delayLevel = 3 + msgExt.getReconsumeTimes();}msgExt.setDelayTimeLevel(delayLevel);

复制代码

2.3.3 延时队列

延时队列的代码逻辑在类 ScheduleMessageService,这里的 start 方法触发延时队列的调度,而 start 方法的业务入口在 BrokerStartup 的初始化。

首先,会计算出每个延时等级对应的延时时间(处理到 ms 级别),放到 delayLevelTable,它是一个 ConcurrentHashMap,然后创建一个核心线程数等于 18 的定时线程池,依次对每个级别的延时进行调度。这个任务启动后,会每 100ms 执行一次。代码如下:

public void start() {
if (started.compareAndSet(false, true)) {
this.load();
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
//省略异步
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
//省略异步
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
//省略}

调度逻辑中,首先根据 Topic 和 queueId 找到对应的消费队列,然后从里面连续读取消息:

public void executeOnTimeup() {
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
//省略空处理
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
//省略空处理
long nextOffset = this.offset;
try {
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
//CQ_STORE_UNIT_SIZE = 20,因为 ConsumeQueue 中一个元素占 20 字节
for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
//offset占8个字节
long offsetPy = bufferCQ.getByteBuffer().getLong();
//消息大小占4个字节
int sizePy = bufferCQ.getByteBuffer().getInt();
//ConsumeQueue中tagsCode是一个投递时间点
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown > 0) {
//时间未到,等待下次调度
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
//省略事务消息
boolean deliverSuc;
//同步异步都有,只保留同步代码
deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
}
nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
} catch (Exception e) {
} finally {
bufferCQ.release();
}
//DELAY_FOR_A_WHILE是 100ms
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}

因为 messageTimeup 方法使用了原始的 Topic 和 QueueId 新建了消息,所以上面的 syncDeliver 方式是将消息重新投递到原始的队列中,这样消费者可以再次拉取到这条消息进行消费。注意:上面 ConsumeQueue 的 tagsCode 是一个时间点,很容易误解为是 tag 的 hashCode,MessageQueue 的存储元素中最后 8 字节确实是 tag 的 hashCode。

3 总结

消费者消费失败后,会把消费发回给 Broker 进行处理。下图是客户端处理流程:

Broker 收到消息后,会把消息重新发送到 CommitLog,发送到 CommitLog 之前,首先会修改 Topic 为 SCHEDULE_TOPIC_XXXX,这样就发送到了延时队列,延时队列再根据延时级别把消息投递到原始的队列,这样消费者就能再次拉取到。流程如下图:

从流程来看,消费者批量拉取消息,如果部分消息消费失败,那就会整批全部重试。所以做好幂等是必要的。

如果本文对你有帮助,别忘记给我个3连 ,点赞,转发,评论,

咱们下期见!答案获取方式:已赞 已评 已关~

学习更多JAVA知识与技巧,关注与私信博主(03)

阿里二面:RocketMQ 消费失败了,怎么处理?相关推荐

  1. RocketMQ消费失败如何处理?如何保证消费消息的幂等性?

    文章目录 1. 消息消费失败如何处理? 2. 如何保证消费消息的幂等性? 1. 消息消费失败如何处理? 当消费者从Broker获取到消息后会进行消费,并返回消费状态.如下代码所示 //broker推消 ...

  2. RocketMQ消费失败重试机制分析

    现象:mq消费1次,重试3次,然后停止,如下实例展示 首次(reconsumeTimes=0) MQ_CON_MSG gmcf-lsc-zhongbang-repu-calc-from-topic M ...

  3. RocketMQ消息消费源码分析(二消息的消费)

    首先回到DefaultMQPushConsumerImpl  start方法 public synchronized void start() throws MQClientException {sw ...

  4. Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑

    应用场景  上一篇<Spring Cloud Stream消费失败后的处理策略(一):自动重试>介绍了默认就会生效的消息重试功能.对于一些因环境原因.网络抖动等不稳定因素引发的问题可以起到 ...

  5. 监控RocketMQ消费数据延迟告警发送企业微信

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 背景 1 RocketMQ介绍 1.1 RocketMQ 特点 1.2 RocketMQ 优势 1.3.RocketMQ环境 ...

  6. 万字图文详解阿里二面分布式十二问

    分布式理论 1. 说说CAP原则? CAP原则又称CAP定理,指的是在一个分布式系统中,Consistency(一致性). Availability(可用性).Partition tolerance( ...

  7. 4万字聊聊阿里二面,能抗多少?

    我是Leo.今天聊一下阿里二面. 友情提示:觉得长收藏的时候请点右上角,底部弹出菜单点收藏.我怕你拉不到底部 聊聊Redis面试题 3万字聊聊什么是Redis(完结篇) 3万字聊聊什么是MySQL(初 ...

  8. rocketmq 消费方式_RocketMQ事务消费和顺序消费详解

    一.RocketMq有3中消息类型 1.普通消费 2. 顺序消费 3.事务消费 顺序消费场景 在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一.创建订单 ,第二:订单付款,第三:订单完成. ...

  9. RocketMQ 消费端限流

    RocketMQ 消费端限流 RocketMQ 消费端限流 首先我们要明白为什么需要限流?如果不使用限流呢? 通常情况下,当客户端生产的消息很多时,消费者消费消息速度低于生产者消费速度,我们该如何解决 ...

最新文章

  1. The Pediatric Cancer Genome Project   儿童癌症基因组计划
  2. java 必备_Java基础必备
  3. android 5.0 ios 8,iOS 8与Android 5.0大比拼:功能相同 体验不同
  4. Zookeeper的安装与配置
  5. python3.5.3下载安装教程_在Python3.5下安装和测试
  6. Python bytearray/bytes/string区别 - Python零基础入门教程
  7. flowjo软件使用方法_管家婆软件使用方法出库教程,管家婆软件做账流程视频_双全科技...
  8. PostgreSQL的时间/日期函数使用
  9. 三次样条插值matlab,Matlab关于三次样条插值
  10. linux进程管理概念,Linux教程之进程的概念和进程管理命令的使用
  11. vue学习笔记-16-vue的数组方法
  12. 【编译原理 思维导图】 陈火旺第三版 前七章
  13. 方舟代码_源代码丢失的方舟
  14. c#学习笔记---BackgroundWorker 详解
  15. html保护眼睛背景色,保护眼睛的颜色和各种背景颜色设置方法
  16. 【土旦】vue 解决ios H5底部输入框 获取焦点时弹出虚拟键盘挡住输入框 以及监听键盘收起事件...
  17. 今日头条文章量如何打造爆款
  18. 用批处理命令打开控制面板选项
  19. RTX30系列-Ubuntu系统配置与深度学习环境Pytorch配置
  20. elementUI 时间格式化

热门文章

  1. 谷歌 console_使用Google Search Console有效增加网站流量的15条提示
  2. (30)zabbix Trapper 监控项配置
  3. 序列划分c语言,看懂了这些,你对缠论中的线段划分就基本掌握了!
  4. bzoj2565题解
  5. 转:latex 表格紧跟指定的文字后面
  6. layui使用模板渲染数据
  7. rust怎么拆除墙壁指令_腐蚀RUST基本指令及服务器指令大全
  8. php实现aes ecb模式加密,PHP、Python、Java的AES ECB加密实现-Fun言
  9. 【C++模板】类模板的全部特例化和局部特例化(偏特化-partial specialization)
  10. oracle插入数字类型能用单引号括起来为什么