Nacos

Gateway

nginx

redission

Redisson是一个在Redis的基础上实现的Java驻内存数据网格

redission在hash结构中的存储是一个hash结构

类似这样

key 285475da-9152-4c83-822a-67ee2f116a79:52 1

在Redisson中,使用key来作为是否上锁的标志,当通过getLock(String key)方法获得相应的锁之后,这个key即作为一个key存储到redis hash结构的key中,并且将线程ID放在value hash结构中的key上,value代表锁了几次,主要用于可重入锁的实现。

在接下来如果有其他的线程尝试获取名为key的锁时,便会向集群中进行查询,如果能够查到这个锁并发现相应的value的值不为0,则表示已经有其他线程申请了这个锁同时还没有释放,则当前线程进入阻塞,否则由当前线程在redis中存储锁,如果是可重入锁的话,就根据线程id进行比对,如果匹配上了则当前线程每获得一个自身线程的锁,就将value的值加一,而每释放一个锁则将value值减一,直到减至0,完全释放这个锁。

这个key可以通过RLock lock = redisson.getLock(key)指定

Redissson tryLock 的主流程:

@Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();// 1.尝试获取锁Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}// 申请锁的耗时如果大于等于最大等待时间,则申请锁失败.time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(threadId);return false;}current = System.currentTimeMillis();/*** 2.订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:* 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争.** 当 this.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败.* 当 this.await 返回 true,进入循环尝试获取锁.*/RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);// await 方法内部是用 CountDownLatch 来实现阻塞,获取 subscribe 异步执行的结果(应用了 Netty 的 Future)if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(threadId);return false;}try {// 计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败.time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(threadId);return false;}/*** 3.收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁* 获取锁成功,则立马返回 true,* 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回 false 结束循环*/while (true) {long currentTime = System.currentTimeMillis();// 再次尝试获取锁ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}// 超过最大等待时间则返回 false 结束循环,获取锁失败time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(threadId);return false;}/*** 6.阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息):*/currentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {//如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {//则就在wait time 时间范围内等待可以通过信号量getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}// 更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间)time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(threadId);return false;}}} finally {// 7.无论是否获得锁,都要取消订阅解锁消息unsubscribe(subscribeFuture, threadId);}
//        return get(tryLockAsync(waitTime, leaseTime, unit));}

流程分析:

  1. 尝试获取锁,返回 null 则说明加锁成功,返回一个数值,则说明已经存在该锁,ttl 为锁的剩余存活时间。

  2. 如果此时客户端 2 进程获取锁失败,那么使用客户端 2 的线程 id(其实本质上就是进程 id)通过 Redis 的 channel 订阅锁释放的事件,。如果等待的过程中一直未等到锁的释放事件通知,当超过最大等待时间则获取锁失败,返回 false,也就是第 39 行代码。如果等到了锁的释放事件的通知,则开始进入一个不断重试获取锁的循环。

  3. 循环中每次都先试着获取锁,并得到已存在的锁的剩余存活时间。如果在重试中拿到了锁,则直接返回。如果锁当前还是被占用的,那么等待释放锁的消息,具体实现使用了 JDK 的信号量 Semaphore 来阻塞线程,当锁释放并发布释放锁的消息后,信号量的 release() 方法会被调用,此时被信号量阻塞的等待队列中的一个线程就可以继续尝试获取锁了。

特别注意:以上过程存在一个细节,这里有必要说明一下,也是分布式锁的一个关键点:当锁正在被占用时,等待获取锁的进程并不是通过一个 while(true) 死循环去获取锁,而是利用了 Redis 的发布订阅机制,通过 await 方法阻塞等待锁的进程,有效的解决了无效的锁申请浪费资源的问题

锁的续期机制

客户端 1 加锁的锁 key 默认生存时间才 30 秒,如果超过了 30 秒,客户端 1 还想一直持有这把锁,怎么办呢?

Redisson 提供了一个续期机制, 只要客户端 1 一旦加锁成功,就会启动一个 Watch Dog。

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;
}

注意:从以上源码我们看到 leaseTime 必须是 -1 才会开启 Watch Dog 机制,也就是如果你想开启 Watch Dog 机制必须使用默认的加锁时间为 30s。如果你自己自定义时间,超过这个时间,锁就会自定释放,并不会延长。

private void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);renewExpiration();}
}protected RFuture<Boolean> renewExpirationAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getName()),internalLockLeaseTime, getLockName(threadId));
}

Watch Dog 机制其实就是一个后台定时任务线程,获取锁成功之后,会将持有锁的线程放入到一个 RedissonLock.EXPIRATION_RENEWAL_MAP里面,然后每隔 10 秒 (internalLockLeaseTime / 3) 检查一下,如果客户端 1 还持有锁 key(判断客户端是否还持有 key,其实就是遍历 EXPIRATION_RENEWAL_MAP 里面线程 id 然后根据线程 id 去 Redis 中查,如果存在就会延长 key 的时间),那么就会不断的延长锁 key 的生存时间。

注意:这里有一个细节问题,如果服务宕机了,Watch Dog 机制线程也就没有了,此时就不会延长 key 的过期时间,到了 30s 之后就会自动过期了,其他线程就可以获取到锁。

释放锁机制

执行

lock.unlock()

就可以释放分布式锁。我们来看一下释放锁的流程代码:

@Override
public RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise<Void>();// 1. 异步释放锁RFuture<Boolean> future = unlockInnerAsync(threadId);// 取消 Watch Dog 机制future.onComplete((opStatus, e) -> {cancelExpirationRenewal(threadId);if (e != null) {result.tryFailure(e);return;}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}result.trySuccess(null);});return result;
}protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,// 判断锁 key 是否存在"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +// 将该客户端对应的锁的 hash 结构的 value 值递减为 0 后再进行删除// 然后再向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

从以上代码来看,释放锁的步骤主要分三步:

  1. 删除锁(这里注意可重入锁,在上面的脚本中有详细分析)。

  2. 广播释放锁的消息,通知阻塞等待的进程(向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息)。

  3. 取消 Watch Dog 机制,即将 RedissonLock.EXPIRATION_RENEWAL_MAP 里面的线程 id 删除,并且 cancel 掉 Netty 的那个定时任务线程。

方案优点

  1. Redisson 通过 Watch Dog 机制很好的解决了锁的续期问题。

  2. 和 Zookeeper 相比较,Redisson 基于 Redis 性能更高,适合对性能要求高的场景。

  3. 通过 Redisson 实现分布式可重入锁,比原生的 SET mylock userId NX PX milliseconds + lua 实现的效果更好些,虽然基本原理都一样,但是它帮我们屏蔽了内部的执行细节。

  4. 在等待申请锁资源的进程等待申请锁的实现上也做了一些优化,减少了无效的锁申请,提升了资源的利用率。

方案缺点

  1. 使用 Redisson 实现分布式锁方案最大的问题就是如果你对某个 Redis Master 实例完成了加锁,此时 Master 会异步复制给其对应的 slave 实例。但是这个过程中一旦 Master 宕机,主备切换,slave 变为了 Master。接着就会导致,客户端 2 来尝试加锁的时候,在新的 Master 上完成了加锁,而客户端 1 也以为自己成功加了锁,此时就会导致多个客户端对一个分布式锁完成了加锁,这时系统在业务语义上一定会出现问题,导致各种脏数据的产生。所以这个就是 Redis Cluster 或者说是 Redis Master-Slave 架构的主从异步复制导致的 Redis 分布式锁的最大缺陷(在 Redis Master 实例宕机的时候,可能导致多个客户端同时完成加锁)。

  2. 有个别观点说使用 Watch Dog 机制开启一个定时线程去不断延长锁的时间对系统有所损耗(这里只是网络上的一种说法,博主查了很多资料并且结合实际生产并不认为有很大系统损耗,这个仅供大家参考)。

SpirngCache

CompletableFuture

SpringSession

web应用开发完成后是需要部署到web容器里去运行的,而session是由web容器来管理的。web容器管理servlet生命周期,生成session,为http请求提供状态支持,web应用通过session来处理用户登录信息。采用分布式/集群这两种部署方式,这个时候就出现了session共享问题。主要原因有以下两点:

  1. 在Java Servlet 3.1 规范中明确规定,HttpSession 对象必须被限定在应用(或 servlet 上下文)级别。底层的机制,如使用 cookie 建立会话,不同的上下文可以是相同,但所引用的对象,包括包括该对象中的属性,决不能在容器上下文之间共享。这就导致了单个web容器中的两个不同的应用之间无法共享session,所以部署在同一个容器中也无法解决分布式session共享的问题。

    2.在不入侵web容器的前提下,大部分容器如tomcat/jetty不支持多容器之间session共享。

session共享问题有没有别的解决方法,为什么要用spring-session?

知道了session共享问题的成因后,我们可以提出两种主要的解决思路。

  1. ​ 将session集中管理,代替web容器的session管理。

  2. 入侵web容器的session管理,使之支持session共享。

spring-session采用的是第一种方法,在web应用中使用过滤器将请求过滤,由spring-session来实现session的管理,spring-session提供了redis、jdbc、hazelcast等数据源的整合,使session数据的管理变得可视化,非常方便。

那么有没有别的方法解决session共享问题呢?答案是有的。tomcat-redis-session-manager使用redis来管理tomcat集群的session会话,相比于spring-session,这种方式入侵了web容器,实现起来比较复杂,耦合度较高,而且对tomcat版本支持范围不足,spring-session相比起来更加轻巧,操作更简单,耦合度低,加入到项目中框架清晰,因此更推荐使用spring-session。

spring-session 原理及源码学习

spring-session的原理是通过实现Filter创建过滤器SessionRepositoryFilter,在收到请求时采用装饰器模式重新分装HttpServletRequest和HttpServletResponse传递给FilterChain,之后对session的操作都交由SessionRepositoryRequestWrapper和SessionRepositoryResponseWrapper进行执行,以此来代替容器的session操作。请求的主流程如下图:

spring-session-core中org.springframework.session.web.http的uml关系图如下:

org.springframework.session.web.http

根据这个uml关系图,我们需要了解几个模块。

5.1.1 session管理模块

spring-session自定义了Session类,该Session类完全符合java servlet规范,拥有id,超时时间,是否超时,自定义属性,创建时间,最后一次访问时间等属性。以这个Session为基础,定义了两个大类的session仓库:

1.SessionRepository 包含对session的增删改查四个方法

1.1 RedisSessionRepository 内部维护了一个RedisOperations<String, Object>,实现redis的session管理。

1.2 MapSessionRepository 内部维护了一个Map<String, Session>,实现map的session管理。

1.3 FindByIndexNameSessionRepository

1.3.1 RedisIndexedSessionRepository 内部维护了一个ReactiveRedisOperations<String, Object>,实现redis的session管理。

2.ReactiveSessionRepository 响应式的session仓库,包含对session的增删改查四个方法,借助spring 5的reacitve思想实现的响应式session管理。

2.1 ReactiveMapSessionRepository 内部维护了一个Map<String, Session>,实现map的session管理。

2.2.ReactiveRedisSessionRepository 内部维护了一个ReactiveRedisOperations<String, Object>,实现redis的session管理。
基础接口

SessionRepository

对session的增删改查四个方法

ReactiveSessionRepository

对session的增删改查四个方法
Map管理

MapSessionRepository

内部维护了一个Map<String, Session>

ReactiveMapSessionRepository

内部维护了一个Map<String, Session>
Redis管理

RedisSessionRepository

内部维护了一个RedisOperations<String, Object>

ReactiveRedisSessionRepository

内部维护了一个ReactiveRedisOperations<String, Object>
允许通过特殊的索引名称和值查找session的仓库管理接口 FindByIndexNameSessionRepository
允许通过特殊的索引名称和值查找session的仓库管理

RedisIndexedSessionRepository

实现FindByIndexNameSessionRepository

5.1.2 SessionRepositoryFilter 核心过滤器

SessionRepositoryFilter UML

SessionRepositoryFilter继承了OncePerRequestFilter,具备一次请求只过滤一次的特性,不会对请求进行重复过滤。SessionRepositoryFilter的doFilterInternal做了三个操作:

1.将session管理仓库sessionRepository添加到request属性中 (一个session存储仓库)

  1. 封装request /response(自定义session相关操作后的request/response),交由下个过滤器执行

3.最后提交session到指定容器。

     @Overrideprotected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)throws ServletException, IOException {//将session仓库存入request的属性request.setAttribute(SESSION_REPOSITORY_ATTR, this.sessionRepository);//封装requestSessionRepositoryRequestWrapper wrappedRequest = new SessionRepositoryRequestWrapper(request, response);//封装responseSessionRepositoryResponseWrapper wrappedResponse = new SessionRepositoryResponseWrapper(wrappedRequest,response);try {//交由下个过滤器处理filterChain.doFilter(wrappedRequest, wrappedResponse);}finally {wrappedRequest.commitSession();}}

5.1.2.1 SessionRepositoryRequestWrapper

SessionRepositoryRequestWrapper继承了HttpServletRequestWrapper,采用装饰器模式包装HttpServletRequest,适配器模式包装Spring Session,重写session相关方法,这种设计既实现了完全符合HttpServletRequest的规范要求,又实现了session独立操作,是spring-session的精髓所在。

1.Spring Session的包装HttpSessionWrapper

首先定义一个适配器HttpSessionAdapter,实现HttpSession,重写session相关方法。然后定义HttpSessionWrapper继承HttpSessionAdapter,重写invalidate session失效方法。

   @Overridepublic void invalidate() {//调用父类invalidate方法,将invalidated属性置为truesuper.invalidate();//将SessionRepositoryRequestWrapper的requestedSessionInvalidated属性置为trueSessionRepositoryRequestWrapper.this.requestedSessionInvalidated = true;//移除当前sessionsetCurrentSession(null);//清除session缓存属性clearRequestedSessionCache();//移除sessionSessionRepositoryFilter.this.sessionRepository.deleteById(getId());}

2.SessionRepositoryRequestWrapper重写的有关session的几个主要方法

2.1获取session

   1.从request属性中获取当前session,private HttpSessionWrapper getCurrentSession() {return (HttpSessionWrapper) getAttribute(CURRENT_SESSION_ATTR);}public Object getAttribute(String name) {return this.request.getAttribute(name);}2.private S getRequestedSession() {//如果当前session缓存不存在if (!this.requestedSessionCached) {//那么从请求的cookie中获取sessionId集合List<String> sessionIds = SessionRepositoryFilter.this.httpSessionIdResolver.resolveSessionIds(this);//遍历sessionIds,获取第一个存在的session,填充到requestedSession,requestedSessionId属性,重置requestedSessionCached为turefor (String sessionId : sessionIds) {if (this.requestedSessionId == null) {this.requestedSessionId = sessionId;}S session = SessionRepositoryFilter.this.sessionRepository.findById(sessionId);if (session != null) {this.requestedSession = session;this.requestedSessionId = sessionId;break;}}this.requestedSessionCached = true;}return this.requestedSession;}2.强制性获取session@Overridepublic HttpSessionWrapper getSession() {return getSession(true);}@Overridepublic HttpSessionWrapper getSession(boolean create) {//先获取当前request中的sesion,如果存在则返回HttpSessionWrapper currentSession = getCurrentSession();if (currentSession != null) {return currentSession;}//从session库中获取请求对应的sessionS requestedSession = getRequestedSession();if (requestedSession != null) {if (getAttribute(INVALID_SESSION_ID_ATTR) == null) {//如果session库中存在对应的session并且当前request的session有效,那么重置session的访问时间,将requestedSessionIdValid置为truerequestedSession.setLastAccessedTime(Instant.now());this.requestedSessionIdValid = true;//将这个session重写包装到HttpSessionWrapper中,置为旧session,添加到request的当前session属性中,并返回。currentSession = new HttpSessionWrapper(requestedSession, getServletContext());currentSession.markNotNew();setCurrentSession(currentSession);return currentSession;}}else {// This is an invalid session id. No need to ask again if// request.getSession is invoked for the duration of this request//如果session库中也没有对应的session,打印日志,将request的session失效属性置为true,避免重复获取session,如果不强制获取session,返回null。if (SESSION_LOGGER.isDebugEnabled()) {SESSION_LOGGER.debug("No session found by id: Caching result for getSession(false) for this HttpServletRequest.");}setAttribute(INVALID_SESSION_ID_ATTR, "true");}if (!create) {return null;}//如果强制获取session,则打印日志,通过sessionRepository.createSession()创建新的session,将新建的session重新包装,添加到当前请求中,并返回。if (SESSION_LOGGER.isDebugEnabled()) {SESSION_LOGGER.debug("A new session was created. To help you troubleshoot where the session was created we provided a StackTrace (this is not an error). You can prevent this from appearing by disabling DEBUG logging for "+ SESSION_LOGGER_NAME,new RuntimeException("For debugging purposes only (not an error)"));}S session = SessionRepositoryFilter.this.sessionRepository.createSession();session.setLastAccessedTime(Instant.now());currentSession = new HttpSessionWrapper(session, getServletContext());setCurrentSession(currentSession);return currentSession;}

获取session的核心流程如下:

2.2 session是否有效

   @Overridepublic boolean isRequestedSessionIdValid() {//如果request的requestedSessionIdValid属性不为空,则直接返回该属性值if (this.requestedSessionIdValid == null) {//否则从sessionRepository中获取sessionS requestedSession = getRequestedSession();//如果session存在,则更新最后访问时间,返回该session的有效性if (requestedSession != null) {requestedSession.setLastAccessedTime(Instant.now());}return isRequestedSessionIdValid(requestedSession);}return this.requestedSessionIdValid;}

2.3 提交session commitSession

   private void commitSession() {//首先获取当前sessionHttpSessionWrapper wrappedSession = getCurrentSession();if (wrappedSession == null) {if (isInvalidateClientSession()) {//如果当前session已失效,那么将改cookie置为失效SessionRepositoryFilter.this.httpSessionIdResolver.expireSession(this, this.response);}}else {//否则,清除request的session缓存S session = wrappedSession.getSession();clearRequestedSessionCache();//保存session到sessionRepositorySessionRepositoryFilter.this.sessionRepository.save(session);String sessionId = session.getId();//将sessionId写入cookie中if (!isRequestedSessionIdValid() || !sessionId.equals(getRequestedSessionId())) {SessionRepositoryFilter.this.httpSessionIdResolver.setSessionId(this, this.response, sessionId);}}}

3.SessionCommittingRequestDispatcher

SessionCommittingRequestDispatcher也采用了装饰器模式,定义为SessionRepositoryRequestWrapper的内部类,包装了HttpServletRequestWrapper的RequestDispatcher,在RequestDispatcher的基础上,执行include前提添加了提交session操作,确保include前session得到保存。

5.1.2.2 SessionRepositoryRequestWrapper

SessionRepositoryRequestWrapper继承了OnCommittedResponseWrapper,OnCommittedResponseWrapper继承了HttpServletResponseWrapper,OnCommittedResponseWrapper采用装饰器模式,在HttpServletResponse的基础上添加了一些额外的功能,使得响应执行前能触发一些动作,而SessionRepositoryRequestWrapper继承了OnCommittedResponseWrapper,自然也拥有了这个属性。

我们从OnCommittedResponseWrapper的源码来入手,OnCommittedResponseWrapper有三个私有属性:

   //是否触发提交的标志,true:不触发private boolean disableOnCommitted;//响应头内容长度,如果这个值大于0,那么一旦contentWritten的值大于或等于这个值,那么这个相应被视为已提交。private long contentLength;//已被写入响应体的数据量private long contentWritten;

1.检查响应数据长度,自动触发提交事件

     private void checkContentLength(long contentLengthToWrite) {//将要写的数据长度添加到contentWritten     this.contentWritten += contentLengthToWrite;boolean isBodyFullyWritten = this.contentLength > 0 && this.contentWritten >= int bufferSize = getBufferSize();boolean requiresFlush = bufferSize > 0 && this.contentWritten >= bufferSize;if (isBodyFullyWritten || requiresFlush) {//如果要写的数据长度大于等于响应头长度,同时大于等于缓冲区长度,那么触发响应提交事件doOnResponseCommitted();}}

2.触发提交事件

     private void doOnResponseCommitted() {//如果允许触发if (!this.disableOnCommitted) {//调用触发事件onResponseCommitted();//将关闭触发功能disableOnResponseCommitted();}}private void disableOnResponseCommitted() {this.disableOnCommitted = true;}

3.在以上两个方法为基础的前提下,OnCommittedResponseWrapper采用装饰器模式,包装ServletOutputStream和PrintWriter,保证在响应完成前调用doOnResponseCommitted(),触发提交事件。而SessionRepositoryRequestWrapper继承了这些属性后,重写onResponseCommitted()方法,调用SessionRepositoryRequestWrapper的commitSession()方法,从而保证响应提交前能触发session提交事件。

   private final class SessionRepositoryResponseWrapper extends OnCommittedResponseWrapper {private final SessionRepositoryRequestWrapper request;/*** Create a new {@link SessionRepositoryResponseWrapper}.* @param request the request to be wrapped* @param response the response to be wrapped*/SessionRepositoryResponseWrapper(SessionRepositoryRequestWrapper request, HttpServletResponse response) {super(response);if (request == null) {throw new IllegalArgumentException("request cannot be null");}this.request = request;}//重写onResponseCommitted()方法@Overrideprotected void onResponseCommitted() {//调用SessionRepositoryRequestWrapper的commitSession()this.request.commitSession();}}

以上就是spring-session的工作原理,上述解析并没有针对某个子工程(spring-session-data-redis、spring-session-hazelcast、spring-session-jdbc等)进行详细讲解,主要讲述了spring-session的主要工作原理。

单点登录

seata

如下图所示,Seata 中有三大模块,分别是 TM、RM 和 TC。 其中 TM 和 RM 是作为 Seata 的客户端与业务系统集成在一起,TC 作为 Seata 的服务端独立部署。

在 Seata 中,分布式事务的执行流程:

  • TM 开启分布式事务(TM 向 TC 注册全局事务记录);
  • 按业务场景,编排数据库、服务等事务内资源(RM 向 TC 汇报资源准备状态 );
  • TM 结束分布式事务,事务一阶段结束(TM 通知 TC 提交/回滚分布式事务);
  • TC 汇总事务信息,决定分布式事务是提交还是回滚;
  • TC 通知所有 RM 提交/回滚 资源,事务二阶段结束。

2.3 分布式事务 Seata 解决方案

Seata 会有 4 种分布式事务解决方案,分别是 AT 模式、TCC 模式、Saga 模式和 XA 模式。

2.3.1 AT 模式

今年 1 月份,Seata 开源了 AT 模式。AT 模式是一种无侵入的分布式事务解决方案。在 AT 模式下,用户只需关注自己的“业务 SQL”,用户的 “业务 SQL” 作为一阶段,Seata 框架会自动生成事务的二阶段提交和回滚操作。

AT 模式如何做到对业务的无侵入 :
  • 一阶段:
  • 以下文中的行锁都是全局锁

在一阶段,Seata 会拦截“业务 SQL”,首先解析 SQL 语义,找到“业务 SQL”要更新的业务数据,之后要获得本地锁,在业务数据被更新前,将其保存成“before image”,然后执行“业务 SQL”更新业务数据,在业务数据更新之后,再将其保存成“after image”,最后生成行锁,然后再释放本地锁。最后一阶段执行完毕,事务只持有一个行锁。这样保证了一阶段操作的原子性。

  • 二阶段提交:

二阶段TC根据所有的分支事务执行结果判断,如果是提交的话,因为“业务 SQL”在一阶段已经提交至数据库, 所以 Seata 框架只需将一阶段保存的快照数据和行锁(全局锁)删掉,完成数据清理即可。

  • 二阶段回滚:

二阶段如果是回滚的话,Seata 就需要回滚一阶段已经执行的“业务 SQL”,还原业务数据。回滚方式便是用“before image”还原业务数据;但在还原前要首先要校验脏写,对比“数据库当前业务数据”和 “after image”,如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。

AT 模式的一阶段、二阶段提交和回滚均由 Seata 框架自动生成,用户只需编写“业务 SQL”,便能轻松接入分布式事务,AT 模式是一种对业务无任何侵入的分布式事务解决方案。

为什么要有行锁(全局锁)呢?

试想一下,我们有三个事务,分别是A、B、C,业务要求A、B、C三个事务的执行是同步的,这就要求我们要对这三个事务建立一个分布式事务,如果没有全局锁的限制

  1. A事务在本机执行完毕,释放本地锁
  2. B事务在本机执行完毕,释放本地锁
  3. C事务在本机执行完毕,释放本地锁
  4. 根据A、B、C三个事务执行状态,二阶段决定提交还是回滚
  • 看起来好像没什么问题,但是如果在A事务执行完毕,释放锁的时候,A2事务也要执行,并且修改A事务修改过的数据,如果没有全局锁,会导致A事务失效,为了分布式事务的一致性,需要添加全局锁作为保障。
  • 并且二阶段会进行脏数据的校验,相当于又加了一层保护。

2.3.2 TCC 模式

2019 年 3 月份,Seata 开源了 TCC 模式,该模式由蚂蚁金服贡献。TCC 模式需要用户根据自己的业务场景实现 Try、Confirm 和 Cancel 三个操作;事务发起方在一阶段 执行 Try 方式,在二阶段提交执行 Confirm 方法,二阶段回滚执行 Cancel 方法。

TCC 三个方法描述:

  • Try:资源的检测和预留;
  • Confirm:执行的业务操作提交;要求 Try 成功 Confirm 一定要能成功;
  • Cancel:预留资源释放。

业务模型分 2 阶段设计:

用户接入 TCC ,最重要的是考虑如何将自己的业务模型拆成两阶段来实现。

以“扣钱”场景为例,在接入 TCC 前,对 A 账户的扣钱,只需一条更新账户余额的 SQL 便能完成;但是在接入 TCC 之后,用户就需要考虑如何将原来一步就能完成的扣钱操作,拆成两阶段,实现成三个方法,并且保证一阶段 Try 成功的话 二阶段 Confirm 一定能成功。

如上图所示,

Try 方法作为一阶段准备方法,需要做资源的检查和预留。在扣钱场景下,Try 要做的事情是就是检查账户余额是否充足,预留转账资金,预留的方式就是冻结 A 账户的 转账资金。Try 方法执行之后,账号 A 余额虽然还是 100,但是其中 30 元已经被冻结了,不能被其他事务使用。

二阶段 Confirm 方法执行真正的扣钱操作。Confirm 会使用 Try 阶段冻结的资金,执行账号扣款。Confirm 方法执行之后,账号 A 在一阶段中冻结的 30 元已经被扣除,账号 A 余额变成 70 元 。

如果二阶段是回滚的话,就需要在 Cancel 方法内释放一阶段 Try 冻结的 30 元,使账号 A 的回到初始状态,100 元全部可用。

用户接入 TCC 模式,最重要的事情就是考虑如何将业务模型拆成 2 阶段,实现成 TCC 的 3 个方法,并且保证 Try 成功 Confirm 一定能成功。相对于 AT 模式,TCC 模式对业务代码有一定的侵入性,但是 TCC 模式无 AT 模式的全局行锁,TCC 性能会比 AT 模式高很多。

2.3.3 Saga 模式

Saga 模式是 Seata 即将开源的长事务解决方案,将由蚂蚁金服主要贡献。在 Saga 模式下,分布式事务内有多个参与者,每一个参与者都是一个冲正补偿服务,需要用户根据业务场景实现其正向操作和逆向回滚操作。

分布式事务执行过程中,依次执行各参与者的正向操作,如果所有正向操作均执行成功,那么分布式事务提交。如果任何一个正向操作执行失败,那么分布式事务会去退回去执行前面各参与者的逆向回滚操作,回滚已提交的参与者,使分布式事务回到初始状态。

Saga 模式下分布式事务通常是由事件驱动的,各个参与者之间是异步执行的,Saga 模式是一种长事务解决方案。

2.3.4 XA 模式

XA 模式是 Seata 将会开源的另一种无侵入的分布式事务解决方案,任何实现了 XA 协议的数据库都可以作为资源参与到分布式事务中,目前主流数据库,例如 MySql、Oracle、DB2、Oceanbase 等均支持 XA 协议。

XA 协议有一系列的指令,分别对应一阶段和二阶段操作。“xa start”和 “xa end”用于开启和结束XA 事务;“xa prepare” 用于预提交 XA 事务,对应一阶段准备;“xa commit”和“xa rollback”用于提交、回滚 XA 事务,对应二阶段提交和回滚。

在 XA 模式下,每一个 XA 事务都是一个事务参与者。分布式事务开启之后,首先在一阶段执行“xa start”、“业务 SQL”、“xa end”和 “xa prepare” 完成 XA 事务的执行和预提交;二阶段如果提交的话就执行 “xa commit”,如果是回滚则执行“xa rollback”。这样便能保证所有 XA 事务都提交或者都回滚。

XA 模式下,用户只需关注自己的“业务 SQL”,Seata 框架会自动生成一阶段、二阶段操作;XA 模式的实现如下:

  • 一阶段:

在 XA 模式的一阶段,Seata 会拦截“业务 SQL”,在“业务 SQL”之前开启 XA 事务(“xa start”),然后执行“业务 SQL”,结束 XA 事务“xa end”,最后预提交 XA 事务(“xa prepare”),这样便完成 “业务 SQL”的准备操作。

  • 二阶段提交:

执行“xa commit”指令,提交 XA 事务,此时“业务 SQL”才算真正的提交至数据库。

  • 二阶段回滚:

执行“xa rollback”指令,回滚 XA 事务,完成“业务 SQL”回滚,释放数据库锁资源。

XA 模式下,用户只需关注“业务 SQL”,Seata 会自动生成一阶段、二阶段提交和二阶段回滚操作。XA 模式和 AT 模式一样是一种对业务无侵入性的解决方案;但与 AT 模式不同的是,XA 模式将快照数据和行锁等通过 XA 指令委托给了数据库来完成,这样 XA 模式实现更加轻量化。

三、分布式事务在蚂蚁金服的实践

蚂蚁金服从 2007 年开始研发和应用分布式事务中间件,用 TCC 模式解决各类金融场景的数据一致性问题,后续又演进出 FMT(AT)、XA、Saga 等模式,各种模式分别适用于各类业务场景。我们决定将蚂蚁金服多年的技术积累开源出来,与社区共享蚂蚁金服的科技成果。

蚂蚁金服内部的分布式事务产品,在实现原理和使用方式上,与 Seata 类似,不同的是,为了支持双十一,对性能进行了极致优化,为了支持金融系统的高可用容灾,借助蚂蚁金服三地五中心架构实现了分布式事务服务的高可用容灾;接下来主要介绍蚂蚁金服在性能优化和高可用容灾方面的实践经验。

3.1 极致性能优化

3.1.1 同库模式

通常,一个 TM 会产生一笔主事务日志,一个 RM 会产生一条分支事务日志,每个分布式事务由一个 TM 和若干 RM 组成,一个分布式事务总共会有 1+N 条事务日志(N 为 RM 个数)。

在默认情况下,分布式事务执行过程中客户端将事务日志发送给服务端,服务端再将事务日志存储至数据库中,一条事务日志的存储链路会有 2 次 TCP ,分别是“客户端到服务端”和“服务端到数据库”, 我们称这种模式为异库模式。

在异库模式下,分布式事务存储事务日志总共需要 2*(1+N) 次左右的 TCP 通信。在 RM 数量较少的业务场景下,分布式事务性能还能接收,但有些业务场景下 RM 数量较多,此时事务内 TCP 数量也会增多,分布式事务性能急剧下降。

在事务执行过程中,客户端和服务端进行通信的目的是为了存储事务日志。如果客户端在存储事务日志时,绕过服务端直接将事务日志写入数据库(如上图“同库模式”所示),那么一笔事务日志的存储链路就由原来的 2 次 TCP 变成只需访问一次数据库便可,每条事务日志的存储减少了一次 TCP 通信,整个分布式事务就减少了 N+2 次 TCP 请求,分布式事务的性能大幅提升。我们将客户端直接将事务日志存储至数据库的模式称为同库模式。

3.1.2 二阶段异步执行

通常情况下,分布式事务发起方会依次执行一阶段和二阶段方法,然后结束分布式事务,返回结果。如果让分布式事务发起方执行完一阶段之后马上结束并返回结果,二阶段交由独立的线程或者进程异步执行,这样分布式事务的二阶段会晚几秒钟或者若干分钟执行,但事务的最终结果不会有任何改变。

二阶段异步执行之后,分布式事务的最终结果不会有任何影响,但是事务发起方要执行的内容减少一半(一阶段和二阶段都执行变成只执行一阶段),直观的用户感受是分布式事务的性能提升了 50%。

3.2 分布式事务高可用

为了保障金融系统的高可用,分布式事务服务必须达到 99.99% 的可用率。分布式事务使用了蚂蚁金服的三地五中心架构部署,在每个机房都独立部署分布式事务服务,分布式事务服务是无状态的,而底层数据库副本在各机房间也是双向同步,这样业务流量从一个机房切到另外一个机房,分布式事务服务不会对业务有任何影响,从而保证了分布式事务服务的高可用。

四、总结

在分布式架构演进中,蚂蚁金服对数据库进了水平拆分,对服务面向功能进行了服务化拆分,从而出现了跨服务、跨数据库的业务数据一致性挑战。

2007 年,蚂蚁金服自主研发分布式事务中间件经历 12 年的严苛业务锤炼。2019 年,将多年的技术积累分享给开源分布式事务 Seata,并持续投入社区共建。目前 Seata 提供了 AT、TCC、Saga 和 XA 四种模式,每一种模式分别有各自的应用场景,丰富的解决方案帮助用户解决给了各类场景的数据一致性问题。

最后一部分,分享了蚂蚁金服具体的实践。为了支持双十一的高性能需求,对分布式事务进行了极致的性能优化,例如同库模式、二阶段异步执行。为了使金融服务的可用性达到 99.99%,蚂蚁金服分布式事务采用三地五中心架构,异地多活的部署模式保障了分布式事务服务的高可用。

ThreadLocal

Feign远程调用丢失请求头

线程或者进程异步执行,这样分布式事务的二阶段会晚几秒钟或者若干分钟执行,但事务的最终结果不会有任何改变。

二阶段异步执行之后,分布式事务的最终结果不会有任何影响,但是事务发起方要执行的内容减少一半(一阶段和二阶段都执行变成只执行一阶段),直观的用户感受是分布式事务的性能提升了 50%。

[外链图片转存中…(img-GEIw7uMr-1626783859342)]

3.2 分布式事务高可用

为了保障金融系统的高可用,分布式事务服务必须达到 99.99% 的可用率。分布式事务使用了蚂蚁金服的三地五中心架构部署,在每个机房都独立部署分布式事务服务,分布式事务服务是无状态的,而底层数据库副本在各机房间也是双向同步,这样业务流量从一个机房切到另外一个机房,分布式事务服务不会对业务有任何影响,从而保证了分布式事务服务的高可用。

[外链图片转存中…(img-SXSc3tyZ-1626783859343)]

四、总结

在分布式架构演进中,蚂蚁金服对数据库进了水平拆分,对服务面向功能进行了服务化拆分,从而出现了跨服务、跨数据库的业务数据一致性挑战。

2007 年,蚂蚁金服自主研发分布式事务中间件经历 12 年的严苛业务锤炼。2019 年,将多年的技术积累分享给开源分布式事务 Seata,并持续投入社区共建。目前 Seata 提供了 AT、TCC、Saga 和 XA 四种模式,每一种模式分别有各自的应用场景,丰富的解决方案帮助用户解决给了各类场景的数据一致性问题。

最后一部分,分享了蚂蚁金服具体的实践。为了支持双十一的高性能需求,对分布式事务进行了极致的性能优化,例如同库模式、二阶段异步执行。为了使金融服务的可用性达到 99.99%,蚂蚁金服分布式事务采用三地五中心架构,异地多活的部署模式保障了分布式事务服务的高可用。

ThreadLocal

Feign远程调用丢失请求头

Feign异步调用丢失请求头问题

Spring组件原理相关推荐

  1. Spring MVC 原理探秘 - 一个请求的旅行过程

    1.简介 在前面的文章中,我较为详细的分析了 Spring IOC 和 AOP 部分的源码,并写成了文章.为了让我的 Spring 源码分析系列文章更为丰富一些,所以从本篇文章开始,我将来向大家介绍一 ...

  2. (转)spring源码解析,spring工作原理

    转自:https://www.ibm.com/developerworks/cn/java/j-lo-spring-principle/ Spring 的骨骼架构 Spring 总共有十几个组件,但是 ...

  3. Spring MVC 原理探秘 - 容器的创建过程

    1.简介 在上一篇文章中,我向大家介绍了 Spring MVC 是如何处理 HTTP 请求的.Spring MVC 可对外提供服务时,说明其已经处于了就绪状态.再次之前,Spring MVC 需要进行 ...

  4. Spring MVC原理及配置详解

    转载自 http://blog.csdn.net/jianyuerensheng/article/details/51258942 [Spring]Spring MVC原理及配置 1.Spring M ...

  5. 关于Spring底层原理面试的那些问题,你是不是真的懂Spring?

    转载自  关于Spring底层原理面试的那些问题,你是不是真的懂Spring? 1.什么是 Spring 框架?Spring 框架有哪些主要模块? Spring 框架是一个为 Java 应用程序的开发 ...

  6. SpingMVC框架:fileUpload组件原理和实现

    QUESTION:fileUpload组件原理和实现 ANSWER: 目录 QUESTION:fileUpload组件原理和实现 ANSWER: 一:异常产生 查询了一系列博客后,发现这是由于上传文件 ...

  7. Spring IOC原理总结

    Spring容器高层视图 Spring 启动时读取应用程序提供的Bean配置信息,并在Spring容器中生成一份相应的Bean配置注册表,然后根据这张注册表实例化Bean,装配好Bean之间的依赖关系 ...

  8. 一步一步手绘Spring MVC运行时序图(Spring MVC原理)

    相关内容: 架构师系列内容:架构师学习笔记(持续更新) 一步一步手绘Spring IOC运行时序图一(Spring 核心容器 IOC初始化过程) 一步一步手绘Spring IOC运行时序图二(基于XM ...

  9. Spring Ioc原理解析

    Spring Ioc原理解析 IoC理论的背景 我们都知道,在采用面向对象方法设计的软件系统中,它的底层实现都是由N个对象组成的,所有的对象通过彼此的合作,最终实现系统的业务逻辑. 图1:软件系统中耦 ...

最新文章

  1. 【怎样写代码】参数化类型 -- 泛型(二):泛型的优点
  2. 关于从基于Mult-Org的视图中查询数据的问题(转)
  3. go给Linux安装mysql_在Linux上安装Go语言开发包
  4. [剑指offer]面试题第[53-2]题[JAVA][0-n-1中缺失的数字][二分法][暴力法]
  5. 内网击穿之 HTTP 穿透:网站没上线?如何让全世界的人都可以访问你本地的网站?
  6. CPU的内部物理结构介绍
  7. 用端口映射的办法使用矩池云隐藏的vnc功能
  8. 容斥原理 —— 不重不漏的计数
  9. iphone 开发第四天 - 字符串
  10. 【设计模式】享元模式(Flyweight)
  11. pdo调用mysql存储过程_获取out参数值问题_调用Oracle存储过程并获取out参数值-阿里云开发者社区...
  12. GB28181国标平台资料整理
  13. android 360全景视频,【Android开发VR实战】二.播放360#176;全景视频(示例代码)
  14. 蔡颖-《APS走向实践》书解读之三:供应、计划排程、供应链优化
  15. EPICS AI记录
  16. 阿里云大数据专业认证(ACP),值得报名吗?
  17. DirectSound应用
  18. 【C语言】-关于strlen的介绍以及三种模拟实现的方法
  19. Linux Shell 打开软件时最小化窗口
  20. 【环形dp】poj 2228 Naptime

热门文章

  1. 智能驾驶功能软件平台设计规范
  2. 农夫山泉行业标杆,联想企业网盘助其业务日新月盛
  3. AdMob(app内嵌广告)横幅广告
  4. 苹果成亚马逊云计算大客户 答应5年“消费”15亿美元
  5. 如何在线将视频转gif动图?
  6. s7与200smart通讯
  7. CentOS8中使用Libreoffice7.3遇到的问题
  8. windows系统win10系统文件批量重命名软件名称
  9. Python 自动化教程(4) : 自动生成PPT文件 Part 2 (干货)
  10. 基本常用防火墙设置端口命令