Redisson锁 Redis 实现分布式锁主要步骤
指定一个 key 作为锁标记,存入 Redis 中,指定一个 唯一的用户标识 作为 value。
当 key 不存在时才能设置值,确保同一时间只有一个客户端进程获得锁,满足 互斥性 特性。
设置一个过期时间,防止因系统异常导致没能删除这个 key,满足 防死锁 特性。
当处理完业务之后需要清除这个 key 来释放锁,清除 key 时需要校验 value 值,需要满足 只有加锁的人才能释放锁 。
特别注意:以上实现步骤考虑到了使用分布式锁需要考虑的互斥性、防死锁、加锁和解锁必须为同一个进程等问题,但是锁的续期无法实现 。所以,博主采用 Redisson 实现 Redis 的分布式锁,借助 Redisson 的 WatchDog 机制 能够很好的解决锁续期的问题,同样 Redisson 也是 Redis 官方推荐分布式锁实现方案,实现起来较为简单。
高效分布式锁 当我们在设计分布式锁的时候,我们应该考虑分布式锁至少要满足的一些条件,同时考虑如何高效的设计分布式锁,这里我认为以下几点是必须要考虑的。
互斥 在分布式高并发的条件下,我们最需要保证,同一时刻只能有一个线程获得锁,这是最基本的一点。
防止死锁 在分布式高并发的条件下,比如有个线程获得锁的同时,还没有来得及去释放锁,就因为系统故障或者其它原因使它无法执行释放锁的命令,导致其它线程都无法获得锁,造成死锁。
所以分布式非常有必要设置锁的有效时间
,确保系统出现故障后,在一定时间内能够主动去释放锁,避免造成死锁的情况。
性能 对于访问量大的共享资源,需要考虑减少锁等待的时间,避免导致大量线程阻塞。
所以在锁的设计时,需要考虑两点。
锁的颗粒度要尽量小
。比如你要通过锁来减库存,那这个锁的名称你可以设置成是商品的ID,而不是任取名称。这样这个锁只对当前商品有效,锁的颗粒度小。
锁的范围尽量要小
。比如只要锁2行代码就可以解决问题的,那就不要去锁10行代码了。
重入 我们知道ReentrantLock是可重入锁,那它的特点就是:同一个线程可以重复拿到同一个资源的锁。重入锁非常有利于资源的高效利用。
Redis 发布订阅 之前说过,分布式锁的核心功能其实就三个:加锁、解锁、设置锁超时。这三个功能也是我们研究Redisson分布式锁原理的方向。
在学习之前,我们有必要先了解一个知识点,就是有关Redis的发布订阅功能。
Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息,发布者可以向指定的渠道 (channel) 发送消息,订阅者如果订阅了该频道的话就能收到消息,从而实现多个客户端的通信效果。
订阅的命令是SUBSCRIBE channel[channel ...]
,可以订阅一个或多个频道,当有新消息通过PUBLISH 命令发送给频道时,订阅者就能收到消息
开启两个客户端,一个订阅了频道channel1 ,另一个通过PUBLISH发送消息后,订阅的那个就能收到了,靠这种模式就能实现不同客户端之间的通信。
Redisson原理
加锁机制 线程去获取锁,获取成功: 执行lua脚本,保存数据到redis数据库。
线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。
1 2 3 4 5 6 7 8 9 10 11 if (redis.call('exists' , KEYS[1 ]) == 0 ) then " + " redis.call('hincrby' , KEYS[1 ], ARGV[2 ], 1 ); " + " redis.call('pexpire' , KEYS[1 ], ARGV[1 ]); " + " return nil; " + " end; " + " if (redis.call('hexists' , KEYS[1 ], ARGV[2 ]) == 1 ) then " + " redis.call('hincrby' , KEYS[1 ], ARGV[2 ], 1 ); " + " redis.call('pexpire' , KEYS[1 ], ARGV[1 ]); " + " return nil; " + " end; " + " return redis.call('pttl' , KEYS[1 ]);"
这里 KEYS[1]
代表的是你加锁的 key,比如你自己设置了加锁的那个锁 key 就是 “myLock”。
1 2 // create a lock RLock lock = redisson.getLock("myLock");
ARGV[1]
代表的是锁 key 的默认生存时间,默认 30 秒。
ARGV[2]
代表的是加锁的客户端的 ID,类似于下面这样:285475da-9152-4c83-822a-67ee2f116a79:52。
至于最后面的一个 1 是为了后面可重入做的计数统计
加锁流程
判断是否存在这个加锁的key
如果不存在,通过hset命令加锁
设置过期时间
锁的续期机制 客户端 1 加锁的锁 key 默认生存时间才 30 秒,如果超过了 30 秒,客户端 1 还想一直持有这把锁,怎么办呢?
Redisson 提供了一个续期机制, 只要客户端 1 一旦加锁成功,就会启动一个 Watch Dog。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private <T> RFuture<Long> tryAcquireAsync (long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1 ) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null ) { return ; } if (ttlRemaining == null ) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
注意:从以上源码我们看到 leaseTime
必须是 -1 才会开启 Watch Dog 机制,也就是如果你想开启 Watch Dog 机制必须使用默认的加锁时间为 30s。如果你自己自定义时间,超过这个时间,锁就会自定释放,并不会延长。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private void scheduleExpirationRenewal (long threadId) { ExpirationEntry entry = new ExpirationEntry (); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null ) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); renewExpiration(); } } protected RFuture<Boolean> renewExpirationAsync (long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;" , Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
Watch Dog 机制其实就是一个后台定时任务线程,获取锁成功之后,会将持有锁的线程放入到一个 RedissonLock.EXPIRATION_RENEWAL_MAP
里面,然后每隔 10 秒 (internalLockLeaseTime / 3)
检查一下,如果客户端 1 还持有锁 key(判断客户端是否还持有 key,其实就是遍历 EXPIRATION_RENEWAL_MAP
里面线程 id 然后根据线程 id 去 Redis 中查,如果存在就会延长 key 的时间),那么就会不断的延长锁 key 的生存时间。
注意:这里有一个细节问题,如果服务宕机了,Watch Dog 机制线程也就没有了,此时就不会延长 key 的过期时间,到了 30s 之后就会自动过期了,其他线程就可以获取到锁。
为啥要用lua脚本呢? 这个不用多说,主要是如果你的业务逻辑复杂的话,通过封装在lua脚本中发送给redis,而且redis是单线程的,这样就保证这段复杂业务逻辑执行的原子性 。
锁互斥机制 此时,如果客户端 2 来尝试加锁,会如何呢?首先,第一个 if 判断会执行 exists myLock
,发现 myLock 这个锁 key 已经存在了。接着第二个 if 判断,判断一下,myLock 锁 key 的 hash 数据结构中,是否包含客户端 2 的 ID,这里明显不是,因为那里包含的是客户端 1 的 ID。所以,客户端 2 会执行:
1 return redis.call('pttl', KEYS[1]);
返回的一个数字,这个数字代表了 myLock 这个锁 key 的剩余生存时间。
接下来我们看一下 Redissson tryLock 的主流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 @Override public boolean tryLock (long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); if (ttl == null ) { return true ; } time -= System.currentTimeMillis() - current; if (time <= 0 ) { acquireFailed(threadId); return false ; } current = System.currentTimeMillis(); RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false )) { subscribeFuture.onComplete((res, e) -> { if (e == null ) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(threadId); return false ; } try { time -= System.currentTimeMillis() - current; if (time <= 0 ) { acquireFailed(threadId); return false ; } while (true ) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(leaseTime, unit, threadId); if (ttl == null ) { return true ; } time -= System.currentTimeMillis() - currentTime; if (time <= 0 ) { acquireFailed(threadId); return false ; } currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0 ) { acquireFailed(threadId); return false ; } } } finally { unsubscribe(subscribeFuture, threadId); } }
流程分析:
尝试获取锁,返回 null 则说明加锁成功,返回一个数值,则说明已经存在该锁,ttl 为锁的剩余存活时间。
如果此时客户端 2 进程获取锁失败,那么使用客户端 2 的线程 id(其实本质上就是进程 id)通过 Redis 的 channel 订阅锁释放的事件,。如果等待的过程中一直未等到锁的释放事件通知,当超过最大等待时间则获取锁失败,返回 false,也就是第 39 行代码。如果等到了锁的释放事件的通知,则开始进入一个不断重试获取锁的循环。
循环中每次都先试着获取锁,并得到已存在的锁的剩余存活时间。如果在重试中拿到了锁,则直接返回。如果锁当前还是被占用的,那么等待释放锁的消息,具体实现使用了 JDK 的信号量 Semaphore 来阻塞线程,当锁释放并发布释放锁的消息后,信号量的 release()
方法会被调用,此时被信号量阻塞的等待队列中的一个线程就可以继续尝试获取锁了。
特别注意:以上过程存在一个细节,这里有必要说明一下,也是分布式锁的一个关键点:当锁正在被占用时,等待获取锁的进程并不是通过一个 while(true)
死循环去获取锁,而是利用了 Redis 的发布订阅机制,通过 await 方法阻塞等待锁的进程,有效的解决了无效的锁申请浪费资源的问题 。
可重入加锁机制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public void lock () { RLock lock = redissonSingle.getLock("myLock" ); try { lock.lock(); doBusiness(); lock.lock(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); lock.unlock(); logger.info("任务执行完毕, 释放锁!" ); } }
Redisson可以实现可重入加锁机制的原因
Redis存储锁的数据类型是 Hash类型
Hash数据类型的key值包含了当前线程信息。
Hash类型相当于我们java的 <key,<key1,value>>
类型,
key是指 ‘redisson’
key1值为078e44a3-5f95-4e24-b6aa-80684655a15a:45
它的组成是: guid + 当前线程的ID。后面的value是就和可重入加锁有关。
可重入锁的机制,它最大的优点就是相同线程不需要在等待锁,而是可以直接进行相应操作。
锁释放机制 执行
就可以释放分布式锁。我们来看一下释放锁的流程代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @Override public RFuture<Void> unlockAsync (long threadId) { RPromise<Void> result = new RedissonPromise <Void>(); RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { cancelExpirationRenewal(threadId); if (e != null ) { result.tryFailure(e); return ; } if (opStatus == null ) { IllegalMonitorStateException cause = new IllegalMonitorStateException ("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return ; } result.trySuccess(null ); }); return result; } protected RFuture<Boolean> unlockInnerAsync (long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;" , Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
从以上代码来看,释放锁的步骤主要分三步:
删除锁(这里注意可重入锁,在上面的脚本中有详细分析)。
广播释放锁的消息,通知阻塞等待的进程(向通道名为 redisson_lock__channel
publish 一条 UNLOCK_MESSAGE
信息)。
取消 Watch Dog 机制,即将 RedissonLock.EXPIRATION_RENEWAL_MAP
里面的线程 id 删除,并且 cancel 掉 Netty 的那个定时任务线程。
Redis分布式锁的缺点 Redis分布式锁会有个缺陷,就是在Redis哨兵模式 下:
客户端1
对某个master节点
写入了redisson锁,此时会异步复制给对应的 slave节点。但是这个过程中一旦发生 master节点宕机,主备切换,slave节点从变为了 master节点。
这时客户端2
来尝试加锁的时候,在新的master节点上也能加锁,此时就会导致多个客户端对同一个分布式锁完成了加锁。
这时系统在业务语义上一定会出现问题,导致各种脏数据的产生 。
缺陷
在哨兵模式或者主从模式下,如果 master实例宕机的时候,可能导致多个客户端同时完成加锁。
RedissonLock RLock接口 概念 1 public interface RLock extends Lock , RExpirable, RLockAsync
很明显RLock是继承Lock锁,所以他有Lock锁的所有特性,比如lock、unlock、trylock等特性,同时它还有很多新特性:强制锁释放,带有效期的锁,。
RLock锁API 这里针对上面做个整理,这里列举几个常用的接口说明
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public interface RRLock { void lock () ; boolean tryLock () ; boolean tryLock (long time, TimeUnit unit) throws InterruptedException; void unlock () ; void lockInterruptibly () ; void lock (long leaseTime, TimeUnit unit) ; boolean tryLock (long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; boolean isLocked () ; boolean isHeldByCurrentThread () ; void lockInterruptibly (long leaseTime, TimeUnit unit) ; }
RLock相关接口,主要是新添加了 leaseTime
属性字段,主要是用来设置锁的过期时间,避免死锁。
RedissonLock实现类 1 public class RedissonLock extends RedissonExpirable implements RLock
RedissonLock实现了RLock接口,所以实现了接口的具体方法。这里我列举几个方法说明下
void lock()方法 1 2 3 4 5 6 7 8 @Override public void lock () { try { lockInterruptibly(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }
lock的拿锁过程跟tryLock基本是一致的,区别在于lock没有手动设置锁过期时长的参数,该方法的调用链也是跑到tryAcquire
方法来获取锁的,不同的是,lock锁里面进去其实用的是lockInterruptibly
(中断锁,表示可以被中断),而且捕获异常后用 Thread.currentThread().interrupt()来真正中断当前线程,其实它们是搭配一起使用的。
接下来执行流程,这里理下关键几步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Override public void lockInterruptibly () throws InterruptedException { lockInterruptibly(-1 , null ); } void lockInterruptibly (long leaseTime, TimeUnit unit) throws InterruptedException RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); <T> RFuture<T> tryLockInnerAsync (long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);" , Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
那么void lock(long leaseTime, TimeUnit unit)方法其实和上面很相似了,就是从上面第二步开始的。
tryLock(long waitTime, long leaseTime, TimeUnit unit)
重要逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 @Override public boolean tryLock (long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); if (ttl == null ) { return true ; } time -= System.currentTimeMillis() - current; if (time <= 0 ) { acquireFailed(threadId); return false ; } final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false )) { subscribeFuture.addListener(new FutureListener <RedissonLockEntry>() { @Override public void operationComplete (Future<RedissonLockEntry> future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } acquireFailed(threadId); return false ; } try { time -= (System.currentTimeMillis() - current); if (time <= 0 ) { acquireFailed(threadId); return false ; } while (true ) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(leaseTime, unit, threadId); if (ttl == null ) { return true ; } time -= System.currentTimeMillis() - currentTime; if (time <= 0 ) { acquireFailed(threadId); return false ; } currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= (System.currentTimeMillis() - currentTime); if (time <= 0 ) { acquireFailed(threadId); return false ; } }finally { unsubscribe(subscribeFuture, threadId); } }
要么线程拿到锁返回成功;要么没拿到锁并且等待时间还没过就继续循环拿锁,同时监听锁是否被释放。
拿锁的方法是tryAcquire
传入的参数分别是锁的持有时间,时间单位以及代表当前线程的ID,跟进代码查看调用栈,它会调到一个叫做tryAcquireAsync
的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private Long tryAcquire (long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync (long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1 ) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener <Long>() { @Override public void operationComplete (Future<Long> future) throws Exception { if (!future.isSuccess()) { return ; } Long ttlRemaining = future.getNow(); if (ttlRemaining == null ) { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
tryLockInnerAsync
方法的源码:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 <T> RFuture<T> tryLockInnerAsync (long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);" , Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); } String getLockName (long threadId) { return id + ":" + threadId; }
这里就是底层的调用栈了,直接操作命令,整合成lua脚本后,调用netty的工具类跟redis进行通信,从而实现获取锁的功能。
LUA脚本解读:
先用exists key
命令判断是否锁是否被占据了,没有的话就用hset
命令写入,key为锁的名称,field为“客户端唯一ID:线程ID”,value为1;
锁被占据了,判断是否是当前线程占据的,是的话value值加1;
锁不是被当前线程占据,返回锁剩下的过期时长;
用了redis的Hash结构存储数据,如果发现当前线程已经持有锁了,就用hincrby
命令将value值加1,value的值将决定释放锁的时候调用解锁命令的次数,达到实现锁的可重入性效果。
这个时长有什么作用呢?我们回到tryLock 方法中死循环的那个地方
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 try { time -= (System.currentTimeMillis() - current): if (time <= 0 ) { acquireFailed(threadId); return false ; } while (true ){ long currentlime = System.currentTimeMillis() ; ttl = tryAcquire (leaseTime, unit, threadId); if (ttl == null ){ return true ; } time -= (System. currentTimeMillis() - currentTime); if (time <= 0 ){ acquireFailed(threadId); return false ; } currentTime = System.currentTimeMillis(); if (ttl ›= 0 && ttl < time){ getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS): } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); }
这里有一个针对waitTime和key的剩余过期时间大小的比较,取到二者中比较小的那个值,然后用Java的Semaphore信号量的tryAcquire方法来阻塞线程。
那么Semaphore信号量又是由谁控制呢,何时才能release呢。这里又需要回到上面来看,各位看官应该还记得,我们上面贴的tryLock 代码中还有这一段:
1 2 3 current = System.currentTimeMillis(); final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
订阅的逻辑显然是在subscribe
方法里,跟着方法的调用链,它会进入到PublishSubscribe.Java 中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 publac RFuture<E> subscribe ( final String entryame, final Strang channeName, final ConnectionManager connectionManager) { final AtomicReference<Runnable› listenerHolder = new AtomicReference <~>0 : final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName): final RPromise<E› newPromise = (RedissonPromise) cancel(mayInterruptIfRunning) ->{ return semaphore.remove (listenerHolder.get()): } Runnaole listener = new Runnable { @Override public void run () { E entry = entries.get (entryName) if (entry != null ){ entry.aquire(); semaphore.release(); entry.getPromise().addListener(new TransferListener <E>(newPromise)) return ; } E value = createEntry(newPromise); vaLue.aquire(); E oldValue = entries.putIfAbsent(entryName, value); if (oldValue != null ){ oldValue.aquire(); semaphore.release(); oldValue.getPromise().addListener(new TransferListener <E>(newPromise)) return ; } RedisPubSubListener<Object> listener = createListener(channelName,value); connectionManager.subscribe(LongCodec.INSTANCE,channelName,semaphore,listener); } };
这段代码的作用在于将当前线程的threadId 添加到一个AsyncSemaphore 中,并且设置一个redis的监听器,这个监听器是通过redis的发布、订阅功能实现的。
一旦监听器收到redis发来的消息,就从中获取与当前thread相关的,如果是锁被释放的消息,就立马通过操作Semaphore (也就是调用release 方法)来让刚才阻塞的地方释放。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 currentTime = System.currentTimeMillis(); if (ttl ›= 0 && ttl < time){ getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS): } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0 ){ acquireFailed(threadId); return false ; } }finally {
释放后线程继续执行,仍旧是判断是否已经超时。如果还没超时,就进入下一次循环再次去获取锁,拿到就返回true,没有拿到的话就继续流程。
这里说明一下,之所以要循环,是因为锁可能会被多个客户端同时争抢,线程阻塞被释放之后的那一瞬间很可能还是拿不到锁,但是线程的等待时间又还没过,这个时候就需要重新跑循环去拿锁。
unlock()
Redisson分布式锁解锁的上层调用方法是unlock(),默认不用传任何参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override public void unlock () { Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId())); if (opStatus == null ) { throw new IllegalMonitorStateException ( "attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } if (opStatus) { cancelExpirationRenewal(); } }
opStatus核心逻辑是unlockInnerAsync 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;" , Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));
使用 EVAL 命令执行 Lua 脚本来释放锁:
key 不存在,说明锁已释放,直接执行 publish
命令发布释放锁消息并返回 1
。
key 存在,但是 field 在 Hash 中不存在,说明自己不是锁持有者,无权释放锁,返回 nil
。
因为锁可重入,所以释放锁时不能把所有已获取的锁全都释放掉,一次只能释放一把锁,因此执行 hincrby
对锁的值减一 。
释放一把锁后,如果还有剩余的锁,则刷新锁的失效时间并返回 0
;如果刚才释放的已经是最后一把锁,则执行 del
命令删除锁的 key,并发布锁释放消息,返回 1
。
注意
这里有个实际开发过程中,容易出现很容易出现上面第二步异常,非锁的持有者释放锁时抛出异常。比如下面这种情况
1 2 3 4 5 6 7 8 9 10 redissonLock.lock("redisson" , 1 ); redissonLock.release("redisson" );
RedLock 命令实现锁的功能,这也是很多Redis分布式锁工具的设计原理。除此之外,Redisson还支持用”RedLock算法”来实现锁的效果,这个工具类就是RedissonRedLock
用法也很简单,创建多个Redisson Node, 由这些无关联的Node就可以组成一个完整的分布式锁
1 2 3 4 5 6 7 8 9 10 RLock lock1 = Redisson.create(config1).getLock(lockKey);RLock lock2 = Redisson.create(config2).getLock(lockKey);RLock lock3 = Redisson.create(config3).getLock(lockKey);RedissonRedLock redLock = new RedissonRedLock (lock1, lock2, lock3);try { redLock.lock(); } finally { redLock.unlock(); }
能一定程度上能有效防止Redis实例单点故障的问题,但并不完全可靠,不管是哪种设计,光靠Redis本身都是无法保证锁的强一致性的。
实践 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public interface Locker { void lock (String name, Runnable command) ; <T> T lock (String name, Supplier<T> command) ; void lockMutex (String name, Runnable command) ; <T> T lockMutex (String name, Supplier<T> command) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 @Slf4j @Component public class RedisLockerImpl implements Locker { private final int timeout = 30 ; @Autowired private RedissonClient redissonClient; @Override public void lock (String name, Runnable command) { RLock lock = redissonClient.getLock(name); try { if (lock.tryLock(timeout, TimeUnit.SECONDS)) { log.debug("get lock, name={}" , name); try { command.run(); } finally { log.debug("release lock, name={}" , name); lock.unlock(); } } else { throw new RuntimeException ("网络异常,请稍后再试" ); } } catch (InterruptedException e) { throw new RuntimeException ("服务器异常,请稍后再试" ); } } @Override public <T> T lock (String name, Supplier<T> command) { RLock lock = redissonClient.getLock(name); try { if (lock.tryLock(timeout, TimeUnit.SECONDS)) { log.debug("get lock, name={}" , name); try { return command.get(); } finally { log.debug("release lock, name={}" , name); lock.unlock(); } } else { throw new RuntimeException ("网络异常,请稍后再试" ); } } catch (InterruptedException e) { throw new RuntimeException ("服务器异常,请稍后再试" ); } } @Override public void lockMutex (String name, Runnable command) { RLock lock = redissonClient.getLock(name); if (lock.tryLock()) { log.debug("get lock, name={}" , name); try { command.run(); } finally { log.debug("release lock, name={}" , name); lock.unlock(); } } else { throw new RuntimeException ("网络异常,请稍后再试" ); } } @Override public <T> T lockMutex (String name, Supplier<T> command) { RLock lock = redissonClient.getLock(name); if (lock.tryLock()) { log.debug("get lock, name={}" , name); try { return command.get(); } finally { log.debug("release lock, name={}" , name); lock.unlock(); } } else { return null ; } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 @Component public class LockUtil { private static Locker locker; @Autowired public void setLocker (Locker locker) { LockUtil.locker = locker; } public static void lock (String name, Runnable command) { locker.lock(name, command); } public static void lockMutex (String name, Runnable command) { locker.lockMutex(name, command); } public static <T> T lock (String name, Supplier<T> command) { return locker.lock(name, command); } public static <T> T lockMutex (String name, Supplier<T> command) { return locker.lockMutex(name, command); } public static void lockWithTx (String name, Runnable command) { locker.lock(name, () -> transactionHelper.run(command)); } public static String generateLockName (String ... seps) { StringBuilder sb = new StringBuilder (); for (String sep : seps) { sb = sb.append(sep).append("_" ); } return sb.substring(0 , sb.length() - 1 ); } }
三种锁的锁选择 最完美的就是lock锁
tryLock锁是可能会跳过减库存的操作,因为当过了等待时间还没有获取锁,就会返回false,这显然很致命!
注解锁只能用于方法上,颗粒度太大,满足不了方法内加锁。
1)lock锁
压测结果 1000个线程平均响应时间为31324。吞吐量 14.7/sec
2)tryLock锁 压测结果 1000个线程平均响应时间为28628。吞吐量 16.1/sec
这里只是单次测试,有很大的随机性。从当前环境单次测试来看,tryLock稍微高点。
2、常见异常 attempt to unlock lock, not ······ 在使用RedissonLock锁时,很容易报这类异常,比如如下操作
1 2 3 4 5 6 redissonLock.lock("redisson" , 1 ); redissonLock.release("redisson" );
上面在并发情况下就会这样
1 2 3 4 5 6 7 8 9 java. lang. IllegalMonitorStateException: attempt to unlock lock, not locked by current thread by node id: dad1c6d2-c511-416 -a30d-14c3a3083dbe thread-id: 80 at org. redisson. RedissonLock. unlock(RedissonLock. java:367 ) at com.jincou.redisson.RedissonLock.unlock(RedissonLock.java:95 ) at com.jincou.controller.LockController.lockDecreaseStock(LockController.java:38 )<13internalcalls> at javax.servlet.http.HttpServlet.service(HttpServlet.java:634 )<1internalcallo at javax.servlet.http.HttpServlet.service(HttpServlet.java:741 ) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain-lava:231 ) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain-java:166 )
造成异常原因:
线程1 进来获得锁后,但它的业务逻辑需要执行2秒,在 线程1 执行1秒后,这个锁就自动过期了,那么这个时候
线程2 进来了获得了锁。在线程1去解锁就会抛上面这个异常(因为解锁和当前锁已经不是同一线程了)
所以需要注意,设置锁的过期时间不能设置太小,一定要合理,宁愿设置大点。
正对上面的异常,可以通过isHeldByCurrentThread()方法,
1 2 3 4 if (redissonLock.isHeldByCurrentThread("lock" )) { redissonLock.unlock("lock" ); }
来源: https://www.cnblogs.com/qdhxhz/p/11046905.html
https://www.cnblogs.com/qdhxhz/p/11055426.html
https://www.cnblogs.com/qdhxhz/p/11059200.html
https://segmentfault.com/a/1190000039728618
https://zhuanlan.zhihu.com/p/135864820
https://www.modb.pro/db/57921