Redis实现分布式锁的正确姿势 | Spring Cloud 36
一、分布式锁
1.1 什么是分布式锁
分布式锁,即分布式系统中的锁。在单体应用中我们通过锁解决的是控制共享资源访问的问题,而分布式锁,就是解决了分布式系统中控制共享资源访问的问题。与单体应用不同的是,分布式系统中竞争共享资源的最小粒度从线程升级成了进程。
1.2 分布式锁应该具备哪些条件
在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行
高可用的获取锁与释放锁
高性能的获取锁与释放锁
具备可重入特性(可理解为重新进入,由多于一个任务并发使用,而不必担心数据错误)
具备锁失效机制,即自动解锁,防止死锁
具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
1.3 分布式锁的实现方式
基于数据库实现分布式锁
基于
Zookeeper
实现分布式锁基于
Reids
实现分布式锁
本章重点讲解的是基于
Reids
的分布式锁
二、利用RedisTemplate实现分布式锁
在真实的项目里,我们经常会使用Spring Boot/Spring Cloud
框架,该框架已经集成了 RedisTemplate
类,开放了很多对Redis
的API
。
本章节列举基于
RedisTemplate
实现方式分布式锁的各种方式及存在的问题。
2.1 利用setIfAbsent先设置key value再设置expire
redisTemplate.opsForValue().setIfAbsent(key,value);
redisTemplate.expire(key, time, TimeUnit.SECONDS);
问题:「不是原子操作」。以上两条语句不是原子性的。假如执行完第一条语句后,服务挂掉,导致
key
永久存在,锁无法释放。
setIfAbsent(key, value)
方法简介:如果key
不存在则新增,返回true
;存在则不改变已经有的值返回false
。
2.2 利用setIfAbsent同时设置 key value expire
redisTemplate.opsForValue().setIfAbsent(key, value, time, TimeUnit.SECONDS);
问题一:「锁过期释放了,业务还没执行完」。假设线程a获取锁成功,一直在执行临界区的代码。但是100s过去后,它还没执行完。但是,这时候锁已经过期了,此时线程b又请求过来。显然线程b就可以获得锁成功,也开始执行临界区的代码。那么问题就来了,临界区的业务代码都不是严格串行执行。
问题二:「锁被别的线程误删」。假设线程a执行完后,去释放锁。但是它不知道当前的锁可能是线程b持有的(线程a去释放锁时,有可能过期时间已经到了,此时线程b进来占有了锁)。那线程a就把线程b的锁释放掉了,但是线程b临界区业务代码可能都还没执行完。
2.3 利用setIfAbsent同时设置 key value expire,value为客户端标识
可以在获取锁,解锁时。首先获取客户端标识,如果和加锁时不一致,则获解锁操作失败,解决了2.2
中提到了「锁被别的线程误删」
问题。
Callable<String> callable = () -> {String result = "";String threadId = Thread.currentThread().getId() + "";if (Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, threadId, 10, TimeUnit.SECONDS))) {result = threadId;try {// 模拟业务处理Thread.sleep(5000);} catch (Exception e) {e.printStackTrace();} finally {if (redisTemplate.opsForValue().get(key).equals(threadId)) {redisTemplate.delete(key);}}}return result;
};
List<Future<String>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {Future<String> future = executorService.submit(callable);list.add(future);
}
for (Future<String> future : list) {log.info(future.get());
}
问题:「不是原子操作」。通过
key
获取加锁时的客户端标识和释放锁两条语句不是原子操作。
2.4 利用lua脚本进行加锁及释放锁原子操作
本章重点,通过
lua
脚本对2.3
提出的原子性问题进行解决。
RedisTemplate
执行lua
脚本加锁释放锁工具类:
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;@Component
public class ConcurrentLockUtil {private static final String LOCK_LUA = "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then redis.call('expire', KEYS[1], ARGV[2]) return 'true' else return 'false' end";private static final String UNLOCK_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then redis.call('del', KEYS[1]) end return 'true' ";private final RedisScript lockRedisScript;private final RedisScript unLockRedisScript;private final RedisSerializer<String> argsSerializer;private final RedisSerializer<String> resultSerializer;private RedisTemplate redisTemplate;public ConcurrentLockUtil(RedisTemplate redisTemplate) {this.argsSerializer = new StringRedisSerializer();this.resultSerializer = new StringRedisSerializer();this.lockRedisScript = RedisScript.of(LOCK_LUA, String.class);this.unLockRedisScript = RedisScript.of(UNLOCK_LUA, String.class);this.redisTemplate = redisTemplate;}/*** 分布式锁** @param lockKey* @param value* @param time* @return*/public boolean lock(String lockKey, String value, long time) {List<String> keys = Collections.singletonList(lockKey);String flag = (String) redisTemplate.execute(lockRedisScript, argsSerializer, resultSerializer, keys, value, String.valueOf(time));return Boolean.valueOf(flag);}/*** 删除锁** @param lock* @param val*/public void unlock(String lock, String val) {List<String> keys = Collections.singletonList(lock);redisTemplate.execute(unLockRedisScript, argsSerializer, resultSerializer, keys, val);}
}
业务调用:
@Autowired
ConcurrentLockUtil concurrentLockUtil;ExecutorService executorService = Executors.newFixedThreadPool(10);
String key = "test2-lock";Callable<String> callable = () -> {String result = "";String threadId = Thread.currentThread().getId() + "";if (Boolean.TRUE.equals(concurrentLockUtil.lock(key, threadId, 10))) {result = threadId;try {Thread.sleep(5000);} catch (Exception e) {e.printStackTrace();} finally {concurrentLockUtil.unlock(key, threadId);}}return result;
};
List<Future<String>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {Future<String> future = executorService.submit(callable);list.add(future);
}
for (Future<String> future : list) {log.info(future.get());
}
问题:针对
2.2
章节提到的「锁过期释放了,业务还没执行完」问题仍然存在。
有些小伙伴认为,稍微把锁过期时间设置长一些就可以。其实我们设想一下,是否可以给获得锁的线程,开启一个定时守护线程,每隔一段时间检查锁是否还存在,存在则对锁的过期时间延长,防止锁过期提前释放,请见下面章节。
三、利用Redisson框架RLock实现分布式锁
开源框架Redisson
解决了2.1.4
提出的问题。官网地址:https://github.com/redisson/redisson/wiki/1.-%E6%A6%82%E8%BF%B0
我们一起来看下Redisson
底层原理图:
只要线程一加锁成功,就会启动一个watch dog
看门狗,它是一个后台线程,会每隔10
秒检查一下,如果线程1还持有锁,那么就会不断的延长锁key
的生存时间(Redisson
中使用的 Lua
脚本做的检查及设置过期时间操作,本身是原子性的)。因此,解决了「锁过期释放,业务没执行完」
问题。
3.1 加锁逻辑
Redisson
中 Lua
加锁脚本定义及流程如下:
3.2 解锁逻辑
Redisson
中 Lua
解锁脚本定义及流程如下:
3.3 续锁逻辑
可以看到续时方法将 threadId
当作标识符进行续时
知道核心理念就好了, 没必要研究每一行代码
四、利用RedissonRedLock多机实现分布式锁
4.1 单机版锁存在的问题
前面章节的几种方案都只是基于单机版的讨论,还不是很完美。其实Redis
一般都是集群部署的:
如果线程一在Redis
的master
节点上拿到了锁,但是加锁的key
还没同步到slave
节点。恰好这时,master
节点发生故障,一个slave
节点就会升级为master
节点。线程二就可能获取同个key
的锁,但线程一也已经拿到锁了,锁的安全性就没了。
4.2 RedissonRedLock核心思想
为了解决上述问题,Redis
作者 antirez
提出一种高级的分布式锁算法:Redlock
。Redlock
核心思想是这样的:
搞多个
Redis
master
部署,以保证它们不会同时宕掉。并且这些master
节点是完全相互独立的,相互之间不存在数据同步。同时,需要确保在这多个master
实例上,是与在Redis
单实例,使用相同方法来获取和释放锁。
RedissonRedLock
算法:
RedissonRedLock
业务逻辑实现:
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://172.0.0.1:5378").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);Config config2 = new Config();
config2.useSingleServer().setAddress("redis://172.0.0.1:5379").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);Config config3 = new Config();
config3.useSingleServer().setAddress("redis://172.0.0.1:5380").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);String lockKey = "myLock";
int waitTimeout = 5;
int leaseTime = 30;/*** 获取多个 RLock 对象*/
RLock lock1 = redissonClient1.getLock(lockKey);
RLock lock2 = redissonClient2.getLock(lockKey);
RLock lock3 = redissonClient3.getLock(lockKey);/*** 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里)*/
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);try {/*** 4.尝试获取锁* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败* leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)*/boolean res = redLock.tryLock((long) waitTimeout, (long) leaseTime, TimeUnit.SECONDS);if (res) {//成功获得锁,在这里处理业务}
} catch (Exception e) {throw new RuntimeException("aquire lock fail");
} finally {//无论如何, 最后都要解锁redLock.unlock();
}
RedissonRedLock
核心源码:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long newLeaseTime = -1;if (leaseTime != -1) {newLeaseTime = unit.toMillis(waitTime)*2;}long time = System.currentTimeMillis();long remainTime = -1;if (waitTime != -1) {remainTime = unit.toMillis(waitTime);}long lockWaitTime = calcLockWaitTime(remainTime);/*** 1. 允许加锁失败节点个数限制(N-(N/2+1))*/int failedLocksLimit = failedLocksLimit();/*** 2. 遍历所有节点通过EVAL命令执行lua加锁*/List<RLock> acquiredLocks = new ArrayList<>(locks.size());for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;/*** 3.对节点尝试加锁*/try {if (waitTime == -1 && leaseTime == -1) {lockAcquired = lock.tryLock();} else {long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {// 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {// 抛出异常表示获取锁失败lockAcquired = false;}if (lockAcquired) {/***4. 如果获取到锁则添加到已获取锁集合中*/acquiredLocks.add(lock);} else {/*** 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))* 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了* 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功*/if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}if (failedLocksLimit == 0) {unlockInner(acquiredLocks);if (waitTime == -1 && leaseTime == -1) {return false;}failedLocksLimit = failedLocksLimit();acquiredLocks.clear();// reset iteratorwhile (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}/*** 6.计算 目前从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则认定最终申请锁失败,返回false*/if (remainTime != -1) {remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}if (leaseTime != -1) {List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());for (RLock rLock : acquiredLocks) {RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);futures.add(future);}for (RFuture<Boolean> rFuture : futures) {rFuture.syncUninterruptibly();}}/*** 7.如果逻辑正常执行完则认为最终申请锁成功,返回true*/return true;
}
当然,对于 RedissonRedLock
算法不是没有质疑声, 大家可以去 Redis
官网查看Martin Kleppmann
与 Redis
作者Antirez
的辩论。
Redis实现分布式锁的正确姿势 | Spring Cloud 36相关推荐
- 【分布式缓存系列】Redis实现分布式锁的正确姿势
一.前言 在我们日常工作中,除了Spring和Mybatis外,用到最多无外乎分布式缓存框架--Redis.但是很多工作很多年的朋友对Redis还处于一个最基础的使用和认识.所以我就像把自己对分布式缓 ...
- 【最全】Spring Boot 实现分布式锁——这才是实现分布式锁的正确姿势!
ava世界的"半壁江山"--Spring早就提供了分布式锁的实现.早期,分布式锁的相关代码存在于Spring Cloud的子项目Spring Cloud Cluster中,后来被迁 ...
- 程序员如何 Get 分布式锁的正确姿势?| 技术头条
作者 | 刘春龙 责编 | 郭芮 在很多互联网产品应用中,有些场景需要加锁处理,比如秒杀.全局递增ID.楼层生成等等,大部分的解决方案是基于DB实现的,Redis也是较为常见的方案之一. Redis为 ...
- 这才是实现分布式锁的正确姿势!
都9102年了,你还在手写分布式锁吗? 经常被问到"如何实现分布式锁",看来这是大家的一个痛点. 其实Java世界的"半壁江山"--Spring早就提供了分布式 ...
- 掌握Redis分布式锁的正确姿势
本文中案例都会在上传到git上,请放心浏览 git地址:https://github.com/muxiaonong/Spring-Cloud/tree/master/order-lock 本文会使用到 ...
- Redis实现分布式锁的深入探究
点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 一.分布式锁简介 锁 是一种用来解决多个执行线程 访问共享资源 错 ...
- Redis之分布式锁
# 基于 Redis 实现分布式锁的三种方案 用 Redis 实现分布式锁的正确姿势(实现一) 用 Redisson 实现分布式可重入锁(RedissonLock)(实现二) 用 Redisson 实 ...
- Redis 作者 Antirez 讲如何实现分布式锁?Redis 实现分布式锁天然的缺陷分析Redis分布式锁的正确使用姿势!...
Redis分布式锁基本原理 采用 redis 实现分布式锁,主要是利用其单线程命令执行的特性,一般是 setnx, 只会有一个线程会执行成功,也就是只有一个线程能成功获取锁:看着很完美. 然而-- 看 ...
- 七种方案!探讨Redis分布式锁的正确使用姿势
前言 日常开发中,秒杀下单.抢红包等等业务场景,都需要用到分布式锁.而Redis非常适合作为分布式锁使用.本文将分七个方案展开,跟大家探讨Redis分布式锁的正确使用方式.如果有不正确的地方,欢迎大家 ...
最新文章
- dom vue 加载完 执行_前端面试题——Vue
- C# 学习笔记(13)自己的串口助手
- Java Virtual Machine Stacks(虚拟机栈)
- 干货:嵌入式C语言源代码优化方案(非编译器优化)
- 每隔10秒钟打印一个“Helloworld”
- Dumb Bones UVA - 10529(概率dp)
- 实训41 2018.6.2
- pandas DataFrame数据转为list
- c语言vc怎么改变背景板颜色,VC设置视图背景颜色方法
- Markdown必备,Lsky-pro图床配置
- 360众筹网_360众筹平台
- TCP Socket与TCP 连接
- mmdetection3d S3DIS (持续更新)
- 【JavaWeb】AJAX
- MSP430F5529-PWM波在串口中的输出及调整
- 想做自媒体副业,有什么领域可推荐?
- ubuntu20.04 使用root用户登录系统
- ICP、ISP、IAP、JTAG、SWD下载方式
- 使用java自造TCP/IP协议栈:使用JPCAP实现数据发包
- Auvidea J120 TX2开发板 Jetpack刷机与驱动安装