【Redis】3.详解分布式锁
创始人
2024-04-01 11:58:14
0

文章目录

  • 1. 什么是分布式锁
  • 2. 分布式锁的特点
  • 3. 常见的分布式锁
  • 4. 实现分布式锁
  • 5.解决分布式锁中的原子性问题
    • 5.1 Lua脚本
    • 5.2 使用Java代码调用Lua脚本实现原子性

1. 什么是分布式锁

分布式锁是指分布式系统或者不同系统之间共同访问共享资源的一种锁实现,其是互斥的,多个线程均可见。

分布式锁的核心是大家都用同一个锁,不同服务之间锁是一样的,那么就能锁得住进程,不让进程进入锁住的代码块中。

在这里插入图片描述

为什么会使用分布式锁呢?使用ReentrantLock或synchronized不行吗?

ReentrantLock和synchronized在分布式情况下,每个服务的锁对象都不一样,因为每个服务的锁对象都是不一样的,所以无法锁住不同服务的线程。


2. 分布式锁的特点

那么分布式锁有什么特点呢?

  1. 可见性:指所有线程都可见,无论是同一个服务还是不同服务,都可以感知到锁的变化
  2. 互斥:可以使得城西串行化
  3. 高可用:不容易崩溃,时时可用
  4. 高性能:由于加锁本身就让性能降低,所以对于分布式锁本身需要他就较高的加锁性能和释放锁性能
  5. 安全性:安全也是程序中必不可少的一环

在这里插入图片描述


3. 常见的分布式锁

常见的分布式锁有以下三种:

  1. Mysql:mysql本身就有锁机制,但是由于mysql性能一般,因此很少使用mysql作为分布式锁
  2. Redis:redis作为分布式锁是企业里面很常用的方式。主要是使用sentnx方法,如果插入key成功,也就代表获得了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁
  3. Zookeeper:zookeeper也是企业级开发中较好的一个实现分布式锁的方案,具体可看作者【单曲循环的寂寞】——基于zookeeper实现分布式锁
MYSQLredisZookeeper
互斥利用mysql本身的互斥锁机制利用sentnx这样的互斥命令利用节点的唯一性和有序性实现互斥
高可用
高性能一般一般
安全性断开连接,自动释放锁利用锁的超时时间,到期释放临时节点,断开连接自动释放

4. 实现分布式锁

分布式锁实现的有以下两个很重要的步骤:

  • 获取锁:

    • 互斥:只能有一个线程获取锁
    • 非阻塞:尝试一次,成功返回true,失败返回false
  • 释放锁:

    • 手动释放
    • 超时释放:获取锁时添加一个超时时间

核心思路

  • 利用redis的setNx方法,当很多线程进入抢夺锁的时候,线程1进入redis插入key,返回1,如果结果是1,证明获取锁成功,它就可以执行锁内的业务。当其他线程尝试获取锁的时候,无法获取成功,便会一定时间后重试。只有当线程1执行完业务,删掉该key之后,其他线程才能获取锁。

在这里插入图片描述

具体实现分布式锁的步骤如下:

  • 定义一个锁接口,接口里面有两个重要的方法

    • tryLock( long timeoutSec ):尝试获取锁,参数为锁持有的过期时间,过期后自动释放

    • unlock( ):释放锁

    • public interface ILock {/*** 尝试获取锁* @param timeoutSec 锁持有的超时时间,过期后自动释放* @return true代表获取锁成功; false代表获取锁失败*/boolean tryLock(long timeoutSec);/*** 释放锁*/void unlock();
      }
      
  • 定义一个类,实现该接口

    • public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";//ture可以去掉uuid的横线private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";@Overridepublic boolean tryLock(long timeoutSec) {// 获取线程标示String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);//这里这样返回是因为防止拆箱的时候success为null,导致空指针异常return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {// 获取线程标示String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁中的标示String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);// 判断标示是否一致if(threadId.equals(id)) {// 释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}}
      }
      

这里的分布式锁,是为了解决一人一单问题,也就是一个人只能下一单,因此key就是key前置+用户id。

而value,则是uuid+线程id。为什么这样做呢?

是因为如果单单用线程id作为value的话,在分布式情况下,用户发送多个请求过来,会出现分布式锁误删情况。当有一个线程1获取到锁(这把锁后面称为锁A),由于某种情况,线程1获取锁A之后出现了阻塞,导致锁A超时被释放。这时候线程2进来获取锁(这把锁后面称为锁B),这时候线程1反应过来,进行释放锁的操作,因为锁A已经超时释放了,这时候线程1释放的锁将会是线程2获取的锁B,因此出现了误删的情况。

在这里插入图片描述

因此,需要在每个线程释放锁之前,判断这把锁是否属于自己,如果属于自己则释放,如果不属于自己那么就不释放,防止误删。因为程序可能位于分布式的系统中,那么多个服务之间线程ID可能会出现一样,因此value不能单单使用线程ID,应该用uuid拼上线程ID,这样保证了分布式情况下value的唯一性。

总而言之,解决分布式锁误删问题的解决方案如下:

  1. value使用UUID+线程ID。
  2. 在释放锁的时候,先判断该锁是否是自己的,也就是判断value。
    1. 如果该key对应的value是自己存进去的那个value,则释放锁。
    2. 否则不释放锁,防止锁误删。

在这里插入图片描述

一人一单的业务逻辑:

  • 首先判断秒杀是否开始

    • @Override
      public Result seckillVoucher(Long voucherId) {// 1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀尚未开始!");}// 3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀已经结束!");}// 4.判断库存是否充足if (voucher.getStock() < 1) {// 库存不足return Result.fail("库存不足!");}return createVoucherOrder(voucherId);
      }
      
  • 秒杀开始之后创建订单

    • @Transactional
      public Result createVoucherOrder(Long voucherId) {// 5.一人一单Long userId = UserHolder.getUser().getId();// 创建锁对象SimpleRedisLock redisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);// 尝试获取锁boolean isLock = redisLock.tryLock(1200);// 判断if (!isLock) {// 获取锁失败,直接返回失败或者重试return Result.fail("不允许重复下单!");}try {// 5.1.查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判断是否存在if (count > 0) {// 用户已经购买过了return Result.fail("用户已经购买过一次!");}// 6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {// 扣减失败return Result.fail("库存不足!");}// 7.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 7.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2.用户idvoucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 7.返回订单idreturn Result.ok(orderId);} finally {// 释放锁redisLock.unlock();}}
      

这里分布式锁的作用,主要是保证程序在创建订单,将订单数据插入数据库这一过程中,只有一个线程在代码块中执行,防止超买超卖的情况,有效解决一人一单。


5.解决分布式锁中的原子性问题

虽然上面解决的分布式锁误删的情况,但是会出现分布式锁的原子性问题。

线程1尝试获取锁,获取锁成功(这把锁下面称为锁A),线程1执行完业务逻辑,准备释放锁。这时候其他线程进不来,所以这把锁肯定是线程1自己的,当线程1判断完标识为一致后,准备执行释放锁操作,这时候由于某种情况,线程1阻塞了,由于阻塞时间太长,锁A超时释放了。这时候线程2获取锁成功(这把锁后面称为锁B),这时候线程1不再阻塞,执行释放锁操作,因为锁A超时释放,因此线程1执行的释放锁操作释放的是线程2获取的锁B,因此依然出现了锁误删情况**,出现这一情况的原因是因为判断锁是否是自己的和释放锁这是两个操作**,不存在原子性。

在这里插入图片描述

解决方法也很简单,只需要保证判断锁和释放锁这两个操作是原子的就可以了。


5.1 Lua脚本

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html,这里重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了。

Redis提供的调用函数,语法如下:

redis.call('命令名称', 'key', '其它参数', ...)

比如执行set name jack,则脚本如下:

# 执行 set name jack
redis.call('set', 'name', 'jack')

使用redis命令执行脚本

在这里插入图片描述

上面是写死的情况,如果不想写死,可以使用参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数(注意数组是从1开始):

在这里插入图片描述

因此,上面释放锁的操作写成Lua脚本就可以这样写

-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then-- 一致,则删除锁return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

5.2 使用Java代码调用Lua脚本实现原子性

在RedisTemplate已经为我们封装好了一个execute方法用来执行Lua脚本

public  T execute(RedisScript script, List keys, Object... args) {return this.scriptExecutor.execute(script, keys, args);
}
  1. script:Lua脚本
  2. keys:KEYS数组
  3. args:ARGV数组

在这里插入图片描述

private static final DefaultRedisScript UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);
}public void unlock() {// 调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());
}

这里可以将Lua脚本写在一个Lua文件中,从而实现解耦,可以使用Spring的ClassPathResource读取Class类下的文件。


参考:黑马程序员Redis入门到实战教程,深度透析redis底层原理+redis分布式锁+企业解决方案+黑马点评实战项目


相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...