我们先来看看之前的基于setnx实现的分布式锁存在的问题:
我们之前实现的分布式锁是基于redis的setnx命令的特性的!
但是,这样子实现起来会有很多弊端!
简单的来说就是一旦setnx [key] [value]后,就不能再对这个key做任何操作了(除了删除)
假设我们在开发中有A和B两个业务,在业务A中,执行了setnx操作,然后在业务A中调用业务B。
然后在业务B中也有setnx的操作(同一个KEY)
此时,业务B就会阻塞在这里,等待业务A释放锁
但是,业务A肯定不会释放锁,因为业务A还没有执行完(调B)。故就会发生死锁。
在我们之前业务逻辑中,尝试获取锁,如果获取不到就直接return了,没有“重来”的机会!也无法提供重试的机制!
我们之前,分析过分布式锁被误删的问题。这个问题是已经解决了。
但是,仍然会存在隐患!我们这里是用TTL来控制它。业务执行,时间多少,这是一个未知数,TTL要怎么设置?如何处理业务阻塞?
在主节点上获取到了锁,但是主节点突然宕机了,就会从从结点中选出一个节点,作为主节点。
但由于,因为之前的那个主节点宕机了。在新选举出来的这个主节点中是无法获取到之前的锁。
所以之前的那个锁相当于失效了!
要解决上述问题并不是那么容易的,如果我们自己实现很有可能会出一些问题!所以最好的办法就是使用市面上的一些框架来解决!
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
0. 项目介绍 - 《Redisson 使用手册》 - 书栈网 · BookStackhttps://www.bookstack.cn/read/redisson-wiki-zh/Redisson%E9%A1%B9%E7%9B%AE%E4%BB%8B%E7%BB%8D.md里面提到了Redisson可以实现大致如下的分布式锁
org.redisson redisson 3.13.6
/*** 配置 Redisson*/
@Configuration
public class RedisConfig {@Beanpublic RedissonClient redissonClient() {// 配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.89.128:6379").setPassword("888888");// 创建 RedissonClient 对象return Redisson.create(config);}
}
@Test
void testRedisson() throws Exception {RLock anyLock = redissonClient.getLock("anyLock");boolean isLock = anyLock.tryLock(1, 10, TimeUnit.SECONDS);if(isLock) {try {System.out.println("执行业务");} finally {anyLock.unlock();}}
}
测试结果
这里可重入锁的实现 和 Java的 ReentrantLock 类似!
获取锁的时候,先判断是不是同一个对象,是就将 value+1,释放锁的时候就 value-1,当其小于0时就将该key删除!
在Redis中使用 Hash结构 去存储!
private RFuture tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);}RFuture ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}if (ttlRemaining) {scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;
}
RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {internalLockLeaseTime = unit.toMillis(leaseTime);// 在Lua脚本中起始位是1return evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
上述代码中字符串部分就是Lua脚本,Redisson用其实现可重入锁!
Redisson 获取锁中的Lua脚本源码解析
-- 判断锁是否存在
if (redis.call('exists', KEYS[1]) == 0) then-- 不存在,获取锁redis.call('hincrby', KEYS[1], ARGV[2], 1);-- 设置有效期redis.call('pexpire', KEYS[1], ARGV[1]);return nil;
end;
-- 锁已经存在,判断是否是自己?!
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then-- 自增+1redis.call('hincrby', KEYS[1], ARGV[2], 1);-- 重置有效期redis.call('pexpire', KEYS[1], ARGV[1]);return nil;
end;
return redis.call('pttl', KEYS[1]);
protected RFuture unlockInnerAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end; " +"return nil;",Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
Redisson 释放锁中的Lua脚本源码解析
-- 判断当前锁是否还是被自己持有
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then-- 不是就就直接返回return nil;end;
-- 是自己,则重入次数 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- 判断重入次数是否已经为0
if (counter > 0) then-- 大于0,说明不能释放,重置有效期即可redis.call('pexpire', KEYS[1], ARGV[2]);return 0;
else-- 等于0,说明可以直接删除redis.call('del', KEYS[1]);-- 发消息redis.call('publish', KEYS[2], ARGV[1]);return 1;end;
return nil;
我们这边模拟一下锁重入的场景。方法A上锁后调方法B,方法B也获取锁(如果是不可重入,这里就会阻塞!)
/*** Redisson的单元测试*/
@SpringBootTest
@Slf4j
public class RedissonTest {@Resourceprivate RedissonClient redissonClient;private RLock lock;@BeforeEachvoid setUp() {lock = redissonClient.getLock("order");}@Testvoid method1() {boolean isLock = lock.tryLock();if (!isLock) {log.error("获取锁失败 ... 1");return;}try {log.info("获取锁成功 ... 1");method2();log.info("开始执行业务 ... 1");} finally {log.warn("准备释放锁 ... 1");lock.unlock();}}@Testvoid method2() {boolean isLock = lock.tryLock();if (!isLock) {log.error("获取锁失败 ... 2");return;}try {log.info("获取锁成功 ... 2");log.info("开始执行业务 ... 2");} finally {log.warn("准备释放锁 ... 2");lock.unlock();}}
}
运行结果
Redis 中值的情况
为了Redis的可靠性,我们一般会使用Redis的主从模式。
使用了主从模式,一般会采用读写分离的策略,主节点写,从节点读!
那么,当数据被写入主节点的时候,主节点时需要向从节点去同步数据的!
这个过程一定会有延时,一致性问题也就发生在这里!
假如,在主节点中获取到了锁,在主节点向从节点同步这个锁信息的时候,主节点宕机了!那么从节点就会从中挑选一个作为主节点!
可是,此时之前的锁信息就丢失了!也就发生了锁失效的问题!!!
之前我们分析了,主从模式是导致锁失效的原因,所以Redisson中就直接将它们视为相同的角色!
此时,我们获取锁的方式就变了,获取锁的时候,我们需要依次向全部节点获取锁,只有都获取成功时才算成功!!!
如果此时也发生了刚刚描述的问题,是不会出现锁失效的问题的!
分析如下
这套方案就是Redisson中的联锁——MultiLock
分布式锁和同步器 - 《Redisson 官方文档中文翻译》 - 书栈网 · BookStackhttps://www.bookstack.cn/read/redisson-doc-cn/distributed_locks.md#83-multilock
@Configuration
public class RedisConfig {@Beanpublic RedissonClient redissonClient() {// 配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.89.128:6379").setPassword("888888");// 创建 RedissonClient 对象return Redisson.create(config);}@Beanpublic RedissonClient redissonClient2() {// 配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.89.128:6380");// 创建 RedissonClient 对象return Redisson.create(config);}@Beanpublic RedissonClient redissonClient3() {// 配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.89.128:6381");// 创建 RedissonClient 对象return Redisson.create(config);}
}
其实和我们之前的代码没有什么差别!
@SpringBootTest
@Slf4j
public class RedissonTest {@Resourceprivate RedissonClient redissonClient;@Resourceprivate RedissonClient redissonClient2;@Resourceprivate RedissonClient redissonClient3;private RLock lock;@BeforeEachvoid setUp() {RLock lock1 = redissonClient.getLock("order");RLock lock2 = redissonClient.getLock("order");RLock lock3 = redissonClient.getLock("order");// 创建联锁lock = redissonClient.getMultiLock(lock1, lock2, lock3);}@Testvoid method1() throws InterruptedException {boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);if (!isLock) {log.error("获取锁失败 ... 1");return;}try {log.info("获取锁成功 ... 1");method2();log.info("开始执行业务 ... 1");} finally {log.warn("准备释放锁 ... 1");lock.unlock();}}@Testvoid method2() {boolean isLock = lock.tryLock();if (!isLock) {log.error("获取锁失败 ... 2");return;}try {log.info("获取锁成功 ... 2");log.info("开始执行业务 ... 2");} finally {log.warn("准备释放锁 ... 2");lock.unlock();}}
}
我们可以跟一下代码看看!
@Override
public RLock getMultiLock(RLock... locks) {return new RedissonMultiLock(locks);
}
在这里就可以发生,不管是哪一个对象来调,其实都是一样的,这里面其实是在new一个对象RedissonMultiLock() ,所以谁去调getMultiLock()都是一样的!!!
final List locks = new ArrayList<>();public RedissonMultiLock(RLock... locks) {if (locks.length == 0) {throw new IllegalArgumentException("Lock objects are not defined");}this.locks.addAll(Arrays.asList(locks));
}
在这里可以发现,这个可变参数被视为集合,然后都添加到数组(集合)里面去了!
所以,按照联锁的原理,在获取锁的时候,也会依次把集合中的每一个锁都获取一次!
我们这里跟一下tryLock的源码(RedissonMultiLock)
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {return tryLock(waitTime, -1, unit);
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long newLeaseTime = -1;if (leaseTime != -1) {// waitTime 为 -1,表示不重试if (waitTime == -1) {newLeaseTime = unit.toMillis(leaseTime);} else {// 如果重试就会对时间做个扩容(放弃waitTime,使用newLeaseTime!)newLeaseTime = unit.toMillis(waitTime)*2; }}long time = System.currentTimeMillis();// 剩余时间long remainTime = -1;if (waitTime != -1) {remainTime = unit.toMillis(waitTime);}// 锁等待时间 与 剩余时间是一样的!long lockWaitTime = calcLockWaitTime(remainTime);int failedLocksLimit = failedLocksLimit();// 定义一个获取锁成功的集合,初始化肯定是0List acquiredLocks = new ArrayList<>(locks.size());for (ListIterator iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {if (waitTime == -1 && leaseTime == -1) {lockAcquired = lock.tryLock();} else {long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {lockAcquired = false;}if (lockAcquired) {acquiredLocks.add(lock);} else {if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}if (failedLocksLimit == 0) {unlockInner(acquiredLocks);if (waitTime == -1) {return false;}failedLocksLimit = failedLocksLimit();acquiredLocks.clear();// reset iteratorwhile (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}if (remainTime != -1) {remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}
下一篇:0116 查找算法 Day5