异步处理需要消息补偿闭环
使用类似 RabbitMQ、RocketMQ 等 MQ 系统来做消息队列实现异步处理,虽然说消息可 以落地到磁盘保存,即使 MQ 出现问题消息数据也不会丢失,但是异步流程在消息发送、 传输、处理等环节,都可能发生消息丢失。此外,任何 MQ 中间件都无法确保 100% 可 用,需要考虑不可用时异步流程如何继续进行。 因此,对于异步处理流程,必须考虑补偿或者说建立主备双活流程。 我们来看一个用户注册后异步发送欢迎消息的场景。用户注册落数据库的流程为同步流程, 会员服务收到消息后发送欢迎消息的流程为异步流程。
我们来分析一下: 我们来看一下相关的实现代码。 首先,定义 UserController 用于注册 + 发送异步消息。对于注册方法,我们一次性注册 10 个用户,用户注册消息不能发送出去的概率为 50%。 蓝色的线,使用 MQ 进行的异步处理,我们称作主线,可能存在消息丢失的情况(虚线 代表异步调用); 绿色的线,使用补偿 Job 定期进行消息补偿,我们称作备线,用来补偿主线丢失的消 息; 考虑到极端的 MQ 中间件失效的情况,我们要求备线的处理吞吐能力达到主线的能力水 平。
package org.geekbang.time.commonmistakes.asyncprocess.compensation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
@RestController
@Slf4j
@RequestMapping("user")
public class UserController {
@Autowired
private UserService userService;
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("register")
public void register() {
//模拟10个用户注册
IntStream.rangeClosed(1, 10).forEach(i -> {
//落库,存数据库
User user = userService.register();
//模拟50%的消息可能发送失败
if (ThreadLocalRandom.current().nextInt(10) % 2 == 0) {
//通过 rabiitmq 发送消息
rabbitTemplate.convertAndSend(RabbitConfiguration.EXCHANGE, RabbitConfiguration.ROUTING_KEY, user);
log.info("sent mq user {}", user.getId());
}
});
}
}
然后,定义 MemberService 类用于模拟会员服务。会员服务监听用户注册成功的消息, 并发送欢迎短信。我们使用 ConcurrentHashMap 来存放那些发过短信的用户 ID 实现幂 等,避免相同的用户进行补偿时重复发送短信:
package org.geekbang.time.commonmistakes.asyncprocess.compensation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class MemberService {
//记录发送欢迎消息的状态
private Map<Long, Boolean> welcomeStatus = new ConcurrentHashMap<>();
//监听用户注册成功的消息,并发送欢迎消息
@RabbitListener(queues = RabbitConfiguration.QUEUE)
public void listen(User user) {
log.info("receive mq user {}", user.getId());
welcome(user);
}
//发送欢迎消息
public void welcome(User user) {
//消费一条记录,给一个注册成功的用户发送消息
//putIfAbsent 如果传入key对应的value已经存在,就返回存在的value,不进行替换。如果不存在,就添加key和value,返回null
//去重操作
if (welcomeStatus.putIfAbsent(user.getId(), true) == null) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
}
//发送消息给注册客户
log.info("memberService: welcome new user {}", user.getId());
}
}
}
对于 MQ 消费程序,处理逻辑务必考虑去重(支持幂等),原因有几个:
MQ 消息可能会因为中间件本身配置错误、稳定性等原因出现重复。 自动补偿重复,比如本例,同一条消息可能既走 MQ 也走补偿,肯定会出现重复,而且 考虑到高内聚,补偿 Job 本身不会做去重处理。 人工补偿重复。出现消息堆积时,异步处理流程必然会延迟。如果我们提供了通过后台 进行补偿的功能,那么在处理遇到延迟的时候,很可能会先进行人工补偿,过了一段时 间后处理程序又收到消息了,重复处理。我之前就遇到过一次由 MQ 故障引发的事故, MQ 中堆积了几十万条发放资金的消息,导致业务无法及时处理,运营以为程序出错了 就先通过后台进行了人工处理,结果 MQ 系统恢复后消息又被重复处理了一次,造成大 量资金重复发放。
接下来,定义补偿 Job 也就是备线操作。 我们在 CompensationJob 中定义一个 @Scheduled 定时任务,5 秒做一次补偿操作,因 为 Job 并不知道哪些用户注册的消息可能丢失,所以是全量补偿,补偿逻辑是:每 5 秒补 偿一次,按顺序一次补偿 5 个用户,下一次补偿操作从上一次补偿的最后一个用户 ID 开 始;对于补偿任务我们提交到线程池进行“异步”处理,提高处理能力。
package org.geekbang.time.commonmistakes.asyncprocess.compensation;
import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class CompensationJob {
private static ThreadPoolExecutor compensationThreadPool = new ThreadPoolExecutor(
10, 10,
1, TimeUnit.HOURS,
new ArrayBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("compensation-threadpool-%d").get());
@Autowired
private UserService userService;
@Autowired
private MemberService memberService;
private long offset = 0;
//10秒后开始补偿,每5秒补偿一次
@Scheduled(initialDelay = 10_000, fixedRate = 5_000)
public void compensationJob() {
log.info("开始从用户ID {} 补偿", offset);
userService.getUsersAfterIdWithLimit(offset, 5).forEach(user -> {
compensationThreadPool.execute(() -> memberService.welcome(user));
offset = user.getId();
});
}
}
为了实现高内聚,主线和备线处理消息,最好使用同一个方法。比如,本例中 MemberService 监听到 MQ 消息和 CompensationJob 补偿,调用的都是 welcome 方 法。 此外值得一说的是,Demo 中的补偿逻辑比较简单,生产级的代码应该在以下几个方面进 行加强:
考虑配置补偿的频次、每次处理数量,以及补偿线程池大小等参数为合适的值,以满足 补偿的吞吐量。 考虑备线补偿数据进行适当延迟。比如,对注册时间在 30 秒之前的用户再进行补偿,以 方便和主线 MQ 实时流程错开,避免冲突。 诸如当前补偿到哪个用户的 offset 数据,需要落地数据库。 补偿 Job 本身需要高可用,可以使用类似 XXLJob 或 ElasticJob 等任务系统
运行程序,执行注册方法注册 10 个用户,输出如下
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.4.12)
2021-11-25 15:33:40.155 INFO 11524 --- [ main] o.g.t.c.a.c.CommonMistakesApplication : Starting CommonMistakesApplication using Java 1.8.0_121 on DESKTOP-AJV2N6C with PID 11524 (D:\2021\LEO\拉勾\demo\target\classes started by STAR in D:\2021\LEO\拉勾\demo)
2021-11-25 15:33:40.159 INFO 11524 --- [ main] o.g.t.c.a.c.CommonMistakesApplication : No active profile set, falling back to default profiles: default
2021-11-25 15:33:42.616 INFO 11524 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2021-11-25 15:33:42.634 INFO 11524 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2021-11-25 15:33:42.634 INFO 11524 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.54]
2021-11-25 15:33:42.860 INFO 11524 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2021-11-25 15:33:42.860 INFO 11524 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 2598 ms
2021-11-25 15:33:44.801 INFO 11524 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2021-11-25 15:33:44.804 INFO 11524 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.125.50.250:5672]
2021-11-25 15:33:44.982 INFO 11524 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#69e308c6:0/SimpleConnection@5d1b9c3d [delegate=amqp://admin@192.125.50.250:5672/, localPort= 1064]
2021-11-25 15:33:45.125 INFO 11524 --- [ main] o.g.t.c.a.c.CommonMistakesApplication : Started CommonMistakesApplication in 5.634 seconds (JVM running for 6.408)
2021-11-25 15:33:55.122 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 0 补偿
2021-11-25 15:34:00.124 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 0 补偿
2021-11-25 15:34:05.126 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 0 补偿
2021-11-25 15:34:10.115 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 0 补偿
2021-11-25 15:34:11.629 INFO 11524 --- [nio-8080-exec-1] o.apache.tomcat.util.http.parser.Cookie : A cookie header was received ["supportFilterViews\":\"1\",\"utilityId\":\"1\",\"regionId\":\"\",\"utilityLocale\":{\"currencyDefaultDigit\":1,\"currencyPrefix\":\"\",\"currencySuffix\":\"Tk\",\"dateFormat\":\"MM/dd/yyyy\",\"id\":\"\",\"layoutDirection\":0,\"numberDecimalSeparator\":\".\",\"numberGroupingSeparator\":\",\",\"numberGroupingSize\":3,\"numberSecondaryGroupingSize\":3,\"softwareVendor\":\"\",\"timeFormat\":\"HH:mm:ss\",\"timeZoneOffset\":0,\"utilityDisplayName\":\"DPDC\",\"utilityId\":1,\"utilityLogo\":\"\"},\"currencySuffix\":\"Tk\",\"userNo\":\"ami\",\"deptId\":\"-9999\",\"currencyPrefix\":null,\"userName\":\"ami\",\"userId\":\"1\",\"deptNo\":\"-9999\"}"; theme=blue; StiMobileDesignerDictionarySettings=%7B%22createFieldOnDoubleClick%22%3Afalse%2C%22createLabel%22%3Afalse%2C%22useAliases%22%3Afalse%7D; sessionStatus=; moduleNo=sysMenu.subSystem.vending; systemId1=2; permNo=sysMenu.vending.integratedQuery; recentVisitedPages=[%22/hes/pDatPurchaseRcdController/list___Integrated%20Query___%22]; StimulsoftMobileDesignerLastTabOnPropertiesPanel=Properties] that contained an invalid cookie. That cookie will be ignored.
Note: further occurrences of this error will be logged at DEBUG level.
2021-11-25 15:34:11.645 INFO 11524 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-11-25 15:34:11.646 INFO 11524 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2021-11-25 15:34:11.647 INFO 11524 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2021-11-25 15:34:11.704 INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController : sent mq user 1
2021-11-25 15:34:11.705 INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController : sent mq user 2
2021-11-25 15:34:11.705 INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController : sent mq user 4
2021-11-25 15:34:11.706 INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController : sent mq user 5
2021-11-25 15:34:11.706 INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController : sent mq user 6
2021-11-25 15:34:11.707 INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController : sent mq user 7
2021-11-25 15:34:11.707 INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController : sent mq user 8
2021-11-25 15:34:11.724 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : receive mq user 1
2021-11-25 15:34:13.734 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 1
2021-11-25 15:34:13.735 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : receive mq user 2
2021-11-25 15:34:15.122 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 0 补偿
2021-11-25 15:34:15.748 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 2
2021-11-25 15:34:15.749 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : receive mq user 4
2021-11-25 15:34:15.749 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : receive mq user 5
2021-11-25 15:34:15.750 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : receive mq user 6
2021-11-25 15:34:17.138 INFO 11524 --- [on-threadpool-3] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 4
2021-11-25 15:34:17.138 INFO 11524 --- [on-threadpool-2] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 3
2021-11-25 15:34:17.138 INFO 11524 --- [on-threadpool-4] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 5
2021-11-25 15:34:17.752 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 6
2021-11-25 15:34:17.753 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : receive mq user 7
2021-11-25 15:34:19.767 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 7
2021-11-25 15:34:19.768 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : receive mq user 8
2021-11-25 15:34:20.124 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 5 补偿
2021-11-25 15:34:21.776 INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 8
2021-11-25 15:34:22.134 INFO 11524 --- [on-threadpool-9] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 9
2021-11-25 15:34:25.129 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 9 补偿
2021-11-25 15:34:27.137 INFO 11524 --- [on-threadpool-1] o.g.t.c.a.compensation.MemberService : memberService: welcome new user 10
2021-11-25 15:34:30.129 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 10 补偿
2021-11-25 15:34:35.121 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 10 补偿
2021-11-25 15:34:40.125 INFO 11524 --- [ scheduling-1] o.g.t.c.a.compensation.CompensationJob : 开始从用户ID 10 补偿
可以看到:
总共 10 个用户,MQ 发送成功的用户有四个,分别是用户 1、5、7、8。 补偿任务第一次运行,补偿了用户 2、3、4,第二次运行补偿了用户 6、9,第三次运行 补充了用户 10。
最后提一下,针对消息的补偿闭环处理的最高标准是,能够达到补偿全量数据的吞吐量。也 就是说,如果补偿备线足够完善,即使直接把 MQ 停机,虽然会略微影响处理的及时性, 但至少确保流程都能正常执行。
异步处理需要消息补偿闭环相关推荐
- 分布式事务之消息补偿解决方案
分布式事务之消息补偿解决方案 参考文章: (1)分布式事务之消息补偿解决方案 (2)https://www.cnblogs.com/lanxiaoke/p/8321657.html 备忘一下.
- WebAPi的可视化输出模式(RabbitMQ、消息补偿相关)——所有webapi似乎都缺失的一个功能...
最近的工作我在做一个有关于消息发送和接受封装工作.大概流程是这样的,消息中间件是采用rabbitmq,为了保证消息的绝对无丢失,我们需要在发送和接受前对消息进行DB落地.在发送前我会先进行DB的插入, ...
- WebAPi的可视化输出模式(RabbitMQ、消息补偿相关)——所有webapi似乎都缺失的一个功能
最近的工作我在做一个有关于消息发送和接受封装工作.大概流程是这样的,消息中间件是采用rabbitmq,为了保证消息的绝对无丢失,我们需要在发送和接受前对消息进行DB落地.在发送前我会先进行DB的插入, ...
- 什么是消息补偿机制?
为什么还要消息补偿机制呢? 难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,但是作为有追求的程序员来讲,要绝对保证我的系统的 ...
- 消息的同步发送,异步发送以及消息发送的可靠性
最近写的一个通信框架中有两种最基本的消息发送方式:同步发送和异步发送. 同步方式: 消息的发送方发A送一条消息到接收端B,B收到消息之后需要对消息进行处理,然后发送ACK确认消息回A,A收到B的ACK ...
- RabbitMQ的消息补偿机制
目录 前言: 常见问题及解决思路 一.消息防丢方案 二.消息防堆积方案 三.消息发送失败补偿方案 3.1 消息发送失败处理方案 3.2 消息发送失败补偿方案 3.3 confirm方案对比 四. 消息 ...
- Java教程:RabbitMq如何开启发布手动确认模式,采用及时或异步方式确定消息是否发送到队列
引言: 自我们安装好rabbitmq之时,系统默认生产者与消费者发布消费机制为自动模式,也就是说无需我们知道是否成功,即发布方法调取之后,消费者无异常后,整个流程完毕,但由于我们业务当中,由于网络波动 ...
- swoole php input,介绍swoole异步群发模板消息
1.用的是TP5.1的框架,swoole分成一个客户端发送接收消息,一个服务器负责处理信息 服务端代码,服务器要先安装swoole拓展,用 php server.php 启动进程监听 推荐(免费):s ...
- rabbitmq消息队列 ack机制(消息确认机制)和消息补偿机制
参考:https://blog.csdn.net/pan_junbiao/article/details/112956537 ack 机制就是消息在 生产者在发布消息以后,消息存在内存中,如果消息被确 ...
最新文章
- Window Server 2008 R2 安装 Share Point 2013
- 「NOI2017」泳池
- 导航选中后标记的样式实现滑动效果
- Java的包裹wrap
- 启动oracle00119,oracle启动报ORA-00119错误
- 概率假设密度滤波 matlab,高斯混合概率假设密度滤波器
- g11 android 4.4,HTC G11 Incredible S 稳定流畅Android4.0.4华丽体验Sense4.1 省电耐用
- AI时代,你的职业会是?99%的人都无法直面!
- C语言获取某个分割符之前的内容
- poll/epoll/keyed-poll/keyed-epoll的唤醒--分层次的解决方案
- Python使用类来创建对象
- spring boot使用Jedis整合Redis
- ORACLE PL/SQL编程之八: 把触发器说透 |来自cnblogs的EricHu|
- jQuery判断Dom对象是否存在
- ios html清除缓存图片,iOS 清理文件缓存(示例代码)
- C语言编程齿轮轮廓线坐标,C语言程序实现齿轮基本参数几何尺寸计算
- 《产品经理面试攻略》PART 3:准备好作品
- AcrelEMS-IDC综合能效管理系统在某数据中心的应用
- Sourcemap是什么?Sourcemap的作用及用法概括
- 在IPCAM上实现RTSP协议直播-live555
热门文章
- ker矩阵是什么意思_矩阵形式下的最小二乘法推导
- 四位“计算机之父”之争
- 显色指数(CRI)计算软件分享
- unity3d 怎么生成网页版_Unity3D 基础教程3D网页游戏场景打包与加载
- javascript设计模式-模块模式(module pattern)
- python 图表_测试多图表展示Python版
- ev3 java编程_使用C语言开发EV3程序(原创,转载请注明出处)
- 《机器学习实战》学习笔记(三):决策树
- GObject对象系统
- 用dom4j解析xml错误-Content is not allowed in prolog前言中不允许有内容