导读

网站高可用指的就是:在绝大多的时间里,网站一直处于可以对外提供服务的正常状态。

一般以“年”为单位来统计,“9”的个数越多,代表一年中允许的不可用时间就越短,相应的可用等级也就越高。比如,业界常见的网站若能做到 4 个“9”,即:年可用时间达到99.99%,换算下来就是一年内只有 53 分钟的时间网站是处于不可用状态,就已经是算是非常优秀了。

限流,降级和熔断是应对互联网高并发场景的方案之一,也是尝尝被问道的部分,今天单独讲一下【限流】的算法和实现。


正文

一、为什么要限流?

限流在很多场景中用来限制并发和请求量,比如说秒杀抢购,保护自身系统和下游系统不被巨型流量冲垮等。

以微博为例,例如:某明星被爆出了八卦,瞬时访问量从平时的50万增加到了500万,系统的规划能力最多可以支撑200万访问,那么就要执行限流规则,保证网站是一个可用的状态,不至于服务器崩溃,所有请求不可用。

有人可能会追问:既然存在并发500万的可能,为什么不把系统做到支撑500万?

根据“二八原则”解释,系统性能80%时间都是冗余状态,只有20%的时间处于短缺状态。出于成本考虑,既然有其他方案能解决(优化)高并发场景,属实没有必要为了浪费过多的成本。说白了,省钱就是“限流,降级和熔断”思路解决高并发场景的意义。


二、限流的算法

限流算法很多,常见的有三类,分别是:计数器算法、漏桶算法、令牌桶算法,下面逐一讲解。

  • 计数器:在一段时间间隔内(时间窗/时间区间),处理请求的最大数量固定,超过部分不做处理。
  • 漏桶:漏桶大小固定,处理速度固定,但请求进入速度不固定(在突发情况请求过多时,会丢弃过多的请求)。
  • 令牌桶:令牌桶的大小固定,令牌的产生速度固定,但是消耗令牌(即请求)速度不固定(可以应对一些某些时间请求过多的情况);每个请求都会从令牌桶中取出令牌,如果没有令牌则丢弃该次请求。

2.1 计数器算法

  • 算法定义:

在一段时间间隔内(时间窗/时间区间),处理请求的最大数量固定,超过部分不做处理。

计数器算法是限流算法里最简单也是最容易实现的一种算法。简单粗暴,比如,指定线程池大小,指定数据库连接池大小、nginx连接数等,这都属于计数器算法。

举个例子:我们规定对于A接口,1分钟的访问次数不能超过100次,超过的请求丢弃(丢弃属于策略的一种),那么我们可以这么做:

  • 开始的时候,我们可以设置一个计数器counter,每当一个请求过来的时候,counter就加1;

  • 如果counter的值大于100并且该请求与第一个请求的间隔时间还在1分钟之内,那么说明请求数过多,拒绝访问,执行策略处理(等待,丢弃,抛异常...);

  • 如果该请求与第一个请求的间隔时间大于1分钟,且counter的值还在限流范围内,那么就重置 counter,就是这么简单粗暴。

  • 算法实现:
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;// 计速器 限速
@Slf4j
public class CounterLimiter {// 起始时间private static long startTime = System.currentTimeMillis();// 时间区间的时间间隔 msprivate static long interval = 1000;// 每秒限制数量private static long maxCount = 2;// 累加器private static AtomicLong accumulator = new AtomicLong();// 计数判断, 是否超出限制private static long tryAcquire(long taskId, int turn) {long nowTime = System.currentTimeMillis();// 在时间区间之内if (nowTime < startTime + interval) {long count = accumulator.incrementAndGet();if (count <= maxCount) {return count;} else {return -count;}} else {//在时间区间之外synchronized (CounterLimiter.class) {log.info("新时间区到了,taskId{}, turn {}..", taskId, turn);// 再一次判断,防止重复初始化if (nowTime > startTime + interval) {accumulator.set(0);startTime = nowTime;}}return 0;}}// 线程池,用于多线程模拟测试private ExecutorService pool = Executors.newFixedThreadPool(10);@Testpublic void testLimit() {// 被限制的次数AtomicInteger limited = new AtomicInteger(0);// 线程数final int threads = 2;// 每条线程的执行轮数final int turns = 20;// 同步器CountDownLatch countDownLatch = new CountDownLatch(threads);long start = System.currentTimeMillis();for (int i = 0; i < threads; i++) {pool.submit(() -> {try {for (int j = 0; j < turns; j++) {long taskId = Thread.currentThread().getId();long index = tryAcquire(taskId, j);if (index <= 0) {// 被限制的次数累积limited.getAndIncrement();}Thread.sleep(200);}} catch (Exception e) {e.printStackTrace();}// 等待所有线程结束countDownLatch.countDown();});}try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}float time = (System.currentTimeMillis() - start) / 1000F;// 输出统计结果log.info("限制的次数为:" + limited.get() + ",通过的次数为:" + (threads * turns - limited.get()));log.info("限制的比例为:" + (float) limited.get() / (float) (threads * turns));log.info("运行的时长为:" + time);}
}
  • 算法问题:

这个算法虽然简单,但是有一个十分致命的问题,那就是临界问题,我们看下图:

从上图中我们可以看到,假设:有一个恶意用户,他在0:59时,瞬间发送了100个请求,并且1:00又瞬间发送了100个请求,那么其实这个用户在 1秒里面,瞬间发送了200个请求。

而我们刚才规定的是1分钟最多100个请求(规划的吞吐量),也就是每秒钟最多1.7个请求,用户通过在时间窗口的重置节点处突发请求, 可以瞬间超过我们的速率限制。用户有可能通过算法的这个漏洞,瞬间压垮我们的应用。

2.2 漏桶算法

  • 算法原理

漏桶算法其实很简单,可以粗略的认为就是注水漏水过程,往桶中以任意速率流入水,以一定速率流出水,当水超过桶容量(capacity)则丢弃,因为桶容量是不变的,保证了整体的速率。

可以看出漏桶算法能强行限制数据的传输速率,起到了缓冲与学风的作用,如图所示。

漏桶限流大致的规则如下:

(1)进水口(对应客户端请求)以任意速率流入进入漏桶。

(2)漏桶的容量是固定的,出水(放行)速率也是固定的。

(3)漏桶容量是不变的,如果处理速度太慢,桶内水量会超出了桶的容量,则后面流入的水滴会溢出,表示请求拒绝。

  • 算法实现

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;// 漏桶 限流
@Slf4j
public class LeakBucketLimiter {// 计算的起始时间private static long lastOutTime = System.currentTimeMillis();// 流出速率 每秒 2 次private static int leakRate = 2;// 桶的容量private static int capacity = 2;//剩余的水量private static AtomicInteger water = new AtomicInteger(0);//返回值说明:// false 没有被限制到// true 被限流public static synchronized boolean isLimit(long taskId, int turn) {// 如果是空桶,就当前时间作为漏出的时间if (water.get() == 0) {lastOutTime = System.currentTimeMillis();water.addAndGet(1);return false;}// 执行漏水int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 1000)) * leakRate;// 计算剩余水量int waterLeft = water.get() - waterLeaked;water.set(Math.max(0, waterLeft));// 重新更新leakTimeStamplastOutTime = System.currentTimeMillis();// 尝试加水,并且水还未满 ,放行if ((water.get()) < capacity) {water.addAndGet(1);return false;} else {// 水满,拒绝加水, 限流return true;}}//线程池,用于多线程模拟测试private ExecutorService pool = Executors.newFixedThreadPool(10);@Testpublic void testLimit() {// 被限制的次数AtomicInteger limited = new AtomicInteger(0);// 线程数final int threads = 2;// 每条线程的执行轮数final int turns = 20;// 线程同步器CountDownLatch countDownLatch = new CountDownLatch(threads);long start = System.currentTimeMillis();for (int i = 0; i < threads; i++) {pool.submit(() -> {try {for (int j = 0; j < turns; j++) {long taskId = Thread.currentThread().getId();boolean intercepted = isLimit(taskId, j);if (intercepted) {// 被限制的次数累积limited.getAndIncrement();}Thread.sleep(200);}} catch (Exception e) {e.printStackTrace();}//等待所有线程结束countDownLatch.countDown();});}try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}float time = (System.currentTimeMillis() - start) / 1000F;//输出统计结果log.info("限制的次数为:" + limited.get() + ",通过的次数为:" + (threads * turns - limited.get()));log.info("限制的比例为:" + (float) limited.get() / (float) (threads * turns));log.info("运行的时长为:" + time);}
}
  • 算法问题

漏桶的出水速度固定,也就是请求放行速度是固定的,不能灵活的应对后端能力提升。

比如,想要通过动态扩容,使后端流量从1000QPS提升到1WQPS,漏桶就没有办法实现。

所以常常这样讲,漏桶不能有效应对突发流量,只能起到平滑突发流量(整流)的作用。

2.3 令牌桶算法

  • 算法原理

令牌桶算法以一个设定的速率产生令牌并放入令牌桶,每次用户请求都得申请令牌,如果令牌不足,则拒绝请求。

当然,令牌的数量也是有上限的。令牌的数量与时间和发放速率强相关,时间流逝的时间越长,会不断往桶里加入越多的令牌,如果令牌发放的速度比申请速度快,令牌桶会放满令牌,直到令牌占满整个令牌桶,如图所示。

令牌桶限流大致的规则如下:

(1)进水口按照某个速度,向桶中放入令牌。
(2)令牌的容量是固定的,但是放行的速度不是固定的,只要桶中还有剩余令牌,一旦请求过来就能申请成功,然后放行。
(3)如果令牌的发放速度,慢于请求到来速度,桶内就无牌可领,请求就会被拒绝。

总之,令牌的发送速率可以设置,从而可以对突发的出口流量进行有效的应对。

令牌桶与漏桶相似,不同的是令牌桶桶中放了一些令牌,服务请求到达后,要获取令牌之后才会得到服务。令牌使用的灵活性赋予了令牌桶使用场景的灵活性,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。

  • 算法实现

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;// 令牌桶 限速
@Slf4j
public class TokenBucketLimiter {// 上一次令牌发放时间public long lastTime = System.currentTimeMillis();// 桶的容量public int capacity = 2;// 令牌生成速度 /spublic int rate = 2;// 当前令牌数量public AtomicInteger tokens = new AtomicInteger(0);// 返回值说明:false 没有被限制到,true 被限流public synchronized boolean isLimited(long taskId, int applyCount) {long now = System.currentTimeMillis();//时间间隔,单位为 mslong gap = now - lastTime;//计算时间段内的令牌数int reverse_permits = (int) (gap * rate / 1000);int all_permits = tokens.get() + reverse_permits;// 当前令牌数tokens.set(Math.min(capacity, all_permits));log.info("tokens {} capacity {} gap {} ", tokens, capacity, gap);if (tokens.get() < applyCount) {// 若拿不到令牌,则拒绝// log.info("被限流了.." + taskId + ", applyCount: " + applyCount);return true;} else {// 还有令牌,领取令牌tokens.getAndAdd( - applyCount);lastTime = now;// log.info("剩余令牌.." + tokens);return false;}}//线程池,用于多线程模拟测试private ExecutorService pool = Executors.newFixedThreadPool(10);@Testpublic void testLimit() {// 被限制的次数AtomicInteger limited = new AtomicInteger(0);// 线程数final int threads = 2;// 每条线程的执行轮数final int turns = 20;// 同步器CountDownLatch countDownLatch = new CountDownLatch(threads);long start = System.currentTimeMillis();for (int i = 0; i < threads; i++) {pool.submit(() -> {try {for (int j = 0; j < turns; j++) {long taskId = Thread.currentThread().getId();boolean intercepted = isLimited(taskId, 1);if (intercepted) {// 被限制的次数累积limited.getAndIncrement();}Thread.sleep(200);}} catch (Exception e) {e.printStackTrace();}//等待所有线程结束countDownLatch.countDown();});}try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}float time = (System.currentTimeMillis() - start) / 1000F;//输出统计结果log.info("限制的次数为:" + limited.get() + ",通过的次数为:" + (threads * turns - limited.get()));log.info("限制的比例为:" + (float) limited.get() / (float) (threads * turns));log.info("运行的时长为:" + time);}
}
  • 算法优点

令牌桶的好处之一就是可以方便地应对突发出口流量(后端能力的提升)。比如,可以改变令牌的发放速度,算法能按照新的发送速率调大令牌的发放数量,使得出口突发流量能被处理。


三、技术实现

3.1 Nginx漏桶限流

  • 在http块里边定义限流的内存区域 zone:
  limit_req_zone  $arg_sku_id  zone=skuzone:10m      rate=6r/m;limit_req_zone  $http_user_id  zone=userzone:10m      rate=6r/m;limit_req_zone  $binary_remote_addr  zone=perip:10m      rate=6r/m;limit_req_zone  $server_name        zone=perserver:1m   rate=10r/s;
  • 在location块中使用 限流zone:

    #  ratelimit by sku idlocation  = /ratelimit/sku {limit_req  zone=skuzone;echo "正常的响应";}

3.2 redission分布式组件

setnx() 方法。redission 分布式限流采用令牌桶思想和固定时间窗口,trySetRate方法设置桶的大小,利用redis key过期机制达到时间窗口目的,控制固定时间窗口内允许通过的请求量。

3.3 redis+lua分布式限流组件

在redis中,为了避免重复发送脚本数据浪费网络资源,可以使用script load命令进行脚本数据缓存,并且返回一个哈希码作为脚本的调用句柄,每次调用脚本只需要发送哈希码来调用即可。

--- 此脚本的环境: redis 内部,不是运行在 nginx 内部---方法:申请令牌
--- -1 failed
--- 1 success
--- @param key key 限流关键字
--- @param apply  申请的令牌数量
local function acquire(key, apply)local times = redis.call('TIME');-- times[1] 秒数   -- times[2] 微秒数local curr_mill_second = times[1] * 1000000 + times[2];curr_mill_second = curr_mill_second / 1000;local cacheInfo = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate")--- 局部变量:上次申请的时间local last_mill_second = cacheInfo[1];--- 局部变量:之前的令牌数local curr_permits = tonumber(cacheInfo[2]);--- 局部变量:桶的容量local max_permits = tonumber(cacheInfo[3]);--- 局部变量:令牌的发放速率local rate = cacheInfo[4];--- 局部变量:本次的令牌数local local_curr_permits = 0;if (type(last_mill_second) ~= 'boolean' and last_mill_second ~= nil) then-- 计算时间段内的令牌数local reverse_permits = math.floor(((curr_mill_second - last_mill_second) / 1000) * rate);-- 令牌总数local expect_curr_permits = reverse_permits + curr_permits;-- 可以申请的令牌总数local_curr_permits = math.min(expect_curr_permits, max_permits);else-- 第一次获取令牌redis.pcall("HSET", key, "last_mill_second", curr_mill_second)local_curr_permits = max_permits;endlocal result = -1;-- 有足够的令牌可以申请if (local_curr_permits - apply >= 0) then-- 保存剩余的令牌redis.pcall("HSET", key, "curr_permits", local_curr_permits - apply);-- 为下次的令牌获取,保存时间redis.pcall("HSET", key, "last_mill_second", curr_mill_second)-- 返回令牌获取成功result = 1;else-- 返回令牌获取失败result = -1;endreturn result
end
--eg
-- /usr/local/redis/bin/redis-cli  -a 123456  --eval   /vagrant/LuaDemoProject/src/luaScript/redis/rate_limiter.lua key , acquire 1  1-- 获取 sha编码的命令
-- /usr/local/redis/bin/redis-cli  -a 123456  script load "$(cat  /vagrant/LuaDemoProject/src/luaScript/redis/rate_limiter.lua)"
-- /usr/local/redis/bin/redis-cli  -a 123456  script exists  "cf43613f172388c34a1130a760fc699a5ee6f2a9"-- /usr/local/redis/bin/redis-cli -a 123456  evalsha   "cf43613f172388c34a1130a760fc699a5ee6f2a9" 1 "rate_limiter:seckill:1"  init 1  1
-- /usr/local/redis/bin/redis-cli -a 123456  evalsha   "cf43613f172388c34a1130a760fc699a5ee6f2a9" 1 "rate_limiter:seckill:1"  acquire 1--local rateLimiterSha = "e4e49e4c7b23f0bf7a2bfee73e8a01629e33324b";---方法:初始化限流 Key
--- 1 success
--- @param key key
--- @param max_permits  桶的容量
--- @param rate  令牌的发放速率
local function init(key, max_permits, rate)local rate_limit_info = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate")local org_max_permits = tonumber(rate_limit_info[3])local org_rate = rate_limit_info[4]if (org_max_permits == nil) or (rate ~= org_rate or max_permits ~= org_max_permits) thenredis.pcall("HMSET", key, "max_permits", max_permits, "rate", rate, "curr_permits", max_permits)endreturn 1;
end
--eg
-- /usr/local/redis/bin/redis-cli -a 123456 --eval   /vagrant/LuaDemoProject/src/luaScript/redis/rate_limiter.lua key , init 1  1
-- /usr/local/redis/bin/redis-cli -a 123456 --eval   /vagrant/LuaDemoProject/src/luaScript/redis/rate_limiter.lua  "rate_limiter:seckill:1"  , init 1  1---方法:删除限流 Key
local function delete(key)redis.pcall("DEL", key)return 1;
end
--eg
-- /usr/local/redis/bin/redis-cli  --eval   /vagrant/LuaDemoProject/src/luaScript/redis/rate_limiter.lua key , deletelocal key = KEYS[1]
local method = ARGV[1]
if method == 'acquire' thenreturn acquire(key, ARGV[2], ARGV[3])
elseif method == 'init' thenreturn init(key, ARGV[2], ARGV[3])
elseif method == 'delete' thenreturn delete(key)
else--ignore
end

3.4 Guava RateLimiter

Guava 是Java领域优秀的开源项目,它包含了Google在Java项目中使用一些核心库,包含集合(Collections),缓存(Caching),并发编程库(Concurrency),常用注解(Common annotations),String操作,I/O操作方面的众多非常实用的函数。

Guava的 RateLimiter提供了令牌桶算法实现:平滑突发限流(SmoothBursty)和平滑预热限流(SmoothWarmingUp)实现。


结论

相较于降级和熔断,限流方法的用处是最广的,只需要关注服务器的承受能力,不需要关注集群,不需要区分核心业务,更不需要将非核心服务停掉以满足核心服务的可用性,所以,限流也是解决可用性最容易想到的方案。


高并发策略之限流:计数器、漏桶、令牌桶 三大算法的原理与实战(史上最全)相关推荐

  1. 高并发中的 限流、熔断、降级、预热、背压你都知道是什么意思吗?

    首先,我们需要明确一下这几个名词出现的场景:分布式高并发环境.如果你的产品卖相不好,没人鸟它,那它就用不着这几个属性.不需要任何加成,低并发系统就能工作的很好. 分布式系统是一个整体,调用关系错综复杂 ...

  2. Java高并发系统的限流策略

    限流算法 令牌桶(Token Bucket).漏桶(leaky bucket)和计数器算法是最常用的三种限流的算法. 计数器限流算法也是比较常用的,主要用来限制总并发数,比如数据库连接池大小.线程池大 ...

  3. 谈谈高并发系统的限流

    开涛大神在博客中说过:在开发高并发系统时有三把利器用来保护系统:缓存.降级和限流.本文结合作者的一些经验介绍限流的相关概念.算法和常规的实现方式. 缓存 缓存比较好理解,在大型高并发系统中,如果没有缓 ...

  4. 慌了,居然被问到怎么做高并发系统的限流

    点击上方"朱小厮的博客",选择"设为星标" 后台回复"加群"加入公众号专属技术群 来源:uee.me/cDuRD 在开发高并发系统时有三把利 ...

  5. 面试官 | 讲一下如何给高并发系统做限流?

    作者 | nick hao 来源 | uee.me/cDuRD 在开发高并发系统时有三把利器用来保护系统:缓存.降级和限流.本文结合作者的一些经验介绍限流的相关概念.算法和常规的实现方式. 缓存 缓存 ...

  6. 高并发系统的限流算法与实现

    开发高并发系统时有三把利器用来保护系统:缓存.降级和限流. 缓存:缓存的目的是提升系统访问速度和增大系统处理容量. 降级:降级是当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的 ...

  7. 高并发系统之限流特技

    转载至:http://blog.csdn.net/g_hongjin/article/details/51649246 在开发高并发系统时有三把利器用来保护系统:缓存.降级和限流.缓存的目的是提升系统 ...

  8. 聊聊高并发系统之限流特技-1

    在开发高并发系统时有三把利器用来保护系统:缓存.降级和限流. 缓存的目的是提升系统访问速度和增大系统能处理的容量,可谓是抗高并发流量的银弹:而降级是当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉 ...

  9. javaweb对于高并发策略--限流

    1.对于后端开发来说基本策略:缓存,限流,降级 缓存 缓存比较好理解,在大型高并发系统中,如果没有缓存数据库将分分钟被爆,系统也会瞬间瘫痪.使用缓存不单单能够提升系统访问速度.提高并发访问量,也是保护 ...

最新文章

  1. ADO.NET的连接模式
  2. 基于GA的TSP问题
  3. 时序预测的三种方式:统计学模型、机器学习、循环神经网络
  4. C++ ——统一初始化
  5. css样式继承规则详解
  6. Hibernate4一对一关系映射(共享主键方式)
  7. 【JeeSite】用户管理
  8. ASP.NET Core Web 资源打包与压缩
  9. c++动态内存管理题目
  10. LeetCode 2080. 区间内查询数字的频率(哈希+二分查找)
  11. rabbit和mysql事务_分布式事务原理及SpringBoot整合RabbitMQ实现可靠事件,TCC事务模型及接口幂等性...
  12. SSE图像算法优化系列十八:三次卷积插值的进一步SSE优化。
  13. 怎么看oracle的procedure,Oracle基础 -- SQLPlus如何查看procedure的内容
  14. matlab dicom图像异常,用Matlab处理Dicom图像
  15. 报名国电没有计算机二级,想进入国电,捧起“铁饭碗”?没问题,这几个专业助你成功...
  16. Python collections 模块中的 deque(队列)
  17. linux web目录安全设置,[LNMP]Linux的Web环境的安全配置
  18. Silverlight+WCF 新手实例 象棋 该谁下棋-A下B停(二十八)
  19. java可视化工作流_强大的java工作流引擎,可视化开发工作流
  20. 微软bi报表服务器,安装 Power BI 报表服务器

热门文章

  1. 文件夹显示无法访问、拒绝访问需要权限的解决方法
  2. python操作word文档-python操作word
  3. hdu 6825 Set1
  4. pdf 分形 张济忠_分形 第二版 [张济忠 编著] 2011年版 - 资料下载|书籍手册|数学书籍 - 建筑资料网...
  5. 车身域控制器(BDCU)
  6. 神经网络的概念和基本用法
  7. oracle merge into 批量新增或更新
  8. python imread函数_OpenCV 使用imread()函数读取图片的六种正确姿势
  9. Origin如何绘制三维离散点并拟合曲面?
  10. OpenDocument文件格式在政府机关中的重要性