我们都知道,秒杀系统跟商城抢单均有一个共同的明显的特征,即在某个时刻会有成百上千万的请求到达我们的接口,即瞬间这股巨大的流量将涌入我们的系统,我们可以采用下面一图来大致体现这一现象:

当到了“开始秒杀”、“开始抢单”的时刻,此时系统可能会出现这样的几种现象:

  • 应用系统配置承载不了这股瞬间流量,导致系统直接挂掉,即传说中的“宕机”现象;
  • 接口逻辑没有考虑并发情况,数据库读写锁发生冲突,导致最终处理结果跟理论上的结果数据不一致(如商品存库量只有 100,但是高并发情况下,实际表记录的抢到的用户记录数据量却远远大于 100);
  • 应用占据服务器的资源直接飙高,如 CPU、内存、宽带等瞬间直接飙升,导致同库同表甚至可能同 host 的其他服务或者系统出现卡顿或者挂掉的现象;

于是乎,我们需要寻找解决方案!对于目前来讲,网上均有诸多比较不错的解决方案,在此我顺便提一下我们的应用系统采用的常用解决方案,包括:

  • 我们会将处理抢单的整体业务逻辑独立、服务化并做集群部署;
  • 我们会将那股巨大的流量拒在系统的上层,即将其转移至 MQ 而不直接涌入我们的接口,从而减少数据库读写锁冲突的发生以及由于接口逻辑的复杂出现线程堵塞而导致应用占据服务器资源飙升;
  • 我们会将抢单业务所在系统的其他同数据源甚至同表的业务拆分独立出去服务化,并基于某种 RPC 协议走 HTTP 通信进行数据交互、服务通信等等;
  • 采用分布式锁解决同一时间同个手机号、同一时间同个 IP 刷单的现象;

下面,我们用 RabbitMQ 来实战上述的第二点!即我们会在“请求” -> "处理抢单业务的接口" 中间架一层消息中间件做“缓冲”、“缓压”处理,如下图所示:

并发量配置与消息确认机制

正如上面所讲的,对于抢单、秒杀等高并发系统而言,如果我们需要用 RabbitMQ 在 “请求” - “接口” 之间充当限流缓压的角色,那便需要我们对 RabbitMQ 提出更高的要求,即支持高并发的配置,在这里我们需要明确一点,“并发消费者”的配置其实是针对 listener 而言,当配置成功后,我们可以在 MQ 的后端控制台应用看到 consumers 的数量,如下所示:

其中,这个 listener 在这里有 10 个 consumer 实例的配置,每个 consumer 可以预监听消费拉取的消息数量为 5 个(如果同一时间处理不完,会将其缓存在 mq 的客户端等待处理!)

另外,对于某些消息而言,我们有时候需要严格的知道消息是否已经被 consumer 监听消费处理了,即我们有一种消息确认机制来保证我们的消息是否已经真正的被消费处理。在 RabbitMQ 中,消息确认处理机制有三种:Auto - 自动、Manual - 手动、None - 无需确认,而确认机制需要 listener 实现 ChannelAwareMessageListener 接口,并重写其中的确认消费逻辑。在这里我们将用 “手动确认” 的机制来实战用户商城抢单场景。

1.表的结构

分为两张表:1.product (商品表),2.product_robbing_record(商品记录表)

2.采用多线程测试抢单并发 使用普通的mybatis 业务上curd进行抢单测试

创建多线程使用CountDownLatch计数器进行百人疯狂枪单测试

package com.service;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.concurrent.CountDownLatch;@Service
public class InitService {private static final Logger log = LoggerFactory.getLogger(InitService.class);public static final int ThreadNum = 100;private static int mobile = 0;@Autowiredprivate CommonMqService commonMqService;//@PostConstructpublic void generateMultiThread(){log.info("开始初始化线程数--->");try{CountDownLatch countDownLatch = new CountDownLatch(1);for(int i =0; i<ThreadNum; i++){new Thread(new RunThread(countDownLatch)).start();}//TODO:启动多个线程countDownLatch.countDown();}catch(Exception e){e.printStackTrace();}}private class RunThread implements Runnable{private final CountDownLatch startLatch;private RunThread(CountDownLatch startLatch) {this.startLatch = startLatch;}@Overridepublic void run() {try{//TODO:线程等待startLatch.await();mobile +=1;concurrencyService.manageRobbing(String.valueOf(mobile));log.info("当前手机号:"+mobile);}catch (Exception e){}}
}
}

对应的业务Service类

package com.service.impl;import com.entity.Product;
import com.entity.ProductRobbingRecord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mapper.ProductMapper;
import com.mapper.ProductRobbingRecordMapper;
import com.service.ConcurrencyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;@Service
public class ConcurrencyServiceImpl implements ConcurrencyService {private static final Logger log = LoggerFactory.getLogger(ConcurrencyServiceImpl.class);@Autowiredprivate ProductMapper productMapper;@Autowiredprivate ProductRobbingRecordMapper productRobbingRecordMapper;@Autowiredprivate Environment env;@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate ObjectMapper objectMapper;//数据库写死的数据private static final String productNo ="product_123456";@Overridepublic void manageRobbing(String mobile) {try{//通过商品编号查询消息Product product = productMapper.selectByProductNo(productNo);//判断不为空且商品数量必须大于0情况下if(product!=null && product.getTotal() >0){productMapper.updateTotal(productNo);ProductRobbingRecord ev = new ProductRobbingRecord();ev.setMobile(mobile);ev.setProductId(product.getId());productRobbingRecordMapper.insertProductRobbingRecord(ev);}}catch(Exception e){log.error("处理抢单发生异常 mobile={},err={}",mobile,e.fillInStackTrace());}}
}

现在是使用普通业务写法进行测试线程人数为100人,商品单量只有50 进行测试

结果就是抢单出现并发数量,也可以使用sql乐观锁进行并发控制

3.使用消息队列的限流控制抢单并发

3.1.消息模型的配置消息

#rabbitmq
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guestspring.rabbitmq.listener.concurrency=10
spring.rabbitmq.listener.max-concurrency=20
spring.rabbitmq.listener.prefetch=5mq.env=localproduct.robbing.mq.exchang.name=${mq.env}:product:robbing:mq:exchang
product.robbing.mq.routing.key.name=${mq.env}:product:robbing:mq:routing:key
product.robbing.mq.queue.name=${mq.env}:product:robbing:mq:queue

3.2.在RabbitMQconfig中配置确认消费机制以及并发量的配置

package com.config;import com.listener.RobbingListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;@Configuration
public class RabbitmqConfig {private static final Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);@Autowiredprivate Environment env;@Autowiredprivate CachingConnectionFactory connectionFactory;@Autowiredprivate SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;/*** 单一消费者* @return*/@Bean(name = "singleListenerContainer")public SimpleRabbitListenerContainerFactory listenerContainer(){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setConcurrentConsumers(1);factory.setMaxConcurrentConsumers(1);factory.setPrefetchCount(1);factory.setTxSize(1);factory.setAcknowledgeMode(AcknowledgeMode.AUTO);return factory;}/*** 多个消费者* @return*/@Bean(name = "multiListenerContainer")public SimpleRabbitListenerContainerFactory multiListenerContainer(){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factoryConfigurer.configure(factory,connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setAcknowledgeMode(AcknowledgeMode.NONE);factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class));factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class));factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class));return factory;}//TODO:商品抢单消息模型//direct交换机@Beanpublic DirectExchange robbingExchange(){return new DirectExchange(env.getProperty("product.robbing.mq.exchang.name"),true,false);}//申明队列@Bean(name = "robbingQueue")public Queue robbingQueue(){return new Queue(env.getProperty("product.robbing.mq.queue.name"),true);}//绑定队列与交换机@Beanpublic Binding robbingBinding(){return  BindingBuilder.bind(robbingQueue()).to(robbingExchange()).with(env.getProperty("product.robbing.mq.routing.key.name"));}@Autowiredprivate RobbingListener robbingListener;//置消费队列监听@Beanpublic SimpleMessageListenerContainer listenerContainerRobbing(@Qualifier("robbingQueue") Queue robbingQueue){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);//TODO: 并发配置//并发消费者container.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class));//最大并发用户数container.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class));//设置预取计数container.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class));//TODO:消息确认机制container.setQueues(robbingQueue);container.setMessageListener(robbingListener);container.setAcknowledgeMode(AcknowledgeMode.MANUAL);return container;}}

3.3.RabbitMQ 后端控制台应用查看此队列的并发量配置

3.4.listener 确认消费处理逻辑:在这里我们需要开发抢单的业务逻辑,即“只有当该商品的库存 >0 时,抢单成功,扣减库存量,并将该抢单的用户信息记录入表,异步通知用户抢单成功!

package com.listener;import com.rabbitmq.client.Channel;
import com.service.ConcurrencyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.List;@Component("robbingListener")
public class RobbingListener implements ChannelAwareBatchMessageListener{private static final Logger log = LoggerFactory.getLogger(RobbingListener.class);@Autowiredprivate ConcurrencyService concurrencyService;@Overridepublic void onMessage(Message message, Channel channel) throws Exception {Long tag = message.getMessageProperties().getDeliveryTag();try{byte[] body = message.getBody();String mobile = new String(body,"UTF-8");//抢单消息业务逻辑concurrencyService.manageRobbing(mobile);//确认消息channel.basicAck(tag,true);}catch (Exception e){log.error("监听的消费消息 无发生异常:{}",e.fillInStackTrace());}}
}

3.5.抢单业务逻辑

package com.service.impl;import com.entity.Product;
import com.entity.ProductRobbingRecord;
import com.mapper.ProductMapper;
import com.mapper.ProductRobbingRecordMapper;
import com.service.ConcurrencyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class ConcurrencyServiceImpl implements ConcurrencyService {private static final Logger log = LoggerFactory.getLogger(ConcurrencyServiceImpl.class);@Autowiredprivate ProductMapper productMapper;@Autowiredprivate ProductRobbingRecordMapper productRobbingRecordMapper;private static final String productNo ="product_123456";@Overridepublic void manageRobbing(String mobile) {try{Product product = productMapper.selectByProductNo(productNo);if(product!=null && product.getTotal() >0){int result = productMapper.updateTotal(productNo);log.error("result mobile={}",result);if(result>0){ProductRobbingRecord ev = new ProductRobbingRecord();ev.setMobile(mobile);ev.setProductId(product.getId());productRobbingRecordMapper.insertProductRobbingRecord(ev);}}}catch(Exception e){log.error("处理抢单发生异常 mobile={},err={}",mobile,e.fillInStackTrace());}}
}

3.4.采用 CountDownLatch 模拟产生高并发时的多线程请求

采用 CountDownLatch 模拟产生高并发时的多线程请求(或者采用 jmeter 实施压测也可以!),每个请求将携带产生的随机数:充当手机号 -> 充当消息,最终入抢单队列!在这里,我模拟了 50000 个请求,相当于 50000 手机号同一时间发生抢单的请求,而设置的产品库存量为 100,这在 product 数据库表即可设置

package com.service;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.concurrent.CountDownLatch;@Service
public class InitService {private static final Logger log = LoggerFactory.getLogger(InitService.class);public static final int ThreadNum = 50000;private static int mobile = 0;@Autowiredprivate ConcurrencyService concurrencyService;@Autowiredprivate CommonMqService commonMqService;//@PostConstructpublic void generateMultiThread(){log.info("开始初始化线程数--->");try{CountDownLatch countDownLatch = new CountDownLatch(1);for(int i =0; i<ThreadNum; i++){new Thread(new RunThread(countDownLatch)).start();}//TODO:启动多个线程countDownLatch.countDown();}catch(Exception e){e.printStackTrace();}}private class RunThread implements Runnable{private final CountDownLatch startLatch;private RunThread(CountDownLatch startLatch) {this.startLatch = startLatch;}@Overridepublic void run() {try{//TODO:线程等待startLatch.await();mobile +=1;//调用队列的生产者发送消息commonMqService.sendRobbingMsg(String.valueOf(mobile));log.info("当前手机号:"+mobile);}catch (Exception e){}}
}
}

3.5.将抢单请求的手机号信息压入队列,等待排队处理

package com.service;import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;/*** 消息发生*/
@Service
public class CommonMqService {private static final Logger log= LoggerFactory.getLogger(CommonMqService.class);@Autowiredprivate Environment env;@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate ObjectMapper objectMapper;/*** 发送枪单消息入队列* @param mobile*/public void sendRobbingMsg(String mobile){try{rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());rabbitTemplate.setExchange(env.getProperty("product.robbing.mq.exchang.name"));rabbitTemplate.setRoutingKey(env.getProperty("product.robbing.mq.routing.key.name"));Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(mobile)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();rabbitTemplate.convertAndSend(msg);}catch (Exception e){log.error("发送抢单消息入队列 发生异常, mobile={}",mobile);}}}

3.6.在最后我们写个 Junit 或者写个 Controller,进行 initService.generateMultiThread(); 调用模拟产生高并发的抢单请求即可

package com.controller;import com.response.BaseResponse;
import com.response.StatusCode;
import com.service.InitService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;@RestController
public class ConcurrencyController {private static final Logger log = LoggerFactory.getLogger(OrderRecordController.class);private static final String Prefix="concurrency";@Autowiredprivate InitService initService;@RequestMapping(value = Prefix+"/robbing/thread",method = RequestMethod.GET)public BaseResponse robbingThread(){BaseResponse response = new BaseResponse(StatusCode.Success);initService.generateMultiThread();return response;}
}

最后,我们当然是跑起来,在控制台我们可以观察到系统不断的在产生新的请求(线程)– 相当于不断的有抢单的手机号涌入我们的系统,然后入队列,listener 监听到请求之后消费处理抢单逻辑!最后我们可以观察两张数据库表:商品库存表、商品成功抢单的用户记录表 - 只有当库存表中商品对应的库存量为 0、商品成功抢单的用户记录刚好 100 时 即表示我们的实战目的以及效果已经达到了!

MQ,在其中可以起到不可磨灭的作用,其字如其名:“消息队列”,而队列具有 “先进先出” 的特点,故而所有进入 MQ 的消息都将 “乖巧” 的在 MQ 上排好队,先来先排队,先来先被处理消费,由此一来至少可以避免 “瞬间时刻一窝蜂的 request 涌入我们的接口” 的情况!

RabbitMQ实战 : 抢单系统并发解决方案相关推荐

  1. 公安联勤指挥调度实战应用系统软件平台解决方案

    公安联勤指挥调度实战应用系统软件平台解决方案 华盛恒辉建设内容及价值 l 基础数据接入:通过接口开发.资源整合,将警用公共资源统一接入警务地图,实现全市一张图的警用公共资源查询.定位.管理可视化. 软 ...

  2. Java 高并发_JAVA并发编程与高并发解决方案 JAVA高并发项目实战课程 没有项目经验的朋友不要错过!...

    JAVA并发编程与高并发解决方案 JAVA高并发项目实战课程 没有项目经验的朋友不要错过! 1.JPG (37.82 KB, 下载次数: 0) 2018-12-3 09:40 上传 2.JPG (28 ...

  3. 【高并发解决方案】5、如何设计一个秒杀系统

    什么是秒杀 秒杀场景一般会在电商网站举行一些活动或者节假日在12306网站上抢票时遇到.对于电商网站中一些稀缺或者特价商品,电商网站一般会在约定时间点对其进行限量销售,因为这些商品的特殊性,会吸引大量 ...

  4. RabbitMQ实战教程

    RabbitMQ实战教程 1.什么是RabbitMQ 1.1 MQ(Message Queue)消息队列 1.1.1 异步处理 1.1.2 应用解耦 1.1.3 流量削峰 1.2 背景知识介绍 1.2 ...

  5. JavaWeb 并发编程 与 高并发解决方案

    在这里写写我学习到和自己所理解的 Java高并发编程和高并发解决方案.现在在各大互联网公司中,随着日益增长的互联网服务需求,高并发处理已经是一个非常常见的问题,在这篇文章里面我们重点讨论两个方面的问题 ...

  6. RabbitMQ实战笔记

    RabbitMQ实战笔记 1 MQ引言 1.1 中间件技术及架构的概述 1.2 什么是MQ 1.3 为什么要用MQ 1.4 MQ的分类 1.5 MQ的选择 2 RabbitMQ 的引言 2.1 Rab ...

  7. 电商项目实战之分布式事务解决方案

    电商项目实战之分布式事务解决方案 本地事务 事务隔离级别 事务传播机制 分布式事务 CAP理论 选举与同步理论 BASE理论 解决方案 2PC模式(XA事务) 柔性事务-TCC事务补偿型方案 柔性事务 ...

  8. php滴滴抢单系统,抢单系统_抢单系统教程_抢单系统视频教程 _课课家

    本套餐将包括两个重磅性的课程与一个赠送学习的课程,分别为SpringBoot实战视频教程与RabbitMQ实战教程跟SSM整合开发之poi导入导出Excel.目的是为了让各位小伙伴可以从零基础一步一个 ...

  9. 《RabbitMQ 实战指南》第五章 RabbitMQ 进阶(下)

    <RabbitMQ 实战指南>第五章 RabbitMQ 进阶(下) 文章目录 <RabbitMQ 实战指南>第五章 RabbitMQ 进阶(下) 一.持久化 二.生产者确认 1 ...

最新文章

  1. 记一下uiscrollView不响应协议的问题
  2. c语言程序连接后扩展名为,C语言程序经过编译、连接后生成的可执行文件的扩展名是...
  3. vc应用CPictureEx类(重载CStatic类)加载gif动画
  4. MyEclipse 2014中 Window--customize perspective 功能 打不开的解决办法
  5. 1056. Mice and Rice (25)
  6. ssh远程连接(ubuntu、windows)
  7. VS C++/ClI调用C++ 外部Dll无法查看变量值
  8. bash 命令提示符_命令行上每天的Bash提示
  9. 小学五年级计算机教学工作总结,小学五年级数学教师工作总结(精选8篇)
  10. 10个最新优秀手机应用界面设计实例
  11. webpack@3.6.0(4) -- 配置模块化开发
  12. arcmap新手教程_ArcGIS入门教程(1)——ArcMap应用基础
  13. 删除有TrustedInstaller权限的文件-亲测有效
  14. html ajax传参数 20,jQuery Ajax传参
  15. 文档管理系统OnlyOffice在线编辑功能
  16. 8.Redis主从复制
  17. jsapi设计_Sketch插件如何架构
  18. 论文的可复现性,能否量化分析?
  19. 数据产品经理类型划分和工作汇报框架
  20. M1W Dock 教程之开发环境配置

热门文章

  1. 2022-2028中国烟酰胺护肤品市场现状研究分析与发展前景预测报告
  2. Excel如何同时查找多个数据
  3. 如何批量处理word中的表格
  4. 蚂蚁金服开源 ——基于 SOFABoot 进行模块化开发
  5. Linux 查看目录下文件数量
  6. Android文件下载与解压
  7. 访问c:\Users\Administrator\Documents\NetSarang\Xshell\buttonlist.ini时磁盘已满
  8. Qtum智能合约使用方法及说明
  9. 小班课3年内将成主流?RoomKit助力机构快速落地小班课业务
  10. python 有关 if __name__ == ‘__main__‘ 的正确理解