记录生活中的点点滴滴

0%

Redis分布式锁

什么是分布式锁?

分布式微服务架构,拆分后各个微服务之间为了避免冲突和数据故障而加入的一把锁,叫分布式锁。

分布式锁的实现可以用mysql、redis去实现,本文主要将redis去实现分布式锁的一些问题,一层一层去看,去总结。

我们先搭建一个SpringBoot+Redis的环境,然后模拟在redis中设置库存:

1
set goods:001 100

然后暴露接口,每次访问让库存减一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RestController
public class HelloController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@GetMapping("/hello")
public String hello(){
String res = stringRedisTemplate.opsForValue().get("goods:001");
int goodNumber = Integer.parseInt(res);
if(goodNumber>0){
int realNumber = goodNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001",String.valueOf(realNumber));
System.out.println("成功买到商品,还剩"+realNumber+"件!");
return "成功买到商品,还剩"+realNumber+"件!";
}else {
System.out.println("失败");
}

return "失败";
}
}

这是最简单的,单击也会出现超卖问题,OK,改进一下,利用同步机制,修改一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
synchronized (this){
String res = stringRedisTemplate.opsForValue().get("goods:001");
int goodNumber = Integer.parseInt(res);
if(goodNumber>0){
int realNumber = goodNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001",String.valueOf(realNumber));
System.out.println("成功买到商品,还剩"+realNumber+"件!");
return "成功买到商品,还剩"+realNumber+"件!";
}else {
System.out.println("失败");
}
return "失败";
}

这样,单击可以解决问题,但是还是不能解决分布式情况下超卖问题。

OK,我们可以用一下redis给我们提供的SETNX指令,实现分布式锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static final String REDIS_LOCK = "gsLock";

@GetMapping("/hello")
public String hello(){
//先声明一个唯一的value
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, value);
if(!flag){
return "抢锁失败";
}

try {
String res = stringRedisTemplate.opsForValue().get("goods:001");
int goodNumber = Integer.parseInt(res);
if(goodNumber>0){
int realNumber = goodNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001",String.valueOf(realNumber));
System.out.println("成功买到商品,还剩"+realNumber+"件!");
return "成功买到商品,还剩"+realNumber+"件!";
}else {
System.out.println("失败");
}
return "失败";
}finally {
stringRedisTemplate.delete(REDIS_LOCK);
}
}

但是如果程序还没有执行finally,即还没有释放锁,此时突然宕机了,这个key就没有释放,所以需要我们去设置一个过期时间。

1
2
3
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, value);
stringRedisTemplate.expire(REDIS_LOCK,10L, TimeUnit.SECONDS);

但是,我们要考虑原子性,所以应该如下写:

1
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);

这样其实还有问题,就是如果一个线程的锁有效期是10s,但是它执行了15s,就是比10s还要多,此时它已经释放锁了,但是它最后还会执行finally的代码,即释放了别人的锁。所以还需要判断一下:

1
2
3
4
5
finally {
if(stringRedisTemplate.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(value)){
stringRedisTemplate.delete(REDIS_LOCK);
}
}

但是这又会有一个问题,就是判断和删除不是原子性的,还是有问题!

怎样解决?两种方法,一般用Lua脚本解决,这也是Redis官网上推荐的,但是面试会问到第二种,就是用Redis本身的事务。

  • Lua脚本

  • Redis本身的事务

我们先用一下事务实现,先复习一下命令:

释放的代码该改成如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
while (true){
stringRedisTemplate.watch(REDIS_LOCK);
if(stringRedisTemplate.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(value)){
stringRedisTemplate.setEnableTransactionSupport(true);
stringRedisTemplate.multi();
stringRedisTemplate.delete(REDIS_LOCK);
List<Object> list = stringRedisTemplate.exec();
if(list==null){
continue;
}
}
stringRedisTemplate.unwatch();
break;
}

下面我们用Lua脚本去做:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//用Lua脚本
Jedis jedis = JedisUtils.getJedis();
jedis.auth("123456");
String script = "if redis.call('get',KEYS[1]) == ARGV[1] " +
"then " +
"return redis.call('del',KEYS[1]) " +
"else " +
" return 0 " +
"end";
try {
Object o = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value));
if("1".equals(o.toString())){
System.out.println("-----del lock success");
}else{
System.out.println("-----del lock error");
}
} finally {
if(null!=jedis){
jedis.close();
}
}

这样的话,已经其实可以了,但是其实还存在一个问题。我们的Redis一般都是一个集群,就按照如下的一主二从:

如果我们的主机宕机了,但是此时还没有把数据同步给从机,那么此时主机宕机之后,从机变成了主机,所以就会掉成锁的丢失。

这个其实就需要拿我们的Redis和Zookeeper做一个比较,Redis是AP,强调可用性,Zookeeper是CP,强调一致性,它只有主节点把数据同步给了从机才会返回,所以性能没有Redis好。

正是因为在Redis集群中,我们自己写的也不OK,所以出现了Redisson!

使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//先声明一个唯一的value
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();

Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.138.129:6379").setPassword("123456").setDatabase(0);
// 构造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 设置锁定资源名称
RLock lock = redissonClient.getLock(REDIS_LOCK);

lock.lock();
xxxx
}finally {
lock.unlock();
}

OK,附一下Redisson的原理:Redisson 实现分布式锁原理分析