使用类似 RabbitMQ、RocketMQ 等 MQ 系统来做消息队列实现异步处理,虽然说消息可 以落地到磁盘保存,即使 MQ 出现问题消息数据也不会丢失,但是异步流程在消息发送、 传输、处理等环节,都可能发生消息丢失。此外,任何 MQ 中间件都无法确保 100% 可 用,需要考虑不可用时异步流程如何继续进行。 因此,对于异步处理流程,必须考虑补偿或者说建立主备双活流程。 我们来看一个用户注册后异步发送欢迎消息的场景。用户注册落数据库的流程为同步流程, 会员服务收到消息后发送欢迎消息的流程为异步流程。

我们来分析一下: 我们来看一下相关的实现代码。 首先,定义 UserController 用于注册 + 发送异步消息。对于注册方法,我们一次性注册 10 个用户,用户注册消息不能发送出去的概率为 50%。 蓝色的线,使用 MQ 进行的异步处理,我们称作主线,可能存在消息丢失的情况(虚线 代表异步调用); 绿色的线,使用补偿 Job 定期进行消息补偿,我们称作备线,用来补偿主线丢失的消 息; 考虑到极端的 MQ 中间件失效的情况,我们要求备线的处理吞吐能力达到主线的能力水 平。

package org.geekbang.time.commonmistakes.asyncprocess.compensation;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;

@RestController
@Slf4j
@RequestMapping("user")
public class UserController {
    @Autowired
    private UserService userService;
    @Autowired
    private RabbitTemplate rabbitTemplate;

@GetMapping("register")
    public void register() {
        //模拟10个用户注册
        IntStream.rangeClosed(1, 10).forEach(i -> {
            //落库,存数据库
            User user = userService.register();
            //模拟50%的消息可能发送失败
            if (ThreadLocalRandom.current().nextInt(10) % 2 == 0) {
                //通过 rabiitmq 发送消息
                rabbitTemplate.convertAndSend(RabbitConfiguration.EXCHANGE, RabbitConfiguration.ROUTING_KEY, user);
                log.info("sent mq user {}", user.getId());
            }
        });
    }
}

然后,定义 MemberService 类用于模拟会员服务。会员服务监听用户注册成功的消息, 并发送欢迎短信。我们使用 ConcurrentHashMap 来存放那些发过短信的用户 ID 实现幂 等,避免相同的用户进行补偿时重复发送短信:

package org.geekbang.time.commonmistakes.asyncprocess.compensation;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class MemberService {
    //记录发送欢迎消息的状态
    private Map<Long, Boolean> welcomeStatus = new ConcurrentHashMap<>();
    //监听用户注册成功的消息,并发送欢迎消息
    @RabbitListener(queues = RabbitConfiguration.QUEUE)
    public void listen(User user) {
        log.info("receive mq user {}", user.getId());
        welcome(user);
    }
    
    //发送欢迎消息
    public void welcome(User user) {
        //消费一条记录,给一个注册成功的用户发送消息
        //putIfAbsent   如果传入key对应的value已经存在,就返回存在的value,不进行替换。如果不存在,就添加key和value,返回null
        //去重操作
        if (welcomeStatus.putIfAbsent(user.getId(), true) == null) {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
            }
            //发送消息给注册客户
            log.info("memberService: welcome new user {}", user.getId());
        }
    }
}

对于 MQ 消费程序,处理逻辑务必考虑去重(支持幂等),原因有几个:

MQ 消息可能会因为中间件本身配置错误、稳定性等原因出现重复。 自动补偿重复,比如本例,同一条消息可能既走 MQ 也走补偿,肯定会出现重复,而且 考虑到高内聚,补偿 Job 本身不会做去重处理。 人工补偿重复。出现消息堆积时,异步处理流程必然会延迟。如果我们提供了通过后台 进行补偿的功能,那么在处理遇到延迟的时候,很可能会先进行人工补偿,过了一段时 间后处理程序又收到消息了,重复处理。我之前就遇到过一次由 MQ 故障引发的事故, MQ 中堆积了几十万条发放资金的消息,导致业务无法及时处理,运营以为程序出错了 就先通过后台进行了人工处理,结果 MQ 系统恢复后消息又被重复处理了一次,造成大 量资金重复发放。

接下来,定义补偿 Job 也就是备线操作。 我们在 CompensationJob 中定义一个 @Scheduled 定时任务,5 秒做一次补偿操作,因 为 Job 并不知道哪些用户注册的消息可能丢失,所以是全量补偿,补偿逻辑是:每 5 秒补 偿一次,按顺序一次补偿 5 个用户,下一次补偿操作从上一次补偿的最后一个用户 ID 开 始;对于补偿任务我们提交到线程池进行“异步”处理,提高处理能力。

package org.geekbang.time.commonmistakes.asyncprocess.compensation;

import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class CompensationJob {
    private static ThreadPoolExecutor compensationThreadPool = new ThreadPoolExecutor(
            10, 10,
            1, TimeUnit.HOURS,
            new ArrayBlockingQueue<>(1000),
            new ThreadFactoryBuilder().setNameFormat("compensation-threadpool-%d").get());
    @Autowired
    private UserService userService;
    @Autowired
    private MemberService memberService;
    private long offset = 0;

//10秒后开始补偿,每5秒补偿一次

@Scheduled(initialDelay = 10_000, fixedRate = 5_000)
    public void compensationJob() {
        log.info("开始从用户ID {} 补偿", offset);
        userService.getUsersAfterIdWithLimit(offset, 5).forEach(user -> {
            compensationThreadPool.execute(() -> memberService.welcome(user));
            offset = user.getId();
        });
    }
}

为了实现高内聚,主线和备线处理消息,最好使用同一个方法。比如,本例中 MemberService 监听到 MQ 消息和 CompensationJob 补偿,调用的都是 welcome 方 法。 此外值得一说的是,Demo 中的补偿逻辑比较简单,生产级的代码应该在以下几个方面进 行加强:

考虑配置补偿的频次、每次处理数量,以及补偿线程池大小等参数为合适的值,以满足 补偿的吞吐量。 考虑备线补偿数据进行适当延迟。比如,对注册时间在 30 秒之前的用户再进行补偿,以 方便和主线 MQ 实时流程错开,避免冲突。 诸如当前补偿到哪个用户的 offset 数据,需要落地数据库。 补偿 Job 本身需要高可用,可以使用类似 XXLJob 或 ElasticJob 等任务系统

运行程序,执行注册方法注册 10 个用户,输出如下
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::               (v2.4.12)

2021-11-25 15:33:40.155  INFO 11524 --- [           main] o.g.t.c.a.c.CommonMistakesApplication    : Starting CommonMistakesApplication using Java 1.8.0_121 on DESKTOP-AJV2N6C with PID 11524 (D:\2021\LEO\拉勾\demo\target\classes started by STAR in D:\2021\LEO\拉勾\demo)
2021-11-25 15:33:40.159  INFO 11524 --- [           main] o.g.t.c.a.c.CommonMistakesApplication    : No active profile set, falling back to default profiles: default
2021-11-25 15:33:42.616  INFO 11524 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8080 (http)
2021-11-25 15:33:42.634  INFO 11524 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2021-11-25 15:33:42.634  INFO 11524 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.54]
2021-11-25 15:33:42.860  INFO 11524 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2021-11-25 15:33:42.860  INFO 11524 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 2598 ms
2021-11-25 15:33:44.801  INFO 11524 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2021-11-25 15:33:44.804  INFO 11524 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.125.50.250:5672]
2021-11-25 15:33:44.982  INFO 11524 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#69e308c6:0/SimpleConnection@5d1b9c3d [delegate=amqp://admin@192.125.50.250:5672/, localPort= 1064]
2021-11-25 15:33:45.125  INFO 11524 --- [           main] o.g.t.c.a.c.CommonMistakesApplication    : Started CommonMistakesApplication in 5.634 seconds (JVM running for 6.408)
2021-11-25 15:33:55.122  INFO 11524 --- [   scheduling-1] o.g.t.c.a.compensation.CompensationJob   : 开始从用户ID 0 补偿
2021-11-25 15:34:00.124  INFO 11524 --- [   scheduling-1] o.g.t.c.a.compensation.CompensationJob   : 开始从用户ID 0 补偿
2021-11-25 15:34:05.126  INFO 11524 --- [   scheduling-1] o.g.t.c.a.compensation.CompensationJob   : 开始从用户ID 0 补偿
2021-11-25 15:34:10.115  INFO 11524 --- [   scheduling-1] o.g.t.c.a.compensation.CompensationJob   : 开始从用户ID 0 补偿
2021-11-25 15:34:11.629  INFO 11524 --- [nio-8080-exec-1] o.apache.tomcat.util.http.parser.Cookie  : A cookie header was received ["supportFilterViews\":\"1\",\"utilityId\":\"1\",\"regionId\":\"\",\"utilityLocale\":{\"currencyDefaultDigit\":1,\"currencyPrefix\":\"\",\"currencySuffix\":\"Tk\",\"dateFormat\":\"MM/dd/yyyy\",\"id\":\"\",\"layoutDirection\":0,\"numberDecimalSeparator\":\".\",\"numberGroupingSeparator\":\",\",\"numberGroupingSize\":3,\"numberSecondaryGroupingSize\":3,\"softwareVendor\":\"\",\"timeFormat\":\"HH:mm:ss\",\"timeZoneOffset\":0,\"utilityDisplayName\":\"DPDC\",\"utilityId\":1,\"utilityLogo\":\"\"},\"currencySuffix\":\"Tk\",\"userNo\":\"ami\",\"deptId\":\"-9999\",\"currencyPrefix\":null,\"userName\":\"ami\",\"userId\":\"1\",\"deptNo\":\"-9999\"}"; theme=blue; StiMobileDesignerDictionarySettings=%7B%22createFieldOnDoubleClick%22%3Afalse%2C%22createLabel%22%3Afalse%2C%22useAliases%22%3Afalse%7D; sessionStatus=; moduleNo=sysMenu.subSystem.vending; systemId1=2; permNo=sysMenu.vending.integratedQuery; recentVisitedPages=[%22/hes/pDatPurchaseRcdController/list___Integrated%20Query___%22]; StimulsoftMobileDesignerLastTabOnPropertiesPanel=Properties] that contained an invalid cookie. That cookie will be ignored.
 Note: further occurrences of this error will be logged at DEBUG level.
2021-11-25 15:34:11.645  INFO 11524 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-11-25 15:34:11.646  INFO 11524 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2021-11-25 15:34:11.647  INFO 11524 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
2021-11-25 15:34:11.704  INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController    : sent mq user 1
2021-11-25 15:34:11.705  INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController    : sent mq user 2
2021-11-25 15:34:11.705  INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController    : sent mq user 4
2021-11-25 15:34:11.706  INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController    : sent mq user 5
2021-11-25 15:34:11.706  INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController    : sent mq user 6
2021-11-25 15:34:11.707  INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController    : sent mq user 7
2021-11-25 15:34:11.707  INFO 11524 --- [nio-8080-exec-1] o.g.t.c.a.compensation.UserController    : sent mq user 8
2021-11-25 15:34:11.724  INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService     : receive mq user 1
2021-11-25 15:34:13.734  INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService     : memberService: welcome new user 1
2021-11-25 15:34:13.735  INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService     : receive mq user 2
2021-11-25 15:34:15.122  INFO 11524 --- [   scheduling-1] o.g.t.c.a.compensation.CompensationJob   : 开始从用户ID 0 补偿
2021-11-25 15:34:15.748  INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService     : memberService: welcome new user 2
2021-11-25 15:34:15.749  INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService     : receive mq user 4
2021-11-25 15:34:15.749  INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService     : receive mq user 5
2021-11-25 15:34:15.750  INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService     : receive mq user 6
2021-11-25 15:34:17.138  INFO 11524 --- [on-threadpool-3] o.g.t.c.a.compensation.MemberService     : memberService: welcome new user 4
2021-11-25 15:34:17.138  INFO 11524 --- [on-threadpool-2] o.g.t.c.a.compensation.MemberService     : memberService: welcome new user 3
2021-11-25 15:34:17.138  INFO 11524 --- [on-threadpool-4] o.g.t.c.a.compensation.MemberService     : memberService: welcome new user 5
2021-11-25 15:34:17.752  INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService     : memberService: welcome new user 6
2021-11-25 15:34:17.753  INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService     : receive mq user 7
2021-11-25 15:34:19.767  INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService     : memberService: welcome new user 7
2021-11-25 15:34:19.768  INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService     : receive mq user 8
2021-11-25 15:34:20.124  INFO 11524 --- [   scheduling-1] o.g.t.c.a.compensation.CompensationJob   : 开始从用户ID 5 补偿
2021-11-25 15:34:21.776  INFO 11524 --- [ntContainer#0-1] o.g.t.c.a.compensation.MemberService     : memberService: welcome new user 8
2021-11-25 15:34:22.134  INFO 11524 --- [on-threadpool-9] o.g.t.c.a.compensation.MemberService     : memberService: welcome new user 9
2021-11-25 15:34:25.129  INFO 11524 --- [   scheduling-1] o.g.t.c.a.compensation.CompensationJob   : 开始从用户ID 9 补偿
2021-11-25 15:34:27.137  INFO 11524 --- [on-threadpool-1] o.g.t.c.a.compensation.MemberService     : memberService: welcome new user 10
2021-11-25 15:34:30.129  INFO 11524 --- [   scheduling-1] o.g.t.c.a.compensation.CompensationJob   : 开始从用户ID 10 补偿
2021-11-25 15:34:35.121  INFO 11524 --- [   scheduling-1] o.g.t.c.a.compensation.CompensationJob   : 开始从用户ID 10 补偿
2021-11-25 15:34:40.125  INFO 11524 --- [   scheduling-1] o.g.t.c.a.compensation.CompensationJob   : 开始从用户ID 10 补偿

可以看到:

总共 10 个用户,MQ 发送成功的用户有四个,分别是用户 1、5、7、8。 补偿任务第一次运行,补偿了用户 2、3、4,第二次运行补偿了用户 6、9,第三次运行 补充了用户 10。

最后提一下,针对消息的补偿闭环处理的最高标准是,能够达到补偿全量数据的吞吐量。也 就是说,如果补偿备线足够完善,即使直接把 MQ 停机,虽然会略微影响处理的及时性, 但至少确保流程都能正常执行。

异步处理需要消息补偿闭环相关推荐

  1. 分布式事务之消息补偿解决方案

    分布式事务之消息补偿解决方案 参考文章: (1)分布式事务之消息补偿解决方案 (2)https://www.cnblogs.com/lanxiaoke/p/8321657.html 备忘一下.

  2. WebAPi的可视化输出模式(RabbitMQ、消息补偿相关)——所有webapi似乎都缺失的一个功能...

    最近的工作我在做一个有关于消息发送和接受封装工作.大概流程是这样的,消息中间件是采用rabbitmq,为了保证消息的绝对无丢失,我们需要在发送和接受前对消息进行DB落地.在发送前我会先进行DB的插入, ...

  3. WebAPi的可视化输出模式(RabbitMQ、消息补偿相关)——所有webapi似乎都缺失的一个功能

    最近的工作我在做一个有关于消息发送和接受封装工作.大概流程是这样的,消息中间件是采用rabbitmq,为了保证消息的绝对无丢失,我们需要在发送和接受前对消息进行DB落地.在发送前我会先进行DB的插入, ...

  4. 什么是消息补偿机制?

    为什么还要消息补偿机制呢? 难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,但是作为有追求的程序员来讲,要绝对保证我的系统的 ...

  5. 消息的同步发送,异步发送以及消息发送的可靠性

    最近写的一个通信框架中有两种最基本的消息发送方式:同步发送和异步发送. 同步方式: 消息的发送方发A送一条消息到接收端B,B收到消息之后需要对消息进行处理,然后发送ACK确认消息回A,A收到B的ACK ...

  6. RabbitMQ的消息补偿机制

    目录 前言: 常见问题及解决思路 一.消息防丢方案 二.消息防堆积方案 三.消息发送失败补偿方案 3.1 消息发送失败处理方案 3.2 消息发送失败补偿方案 3.3 confirm方案对比 四. 消息 ...

  7. Java教程:RabbitMq如何开启发布手动确认模式,采用及时或异步方式确定消息是否发送到队列

    引言: 自我们安装好rabbitmq之时,系统默认生产者与消费者发布消费机制为自动模式,也就是说无需我们知道是否成功,即发布方法调取之后,消费者无异常后,整个流程完毕,但由于我们业务当中,由于网络波动 ...

  8. swoole php input,介绍swoole异步群发模板消息

    1.用的是TP5.1的框架,swoole分成一个客户端发送接收消息,一个服务器负责处理信息 服务端代码,服务器要先安装swoole拓展,用 php server.php 启动进程监听 推荐(免费):s ...

  9. rabbitmq消息队列 ack机制(消息确认机制)和消息补偿机制

    参考:https://blog.csdn.net/pan_junbiao/article/details/112956537 ack 机制就是消息在 生产者在发布消息以后,消息存在内存中,如果消息被确 ...

最新文章

  1. Window Server 2008 R2 安装 Share Point 2013
  2. 「NOI2017」泳池
  3. 导航选中后标记的样式实现滑动效果
  4. Java的包裹wrap
  5. 启动oracle00119,oracle启动报ORA-00119错误
  6. 概率假设密度滤波 matlab,高斯混合概率假设密度滤波器
  7. g11 android 4.4,HTC G11 Incredible S 稳定流畅Android4.0.4华丽体验Sense4.1 省电耐用
  8. AI时代,你的职业会是?99%的人都无法直面!
  9. C语言获取某个分割符之前的内容
  10. poll/epoll/keyed-poll/keyed-epoll的唤醒--分层次的解决方案
  11. Python使用类来创建对象
  12. spring boot使用Jedis整合Redis
  13. ORACLE PL/SQL编程之八: 把触发器说透 |来自cnblogs的EricHu|
  14. jQuery判断Dom对象是否存在
  15. ios html清除缓存图片,iOS 清理文件缓存(示例代码)
  16. C语言编程齿轮轮廓线坐标,C语言程序实现齿轮基本参数几何尺寸计算
  17. 《产品经理面试攻略》PART 3:准备好作品
  18. AcrelEMS-IDC综合能效管理系统在某数据中心的应用
  19. Sourcemap是什么?Sourcemap的作用及用法概括
  20. 在IPCAM上实现RTSP协议直播-live555

热门文章

  1. ker矩阵是什么意思_矩阵形式下的最小二乘法推导
  2. 四位“计算机之父”之争
  3. 显色指数(CRI)计算软件分享
  4. unity3d 怎么生成网页版_Unity3D 基础教程3D网页游戏场景打包与加载
  5. javascript设计模式-模块模式(module pattern)
  6. python 图表_测试多图表展示Python版
  7. ev3 java编程_使用C语言开发EV3程序(原创,转载请注明出处)
  8. 《机器学习实战》学习笔记(三):决策树
  9. GObject对象系统
  10. 用dom4j解析xml错误-Content is not allowed in prolog前言中不允许有内容