分布式锁是指分布式系统或者不同系统之间共同访问共享资源的一种锁实现,其是互斥的,多个线程均可见。
分布式锁的核心是大家都用同一个锁,不同服务之间锁是一样的,那么就能锁得住进程,不让进程进入锁住的代码块中。
为什么会使用分布式锁呢?使用ReentrantLock或synchronized不行吗?
ReentrantLock和synchronized在分布式情况下,每个服务的锁对象都不一样,因为每个服务的锁对象都是不一样的,所以无法锁住不同服务的线程。
那么分布式锁有什么特点呢?
常见的分布式锁有以下三种:
MYSQL | redis | Zookeeper | |
---|---|---|---|
互斥 | 利用mysql本身的互斥锁机制 | 利用sentnx这样的互斥命令 | 利用节点的唯一性和有序性实现互斥 |
高可用 | 好 | 好 | 好 |
高性能 | 一般 | 好 | 一般 |
安全性 | 断开连接,自动释放锁 | 利用锁的超时时间,到期释放 | 临时节点,断开连接自动释放 |
分布式锁实现的有以下两个很重要的步骤:
获取锁:
释放锁:
核心思路:
具体实现分布式锁的步骤如下:
定义一个锁接口,接口里面有两个重要的方法
tryLock( long timeoutSec ):尝试获取锁,参数为锁持有的过期时间,过期后自动释放
unlock( ):释放锁
public interface ILock {/*** 尝试获取锁* @param timeoutSec 锁持有的超时时间,过期后自动释放* @return true代表获取锁成功; false代表获取锁失败*/boolean tryLock(long timeoutSec);/*** 释放锁*/void unlock();
}
定义一个类,实现该接口
public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";//ture可以去掉uuid的横线private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";@Overridepublic boolean tryLock(long timeoutSec) {// 获取线程标示String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);//这里这样返回是因为防止拆箱的时候success为null,导致空指针异常return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {// 获取线程标示String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁中的标示String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);// 判断标示是否一致if(threadId.equals(id)) {// 释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}}
}
这里的分布式锁,是为了解决一人一单问题,也就是一个人只能下一单,因此key就是key前置+用户id。
而value,则是uuid+线程id。为什么这样做呢?
是因为如果单单用线程id作为value的话,在分布式情况下,用户发送多个请求过来,会出现分布式锁误删情况。当有一个线程1获取到锁(这把锁后面称为锁A),由于某种情况,线程1获取锁A之后出现了阻塞,导致锁A超时被释放。这时候线程2进来获取锁(这把锁后面称为锁B),这时候线程1反应过来,进行释放锁的操作,因为锁A已经超时释放了,这时候线程1释放的锁将会是线程2获取的锁B,因此出现了误删的情况。
因此,需要在每个线程释放锁之前,判断这把锁是否属于自己,如果属于自己则释放,如果不属于自己那么就不释放,防止误删。因为程序可能位于分布式的系统中,那么多个服务之间线程ID可能会出现一样,因此value不能单单使用线程ID,应该用uuid拼上线程ID,这样保证了分布式情况下value的唯一性。
总而言之,解决分布式锁误删问题的解决方案如下:
一人一单的业务逻辑:
首先判断秒杀是否开始
@Override
public Result seckillVoucher(Long voucherId) {// 1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀尚未开始!");}// 3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀已经结束!");}// 4.判断库存是否充足if (voucher.getStock() < 1) {// 库存不足return Result.fail("库存不足!");}return createVoucherOrder(voucherId);
}
秒杀开始之后创建订单
@Transactional
public Result createVoucherOrder(Long voucherId) {// 5.一人一单Long userId = UserHolder.getUser().getId();// 创建锁对象SimpleRedisLock redisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);// 尝试获取锁boolean isLock = redisLock.tryLock(1200);// 判断if (!isLock) {// 获取锁失败,直接返回失败或者重试return Result.fail("不允许重复下单!");}try {// 5.1.查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判断是否存在if (count > 0) {// 用户已经购买过了return Result.fail("用户已经购买过一次!");}// 6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {// 扣减失败return Result.fail("库存不足!");}// 7.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 7.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2.用户idvoucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 7.返回订单idreturn Result.ok(orderId);} finally {// 释放锁redisLock.unlock();}}
这里分布式锁的作用,主要是保证程序在创建订单,将订单数据插入数据库这一过程中,只有一个线程在代码块中执行,防止超买超卖的情况,有效解决一人一单。
虽然上面解决的分布式锁误删的情况,但是会出现分布式锁的原子性问题。
线程1尝试获取锁,获取锁成功(这把锁下面称为锁A),线程1执行完业务逻辑,准备释放锁。这时候其他线程进不来,所以这把锁肯定是线程1自己的,当线程1判断完标识为一致后,准备执行释放锁操作,这时候由于某种情况,线程1阻塞了,由于阻塞时间太长,锁A超时释放了。这时候线程2获取锁成功(这把锁后面称为锁B),这时候线程1不再阻塞,执行释放锁操作,因为锁A超时释放,因此线程1执行的释放锁操作释放的是线程2获取的锁B,因此依然出现了锁误删情况**,出现这一情况的原因是因为判断锁是否是自己的和释放锁这是两个操作**,不存在原子性。
解决方法也很简单,只需要保证判断锁和释放锁这两个操作是原子的就可以了。
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html,这里重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了。
Redis提供的调用函数,语法如下:
redis.call('命令名称', 'key', '其它参数', ...)
比如执行set name jack,则脚本如下:
# 执行 set name jack
redis.call('set', 'name', 'jack')
使用redis命令执行脚本
上面是写死的情况,如果不想写死,可以使用参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数(注意数组是从1开始):
因此,上面释放锁的操作写成Lua脚本就可以这样写
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then-- 一致,则删除锁return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
在RedisTemplate已经为我们封装好了一个execute方法用来执行Lua脚本
public T execute(RedisScript script, List keys, Object... args) {return this.scriptExecutor.execute(script, keys, args);
}
private static final DefaultRedisScript UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);
}public void unlock() {// 调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());
}
这里可以将Lua脚本写在一个Lua文件中,从而实现解耦,可以使用Spring的ClassPathResource读取Class类下的文件。
参考:黑马程序员Redis入门到实战教程,深度透析redis底层原理+redis分布式锁+企业解决方案+黑马点评实战项目