项目源码 https://gitee.com/qiuyusy/small-project-study
听说你简历没有亮点?要不来个秒杀,一起来被面试官拷打
本文使用乐观锁/悲观锁/分布式锁分别实现秒杀,并使用Redis + Lua + MQ 实现优化
首先我们需要创建如下三张表
CREATE TABLE `tb_goods` (`id` bigint NOT NULL,`good_name` varchar(255) NOT NULL,`price` bigint NOT NULL,`stock` int NOT NULL,`begin_time` timestamp NOT NULL,`end_time` timestamp NOT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;CREATE TABLE `tb_order` (`id` bigint NOT NULL,`user_id` bigint NOT NULL,`good_id` bigint NOT NULL,`create_time` timestamp NOT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;CREATE TABLE `tb_order` (`id` bigint NOT NULL,`user_id` bigint NOT NULL,`good_id` bigint NOT NULL,`create_time` timestamp NOT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
首先完成Entity,为了之后的分布式拓展,主键使用雪花算法
@Data
@TableName("tb_goods")
public class Good {@TableId(type = IdType.ASSIGN_ID)private Long id;private String goodName;private Long price;private Integer stock;private LocalDateTime beginTime;private LocalDateTime endTime;
}@Data
@TableName("tb_order")
public class Order {@TableId(type = IdType.ASSIGN_ID)private Long id;private Long userId;private Long goodId;private LocalDateTime createTime;
}@Data
@TableName("tb_user")
public class User {@TableId(type = IdType.ASSIGN_ID)private Long id;private String username;private String password;private Long point;
}
然后是Mapper层
注意这里有个坑,如果使用Mybatis-Plus的update方法
他生成的sql语句会把所有的字段都set一遍,返回的结果会一直是>0,所以这里update采用xml写
@Mapper
public interface GoodMapper extends BaseMapper {/*** 更新商品库存* @param goodId 商品ID* @param nums 减去的数量* @return*/int updateStock(@Param("goodId") Long goodId, @Param("nums") Integer nums);
}
@Mapper
public interface OrderMapper extends BaseMapper {
}
@Mapper
public interface UserMapper extends BaseMapper {/*** * @param userId 用户ID* @param nums 减去点数数量* @return*/int updatePoint(@Param("userId") Long userId, @Param("nums") Long nums);
}
update seckill.tb_goodsset stock = stock - #{nums}where id = #{goodId};
update seckill.tb_userset point = tb_user.point - #{nums}where id = #{userId};
直接使用SpringBootTest写几个例子插入即可
商品表插入数据
@SpringBootTest
class GoodMapperTest {@Resourceprivate GoodMapper goodMapper;@Testvoid testInset(){Good good = new Good();good.setGoodName("皮卡丘头套");good.setStock(100);good.setPrice(10L);good.setBeginTime(LocalDateTime.of(2023,3,8,3,0,0));good.setEndTime(LocalDateTime.of(2023,3,15,3,0,0));goodMapper.insert(good);}@Testvoid updateStock() {goodMapper.updateStock(1633188654610382849L,1);}
}
用户表插入数据
class UserMapperTest {@Resourceprivate UserMapper userMapper;@Testvoid testInsert(){for (int i = 0; i < 100; i++) {User user = new User();user.setUsername("user_" + i);user.setPassword("123456");user.setPoint(1000L);userMapper.insert(user);}}
}
首先,我们分析一下业务流程是什么样的
用户购买商品,所以我们需要知道用户ID和商品ID来找到他们,所以设计如下接口,获得用户ID和商品ID
然后我们将具体业务放在Service层解决,所以暂时直接调用
@RestController
@RequestMapping("/good")
public class GoodController {@Resourceprivate GoodService goodService;@PostMapping("/buy/{userId}/{goodId}")public String buyGood(@PathVariable("userId") Long userId,@PathVariable("goodId") Long goodId){return goodService.buyGood(userId, goodId);}
}
然后我们就来设计Service层
Service层需要做的事情如下
那么我们就根据这个流程来编写代码吧
我们为了方便使用Mybatis-Plus来完成CRUD
package com.qiuyu.service;
/*** @author QiuYuSY* @create 2023-03-08 3:43*/
@Service
public class GoodService {@Resourceprivate GoodMapper goodMapper;@Resourceprivate UserMapper userMapper;@Resourceprivate OrderMapper orderMapper;@Transactionalpublic String buyGood(Long userId, Long goodId) {// 1.根据用户ID找出用户User user = userMapper.selectOne(new QueryWrapper().eq("id", userId));if(user == null){return "用户未找到";}//2. 根据商品id找到商品Good good = goodMapper.selectOne(new QueryWrapper().eq("id", goodId));if(good == null){return "商品未找到";}//3.判断是否在售卖时间if (LocalDateTime.now().isBefore(good.getBeginTime())) {return "未到售卖时间";}if (LocalDateTime.now().isAfter(good.getEndTime())) {return "已过售卖时间";}// 4. 判断商品是否有库存if(good.getStock() <= 0) {return "商品售罄";}// 5. 判断用户积分是否足够if(user.getPoint() < good.getPrice()){return "积分不足,无法购买";}// 6. 扣除积分int id = userMapper.updatePoint(userId, good.getPrice());if(id <= 0){throw new RuntimeException("积分不足,无法购买");}// 7. 购买商品int update = goodMapper.updateStock(goodId, 1);if(update <= 0){throw new RuntimeException("库存发生变化,购买失败");}// 8. 生成订单Order order = new Order();order.setUserId(userId);order.setGoodId(goodId);order.setCreateTime(LocalDateTime.now());int insert = orderMapper.insert(order);if(insert <= 0){throw new RuntimeException("生成订单失败");}return "ok!";}
}
因为要保证扣除积分、购买商品、生成订单的一致性,所以时用@Transactional,并在修改失败的时候抛出异常,进行回滚,所以Controller层也需要处理一下异常。
@RestController
@RequestMapping("/good")
public class GoodController {@Resourceprivate GoodService goodService;@PostMapping("/buy/{userId}/{goodId}")public String buyGood(@PathVariable("userId") Long userId,@PathVariable("goodId") Long goodId){String result = null;try{result = goodService.buyGood(userId, goodId);}catch (Exception e){result = e.getMessage();}return result;}
}
我们发现一套流程下来行云流水,好像没有丝毫问题。但是真的这样吗
我们使用Jmeter来测试一下吧,商品库存设置100,200个线程直接开跑
跑之前数据库数据
商品表:商品库存为100个
用户表:积分充足
订单表为空:
跑之后数据
商品表:商品库存为-7个
订单表:订单生成了107条
用户表:
为什么商品数量会为负数,只有100个商品却卖出了107单?
这就是秒杀中会出现的超卖问题
那么我们应该如何进行解决呢?接下来我主要从悲观锁、乐观锁、分布式锁三个方面来展示
Java中的悲观锁,大家最熟悉的肯定就是synchronized
了
最简单的方法就是直接方法上加锁
但是这样的话,锁的粒度太大,不太推荐
所以我们使用商品的ID的字符串进行锁,intern()表示字符串常量池中的对象
@Transactionalpublic String buyGood(Long userId, Long goodId) {synchronized (goodId.toString().intern()) {// 1.根据用户ID找出用户User user = userMapper.selectOne(new QueryWrapper().eq("id", userId));if (user == null) {return "用户未找到";}//2. 根据商品id找到商品Good good = goodMapper.selectOne(new QueryWrapper().eq("id", goodId));if (good == null) {return "商品未找到";}//3.判断是否在售卖时间if (LocalDateTime.now().isBefore(good.getBeginTime())) {return "未到售卖时间";}if (LocalDateTime.now().isAfter(good.getEndTime())) {return "已过售卖时间";}// 4. 判断商品是否有库存if (good.getStock() <= 0) {return "商品售罄";}// 5. 判断用户积分是否足够if (user.getPoint() < good.getPrice()) {return "积分不足,无法购买";}// 6. 扣除积分int id = userMapper.updatePoint(userId, good.getPrice());if (id <= 0) {throw new RuntimeException("积分不足,无法购买");}// 7. 购买商品int update = goodMapper.updateStock(goodId, 1);if (update <= 0) {throw new RuntimeException("库存发生变化,购买失败");}// 8. 生成订单Order order = new Order();order.setUserId(userId);order.setGoodId(goodId);order.setCreateTime(LocalDateTime.now());int insert = orderMapper.insert(order);if (insert <= 0) {throw new RuntimeException("生成订单失败");}}return "ok!";}
本以为万无一失,但是居然依旧发生了问题,WTF?
分析下原因,我们使用了Spring事务来保证一致性,而锁在事务的内部,也就是说锁释放完后,事务还未提交,这也会产生问题捏。
鼠鼠受不了咧
那么如何解决呢?
在外层,也就是Controller中加锁
@PostMapping("/buy/{userId}/{goodId}")
public String buyGood(@PathVariable("userId") Long userId,@PathVariable("goodId") Long goodId
){String result = null;try{synchronized (goodId.toString().intern()) {result = goodService.buyGood(userId, goodId);}}catch (Exception e){result = e.getMessage();}return result;
}
ok 完成,成功把锁的粒度从goodService对象降低到了一个商品ID
说到乐观锁大家可能会想到CAS 和 version 字段
这里我就用stock库存字段来代替version字段了
第一步,修改一下Mapper接口,加入stock字段,进行stock的比对
int updateStock(@Param("goodId") Long goodId, @Param("nums") Integer nums, @Param("stock") Integer stock);
update seckill.tb_goodsset stock = stock - #{nums}where id = #{goodId} and stock = #{stock};
记得把悲观锁去掉,我们再来试一下
发现剩下54个库存
居然没卖完,多跑几次,是可以的,也解决了超卖问题
但是大家也都发现了,这种实现效率非常低。
也就是同时只有一个线程成功,这个并发太低了
之前是判断库存数和查询时的库存数相等才能购买,其实我们应该是判断库存>0就能购买
修改如下
@Mapper
public interface GoodMapper extends BaseMapper {/*** 更新商品库存* @param goodId 商品ID* @param nums 减去的数量* @return*/int updateStock(@Param("goodId") Long goodId, @Param("nums") Integer nums);
}
判断库存>0时就能够购买
update seckill.tb_goodsset stock = stock - #{nums}where id = #{goodId} and stock > 0;
OK,我们再来试一下
跑一次就成功卖完了,鼠鼠哭死
其实这里是利用到了MySQL RR 的排他锁,这样想来这个乐观锁也不是特别乐观捏
为了展示分布式下的超卖问题,我们加一个端口8082服务
Nginx配置如下
乐观锁估计是因为加了数据库的锁,无法出现错误
改成悲观锁,测试,出现问题!
先写一个接口
public interface ILock {/*** 加锁* @param timeoutSec 超时时间* @return*/boolean tryLock(long timeoutSec);/*** 释放锁*/void unlock();
}
这里的释放锁需要使用lua脚本来保证原子性
unock.lua
if (redis.call('GET', KEYS[1]) == ARGV[1]) then-- 当前线程的value与redis中value一致,直接删return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
然后去实现接口ILock
package com.qiuyu.util;public class RedisLock implements ILock {private StringRedisTemplate stringRedisTemplate;private String name; //key//因为当前类没有被Spring接管,StringRedisTeplate让调用者传进来public RedisLock(StringRedisTemplate stringRedisTemplate, String name) {this.stringRedisTemplate = stringRedisTemplate;this.name = name;}private static final String KEY_PREFIX = "lock:"; //key的前缀//生成一个UUID随机数用来做value,这是为了防止误删别的线程的锁public static final String VALUE = UUID.randomUUID().toString().replaceAll("-", "");private static final DefaultRedisScript UNLOCK_SCRIPT;static {// 静态代码块加载lua脚本UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class); //设置放回值类型}@Overridepublic boolean tryLock(long timeoutSec) {Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name,VALUE, timeoutSec, TimeUnit.SECONDS);// 防止拆箱NPEreturn Boolean.TRUE.equals(success);}@Overridepublic void unlock() {//调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),VALUE);}
}
Controller再改下
@PostMapping("/buy/{userId}/{goodId}")public String buyGood(@PathVariable("userId") Long userId,@PathVariable("goodId") Long goodId){String result = null;ILock lock = new RedisLock(stringRedisTemplate, "good:" + goodId); //拿到锁boolean isLock = lock.tryLock(10); //加锁if(!isLock){return "争抢锁失败";}try{result = goodService.buyGood(userId, goodId);}catch (Exception e){result = e.getMessage();}finally {lock.unlock(); //解锁}return result;}
}
出现找不到LUA文件的情况,加入如下配置即可
src/main/resources **/*.properties **/*.yml **/*.xml **/*.lua true src/main/java **/*.xml true
测试下,成功
我们自己实现的Redis锁有以下问题
Redisson用Hash结构解决了可重入的问题
用看门狗解决了续约的问题
用MutiLock解决了Redis集群的主从一致性问题
实践
引入依赖:建议用这个而不是starter,因为starter会覆盖redis的配置
org.redisson redisson 3.13.6
配置Redisson客户端:
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){// 配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.222.128:6379");// 创建RedissonClient对象return Redisson.create(config);}
}
然后直接用就行
@RestController
@RequestMapping("/good")
public class GoodController {@Resourceprivate GoodService goodService;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;@PostMapping("/buy/{userId}/{goodId}")public String buyGood(@PathVariable("userId") Long userId,@PathVariable("goodId") Long goodId){String result = null;RLock lock = redissonClient.getLock("good:" + goodId);//不要给锁设定过期时间才会开启看门狗boolean isLock = lock.tryLock(); if(!isLock){return "争抢锁失败";}try{result = goodService.buyGood(userId, goodId);}catch (Exception e){result = e.getMessage();}finally {lock.unlock();}return result;}
}
测试,也是没问题
异步秒杀优化的思路就是将前面1-5步这些判断逻辑放到redis中,如果可以下单的话,就发送一个消息到MQ中去消费
发送完消息后就无需考虑之后数据库的修改了,这些数据库修改的工作就交给MQ的消费者去做。
此外还可使用Redis的set数据结构,加入限购(购买过无法再买)的功能
添加商品时,加入Redis
@Transactional(rollbackFor = Exception.class)
public void addGood(Good good){//存入MySQLint result = goodMapper.insert(good);if(result <= 0){throw new RuntimeException("插入数据库失败");}// 保存商品到Redis中stringRedisTemplate.opsForValue().set("stock:" + good.getId(), String.valueOf(good.getStock()));
}
测试一下
@Test
void addGood() {Good good = new Good();good.setGoodName("白色帽子装饰");good.setPrice(10L);good.setStock(100);good.setBeginTime(LocalDateTime.now());good.setEndTime(LocalDateTime.of(2023,4,1,0,0,0));goodService.addGood(good);}
可以看到加入成功
我们需要在脚本中对商品的库存和是否购买过进行判断
如果可以购买就返回 0
--- data
-- 商品ID
local goodId = ARGV[1]
-- 用户ID
local userId = ARGV[2]--- key
-- 商品Key
local stockKey = 'stock:' .. goodId
-- 订单Key
local orderKey = 'order:' .. goodId--- 业务
-- 判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then-- 库存不足,返回1return 1
end
-- 判断用户是否下单 SISMEMBER orderKey userId
if (redis.call('sismember', orderKey, userId) == 1) then-- 存在,说明是重复下单,返回2return 2
end
-- 扣库存
redis.call('incrby', stockKey, -1)
-- 下单
redis.call('sadd', orderKey, userId)return 0
需要生成一个雪花算法的订单ID,用于返回给用户
然后调用LUA脚本判断是否能购买
能购买的话,就发送消息给MQ
package com.qiuyu.service;/*** @author QiuYuSY* @create 2023-03-08 3:43*/
@Service
public class GoodService {@Resourceprivate GoodMapper goodMapper;@Resourceprivate UserMapper userMapper;@Resourceprivate OrderMapper orderMapper;@Resourceprivate StringRedisTemplate stringRedisTemplate;private static final DefaultRedisScript SECKILL_SCRIPT;static {// 加载lua脚本SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}@Transactionalpublic String buyGood(Long userId, Long goodId) {//MP提供的雪花算法工具类,获取一个IDLong orderId = IdWorker.getId();// 1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),goodId.toString(), userId.toString());// 判断有无购买资格if(result != 0){return result == 1 ? "库存不足" : "不能重复下单";}// 2. 发送到MQ// 3. 返回订单IDreturn orderId.toString();}//...
}
测试一下,可以看到库存成功-1
再次购买时,无法购买
选一种喜欢的MQ完成消费即可,这里用Kafka演示
导入依赖
org.springframework.kafka spring-kafka
配置
spring: kafka:bootstrap-servers: 192.168.222.128:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: test-consumer-group #根据comsumer.properties配置文件中填写enable-auto-commit: true #是否自动提交消费者的偏移量auto-commit-interval: 3000 #3秒提交一次key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
编写Event类
采用链式方式编程,写数据的时候舒服点
package com.qiuyu.entity;import lombok.Getter;/*** @author QiuYuSY* @create 2023-03-09 2:24*/
@Getter
public class Event {private String topic; //主题private Long userId; //用户IDprivate Long goodId; //商品IDprivate Long orderId; //订单IDpublic Event setTopic(String topic) {this.topic = topic;return this;}public Event setUserId(Long userId) {this.userId = userId;return this;}public Event setGoodId(Long goodId) {this.goodId = goodId;return this;}public Event setOrderId(Long orderId) {this.orderId = orderId;return this;}@Overridepublic String toString() {return "Event{" +"topic='" + topic + '\'' +", userId=" + userId +", goodId=" + goodId +", orderId=" + orderId +'}';}
}
编写生产者
把Event转json然后发过去就行
@Component
public class EventProducer {@Resourceprivate KafkaTemplate kafkaTemplate;@Resourceprivate ObjectMapper objectMapper;public void fireEvent(Event event) throws JsonProcessingException {// 将事件发布到指定的主题,内容为event对象转化的json格式字符串kafkaTemplate.send(event.getTopic(), objectMapper.writeValueAsString(event));}
}
编写消费者
拿到消息里的信息,然后消费,写到数据库中
package com.qiuyu.event;@Component
@Slf4j
public class EventConsumer {@Resourceprivate ObjectMapper objectMapper;@Resourceprivate GoodMapper goodMapper;@Resourceprivate UserMapper userMapper;@Resourceprivate OrderMapper orderMapper;@KafkaListener(topics = {"buy"})public void handleCommentMessage(ConsumerRecord record) throws JsonProcessingException {if (record == null || record.value() == null) {log.error("消息的内容为空!");return;}// 将record.value字符串格式转化为Event对象Event event = objectMapper.readValue((String) record.value(), Event.class);if (event == null) {log.error("消息格式错误!");return;}writeToDB(event);}// 写入数据库@Transactional(rollbackFor = Exception.class)public void writeToDB(Event event) {Long goodId = event.getGoodId();Long userId = event.getUserId();Long orderId = event.getOrderId();// 根据商品id找到商品Good good = goodMapper.selectOne(new QueryWrapper().eq("id", goodId));if (good == null) {return;}//1. 扣除积分int id = userMapper.updatePoint(userId, good.getPrice());if (id <= 0) {throw new RuntimeException("积分不足,无法购买");}//2. 购买商品int update = goodMapper.updateStock(goodId, 1);if (update <= 0) {throw new RuntimeException("库存发生变化,购买失败");}//3. 生成订单Order order = new Order();order.setId(orderId);order.setUserId(userId);order.setGoodId(goodId);order.setCreateTime(LocalDateTime.now());int insert = orderMapper.insert(order);if (insert <= 0) {throw new RuntimeException("生成订单失败");}}
}
Service层
加入发送消息即可
@Transactional
public String buyGood(Long userId, Long goodId) throws JsonProcessingException {//MP提供的雪花算法工具类,获取一个IDLong orderId = IdWorker.getId();// 1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),goodId.toString(), userId.toString());// 判断有无购买资格if(result != 0){return result == 1 ? "库存不足" : "不能重复下单";}// 2. 发送到MQEvent event = new Event();event.setTopic("buy").setGoodId(goodId).setOrderId(orderId).setUserId(userId);eventProducer.fireEvent(event);// 3. 返回订单IDreturn orderId.toString();
}
测试
发送一个post请求
订单ID相同,成功被消费
再测测并发的情况
写一个测试类,获取user表中所有user的userId,存入userId.txt中
package com.qiuyu;import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.qiuyu.entity.User;
import com.qiuyu.mapper.UserMapper;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
import java.io.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;/*** @author QiuYuSY* @create 2023-03-09 3:05*/
@SpringBootTest
public class FileTest {@Resourceprivate UserMapper userMapper;@Testvoid writeFile(){List users = userMapper.selectList(new QueryWrapper().select("id"));List result = new ArrayList<>();users.forEach((user)->{result.add(user.getId().toString());});writeDataToTxtFile("userId.txt", "utf-8", result);}public static String writeDataToTxtFile(String filePath, String charset, List dataList) {File txtFile = new File(filePath);// txt文件PrintWriter txtWriter = null;// 输出流FileOutputStream fo = null;OutputStreamWriter os = null;try {if (dataList.size() != 0) {// 如果文件不存在就新建文件if (!txtFile.exists()) {txtFile.createNewFile();}// 获取流fo = new FileOutputStream(txtFile);os = new OutputStreamWriter(fo, charset);txtWriter = new PrintWriter(os);// 遍历并输出数据到文件,并防止空行for (int i = 0; i < dataList.size(); i++) {if (i == dataList.size() - 1) {txtWriter.print(dataList.get(i));} else {txtWriter.println(dataList.get(i));}}txtWriter.flush();return "写入成功";} else {System.err.println("数据为空");return null;}} catch (IOException e) {e.printStackTrace();System.err.println("IOException异常:" + e.getCause().getMessage());return null;} finally {try {if (txtWriter != null) {txtWriter.close();}if (fo != null) {fo.close();}if (os != null) {os.close();}} catch (IOException e) {e.printStackTrace();System.err.println("文件流关闭异常:"+e.getCause().getMessage());}}}
}
然后在jmeter中加入txt
搞了半天鼠鼠要开测了 嘿嘿
成功!
再来看看redis中
也没毛病老铁们