一、分布式锁

1.1 什么是分布式锁

分布式锁,即分布式系统中的锁。在单体应用中我们通过锁解决的是控制共享资源访问的问题,而分布式锁,就是解决了分布式系统中控制共享资源访问的问题。与单体应用不同的是,分布式系统中竞争共享资源的最小粒度从线程升级成了进程。

1.2 分布式锁应该具备哪些条件

  • 在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行

  • 高可用的获取锁与释放锁

  • 高性能的获取锁与释放锁

  • 具备可重入特性(可理解为重新进入,由多于一个任务并发使用,而不必担心数据错误)

  • 具备锁失效机制,即自动解锁,防止死锁

  • 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败

1.3 分布式锁的实现方式

  • 基于数据库实现分布式锁

  • 基于Zookeeper实现分布式锁

  • 基于Reids实现分布式锁

本章重点讲解的是基于Reids的分布式锁

二、利用RedisTemplate实现分布式锁

在真实的项目里,我们经常会使用Spring Boot/Spring Cloud框架,该框架已经集成了 RedisTemplate 类,开放了很多对RedisAPI

本章节列举基于RedisTemplate实现方式分布式锁的各种方式及存在的问题。

2.1 利用setIfAbsent先设置key value再设置expire

redisTemplate.opsForValue().setIfAbsent(key,value);
redisTemplate.expire(key, time, TimeUnit.SECONDS);

问题:「不是原子操作」。以上两条语句不是原子性的。假如执行完第一条语句后,服务挂掉,导致key永久存在,锁无法释放。

setIfAbsent(key, value)方法简介:如果key不存在则新增,返回 true;存在则不改变已经有的值返回 false

2.2 利用setIfAbsent同时设置 key value expire

redisTemplate.opsForValue().setIfAbsent(key, value, time, TimeUnit.SECONDS);

问题一:「锁过期释放了,业务还没执行完」。假设线程a获取锁成功,一直在执行临界区的代码。但是100s过去后,它还没执行完。但是,这时候锁已经过期了,此时线程b又请求过来。显然线程b就可以获得锁成功,也开始执行临界区的代码。那么问题就来了,临界区的业务代码都不是严格串行执行。

问题二:「锁被别的线程误删」。假设线程a执行完后,去释放锁。但是它不知道当前的锁可能是线程b持有的(线程a去释放锁时,有可能过期时间已经到了,此时线程b进来占有了锁)。那线程a就把线程b的锁释放掉了,但是线程b临界区业务代码可能都还没执行完。

2.3 利用setIfAbsent同时设置 key value expire,value为客户端标识

可以在获取锁,解锁时。首先获取客户端标识,如果和加锁时不一致,则获解锁操作失败,解决了2.2中提到了「锁被别的线程误删」问题。

Callable<String> callable = () -> {String result = "";String threadId = Thread.currentThread().getId() + "";if (Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, threadId, 10, TimeUnit.SECONDS))) {result = threadId;try {// 模拟业务处理Thread.sleep(5000);} catch (Exception e) {e.printStackTrace();} finally {if (redisTemplate.opsForValue().get(key).equals(threadId)) {redisTemplate.delete(key);}}}return result;
};
List<Future<String>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {Future<String> future = executorService.submit(callable);list.add(future);
}
for (Future<String> future : list) {log.info(future.get());
}

问题:「不是原子操作」。通过key获取加锁时的客户端标识和释放锁两条语句不是原子操作。

2.4 利用lua脚本进行加锁及释放锁原子操作

本章重点,通过lua脚本对2.3提出的原子性问题进行解决。

RedisTemplate执行lua脚本加锁释放锁工具类:

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;@Component
public class ConcurrentLockUtil {private static final String LOCK_LUA = "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then redis.call('expire', KEYS[1], ARGV[2]) return 'true' else return 'false' end";private static final String UNLOCK_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then redis.call('del', KEYS[1]) end return 'true' ";private final RedisScript lockRedisScript;private final RedisScript unLockRedisScript;private final RedisSerializer<String> argsSerializer;private final RedisSerializer<String> resultSerializer;private RedisTemplate redisTemplate;public ConcurrentLockUtil(RedisTemplate redisTemplate) {this.argsSerializer = new StringRedisSerializer();this.resultSerializer = new StringRedisSerializer();this.lockRedisScript = RedisScript.of(LOCK_LUA, String.class);this.unLockRedisScript = RedisScript.of(UNLOCK_LUA, String.class);this.redisTemplate = redisTemplate;}/*** 分布式锁** @param lockKey* @param value* @param time* @return*/public boolean lock(String lockKey, String value, long time) {List<String> keys = Collections.singletonList(lockKey);String flag = (String) redisTemplate.execute(lockRedisScript, argsSerializer, resultSerializer, keys, value, String.valueOf(time));return Boolean.valueOf(flag);}/*** 删除锁** @param lock* @param val*/public void unlock(String lock, String val) {List<String> keys = Collections.singletonList(lock);redisTemplate.execute(unLockRedisScript, argsSerializer, resultSerializer, keys, val);}
}

业务调用:

@Autowired
ConcurrentLockUtil concurrentLockUtil;ExecutorService executorService = Executors.newFixedThreadPool(10);
String key = "test2-lock";Callable<String> callable = () -> {String result = "";String threadId = Thread.currentThread().getId() + "";if (Boolean.TRUE.equals(concurrentLockUtil.lock(key, threadId, 10))) {result = threadId;try {Thread.sleep(5000);} catch (Exception e) {e.printStackTrace();} finally {concurrentLockUtil.unlock(key, threadId);}}return result;
};
List<Future<String>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {Future<String> future = executorService.submit(callable);list.add(future);
}
for (Future<String> future : list) {log.info(future.get());
}

问题:针对2.2章节提到的「锁过期释放了,业务还没执行完」问题仍然存在。

有些小伙伴认为,稍微把锁过期时间设置长一些就可以。其实我们设想一下,是否可以给获得锁的线程,开启一个定时守护线程,每隔一段时间检查锁是否还存在,存在则对锁的过期时间延长,防止锁过期提前释放,请见下面章节。

三、利用Redisson框架RLock实现分布式锁

开源框架Redisson解决了2.1.4提出的问题。官网地址:https://github.com/redisson/redisson/wiki/1.-%E6%A6%82%E8%BF%B0

我们一起来看下Redisson底层原理图:

只要线程一加锁成功,就会启动一个watch dog看门狗,它是一个后台线程,会每隔10秒检查一下,如果线程1还持有锁,那么就会不断的延长锁key的生存时间(Redisson 中使用的 Lua 脚本做的检查及设置过期时间操作,本身是原子性的)。因此,解决了「锁过期释放,业务没执行完」问题。

3.1 加锁逻辑

RedissonLua 加锁脚本定义及流程如下:

3.2 解锁逻辑

RedissonLua 解锁脚本定义及流程如下:

3.3 续锁逻辑

可以看到续时方法将 threadId 当作标识符进行续时

知道核心理念就好了, 没必要研究每一行代码

四、利用RedissonRedLock多机实现分布式锁

4.1 单机版锁存在的问题

前面章节的几种方案都只是基于单机版的讨论,还不是很完美。其实Redis一般都是集群部署的:

如果线程一在Redismaster节点上拿到了锁,但是加锁的key还没同步到slave节点。恰好这时,master节点发生故障,一个slave节点就会升级为master节点。线程二就可能获取同个key的锁,但线程一也已经拿到锁了,锁的安全性就没了。

4.2 RedissonRedLock核心思想

为了解决上述问题,Redis作者 antirez提出一种高级的分布式锁算法:RedlockRedlock核心思想是这样的:

搞多个Redis master部署,以保证它们不会同时宕掉。并且这些master节点是完全相互独立的,相互之间不存在数据同步。同时,需要确保在这多个master实例上,是与在Redis单实例,使用相同方法来获取和释放锁。


RedissonRedLock算法:

RedissonRedLock业务逻辑实现:

Config config1 = new Config();
config1.useSingleServer().setAddress("redis://172.0.0.1:5378").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);Config config2 = new Config();
config2.useSingleServer().setAddress("redis://172.0.0.1:5379").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);Config config3 = new Config();
config3.useSingleServer().setAddress("redis://172.0.0.1:5380").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);String lockKey = "myLock";
int waitTimeout = 5;
int leaseTime = 30;/*** 获取多个 RLock 对象*/
RLock lock1 = redissonClient1.getLock(lockKey);
RLock lock2 = redissonClient2.getLock(lockKey);
RLock lock3 = redissonClient3.getLock(lockKey);/*** 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里)*/
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);try {/*** 4.尝试获取锁* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败* leaseTime   锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)*/boolean res = redLock.tryLock((long) waitTimeout, (long) leaseTime, TimeUnit.SECONDS);if (res) {//成功获得锁,在这里处理业务}
} catch (Exception e) {throw new RuntimeException("aquire lock fail");
} finally {//无论如何, 最后都要解锁redLock.unlock();
}

RedissonRedLock核心源码:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long newLeaseTime = -1;if (leaseTime != -1) {newLeaseTime = unit.toMillis(waitTime)*2;}long time = System.currentTimeMillis();long remainTime = -1;if (waitTime != -1) {remainTime = unit.toMillis(waitTime);}long lockWaitTime = calcLockWaitTime(remainTime);/*** 1. 允许加锁失败节点个数限制(N-(N/2+1))*/int failedLocksLimit = failedLocksLimit();/*** 2. 遍历所有节点通过EVAL命令执行lua加锁*/List<RLock> acquiredLocks = new ArrayList<>(locks.size());for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;/***  3.对节点尝试加锁*/try {if (waitTime == -1 && leaseTime == -1) {lockAcquired = lock.tryLock();} else {long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {// 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {// 抛出异常表示获取锁失败lockAcquired = false;}if (lockAcquired) {/***4. 如果获取到锁则添加到已获取锁集合中*/acquiredLocks.add(lock);} else {/*** 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))* 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了* 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功*/if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}if (failedLocksLimit == 0) {unlockInner(acquiredLocks);if (waitTime == -1 && leaseTime == -1) {return false;}failedLocksLimit = failedLocksLimit();acquiredLocks.clear();// reset iteratorwhile (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}/*** 6.计算 目前从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则认定最终申请锁失败,返回false*/if (remainTime != -1) {remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}if (leaseTime != -1) {List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());for (RLock rLock : acquiredLocks) {RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);futures.add(future);}for (RFuture<Boolean> rFuture : futures) {rFuture.syncUninterruptibly();}}/*** 7.如果逻辑正常执行完则认为最终申请锁成功,返回true*/return true;
}

当然,对于 RedissonRedLock 算法不是没有质疑声, 大家可以去 Redis 官网查看Martin KleppmannRedis 作者Antirez 的辩论。

Redis实现分布式锁的正确姿势 | Spring Cloud 36相关推荐

  1. 【分布式缓存系列】Redis实现分布式锁的正确姿势

    一.前言 在我们日常工作中,除了Spring和Mybatis外,用到最多无外乎分布式缓存框架--Redis.但是很多工作很多年的朋友对Redis还处于一个最基础的使用和认识.所以我就像把自己对分布式缓 ...

  2. 【最全】Spring Boot 实现分布式锁——这才是实现分布式锁的正确姿势!

    ava世界的"半壁江山"--Spring早就提供了分布式锁的实现.早期,分布式锁的相关代码存在于Spring Cloud的子项目Spring Cloud Cluster中,后来被迁 ...

  3. 程序员如何 Get 分布式锁的正确姿势?| 技术头条

    作者 | 刘春龙 责编 | 郭芮 在很多互联网产品应用中,有些场景需要加锁处理,比如秒杀.全局递增ID.楼层生成等等,大部分的解决方案是基于DB实现的,Redis也是较为常见的方案之一. Redis为 ...

  4. 这才是实现分布式锁的正确姿势!

    都9102年了,你还在手写分布式锁吗? 经常被问到"如何实现分布式锁",看来这是大家的一个痛点. 其实Java世界的"半壁江山"--Spring早就提供了分布式 ...

  5. 掌握Redis分布式锁的正确姿势

    本文中案例都会在上传到git上,请放心浏览 git地址:https://github.com/muxiaonong/Spring-Cloud/tree/master/order-lock 本文会使用到 ...

  6. Redis实现分布式锁的深入探究

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 一.分布式锁简介 锁 是一种用来解决多个执行线程 访问共享资源 错 ...

  7. Redis之分布式锁

    # 基于 Redis 实现分布式锁的三种方案 用 Redis 实现分布式锁的正确姿势(实现一) 用 Redisson 实现分布式可重入锁(RedissonLock)(实现二) 用 Redisson 实 ...

  8. Redis 作者 Antirez 讲如何实现分布式锁?Redis 实现分布式锁天然的缺陷分析Redis分布式锁的正确使用姿势!...

    Redis分布式锁基本原理 采用 redis 实现分布式锁,主要是利用其单线程命令执行的特性,一般是 setnx, 只会有一个线程会执行成功,也就是只有一个线程能成功获取锁:看着很完美. 然而-- 看 ...

  9. 七种方案!探讨Redis分布式锁的正确使用姿势

    前言 日常开发中,秒杀下单.抢红包等等业务场景,都需要用到分布式锁.而Redis非常适合作为分布式锁使用.本文将分七个方案展开,跟大家探讨Redis分布式锁的正确使用方式.如果有不正确的地方,欢迎大家 ...

最新文章

  1. dom vue 加载完 执行_前端面试题——Vue
  2. C# 学习笔记(13)自己的串口助手
  3. Java Virtual Machine Stacks(虚拟机栈)
  4. 干货:嵌入式C语言源代码优化方案(非编译器优化)
  5. 每隔10秒钟打印一个“Helloworld”
  6. Dumb Bones UVA - 10529(概率dp)
  7. 实训41 2018.6.2
  8. pandas DataFrame数据转为list
  9. c语言vc怎么改变背景板颜色,VC设置视图背景颜色方法
  10. Markdown必备,Lsky-pro图床配置
  11. 360众筹网_360众筹平台
  12. TCP Socket与TCP 连接
  13. mmdetection3d S3DIS (持续更新)
  14. 【JavaWeb】AJAX
  15. MSP430F5529-PWM波在串口中的输出及调整
  16. 想做自媒体副业,有什么领域可推荐?
  17. ubuntu20.04 使用root用户登录系统
  18. ICP、ISP、IAP、JTAG、SWD下载方式
  19. 使用java自造TCP/IP协议栈:使用JPCAP实现数据发包
  20. Auvidea J120 TX2开发板 Jetpack刷机与驱动安装

热门文章

  1. C++调用Fortran动态库说明
  2. python按出现次数排序_Python编程题18--统计字母出现次数并排序
  3. 1 如何查看文件后缀名(全)
  4. 近7个月的专业学习总体计划
  5. SpringBoot 整合163邮箱 阿里云25端口问题
  6. 美团2020后台秋招笔试整理
  7. Xcode Cloud
  8. Android SoftAP 实现框架
  9. 初学者的购买指南,蓝牙麦克风
  10. 毕业论文中的问卷如何做效度分析?