# 一、高效分布式锁

当我们在设计分布式锁的时候,我们应该考虑分布式锁至少要满足的一些条件,同时考虑如何高效的设计分布式锁,这里我认为以下几点是必须要考虑的。

# 1、互斥

在分布式高并发的条件下,我们最需要保证,同一时刻只能有一个线程获得锁,这是最基本的一点。

# 2、防止死锁

在分布式高并发的条件下,比如有个线程获得锁的同时,还没有来得及去释放锁,就因为系统故障或者其它原因使它无法执行释放锁的命令,导致其它线程都无法获得锁,造成死锁。所以分布式非常有必要设置锁的有效时间,确保系统出现故障后,在一定时间内能够主动去释放锁,避免造成死锁的情况。

# 3、性能

对于访问量大的共享资源,需要考虑减少锁等待的时间,避免导致大量线程阻塞。

所以在锁的设计时,需要考虑两点:
1、锁的颗粒度要尽量小。比如你要通过锁来减库存,那这个锁的名称你可以设置成是商品的ID,而不是任取名称。这样这个锁只对当前商品有效,锁的颗粒度小。
2、锁的范围尽量要小。比如只要锁2行代码就可以解决问题的,那就不要去锁10行代码了。

# 4、重入

我们知道ReentrantLock是可重入锁,那它的特点就是:同一个线程可以重复拿到同一个资源的锁。重入锁非常有利于资源的高效利用。关于这点之后会做演示。

针对以上 Redisson都能很好的满足,下面就来分析下它。

# 二、Redisson原理分析

为了更好的理解分布式锁的原理,我这边自己画张图通过这张图来分析。
分布式锁

# 1、加锁机制

线程去获取锁,获取成功: 执行 lua脚本,保存数据到 redis数据库。

线程去获取锁,获取失败: 一直通过 while循环尝试获取锁,获取成功后,执行 lua脚本,保存数据到 redis数据库。

# 2、watch dog自动延期机制

在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(不设置默认30秒),这样的目的主要是防止死锁的发生。

但在实际开发中会有下面一种情况:

//设置锁1秒过去
redissonLock.lock("redisson", 1);
/**
 * 业务逻辑需要咨询2秒
 */
redissonLock.release("redisson");

/**
* 线程1 进来获得锁后,线程一切正常并没有宕机,但它的业务逻辑需要执行2秒,这就会有个问题,在 线程1 执行1秒后,这个锁就自动过期了,
* 那么这个时候 线程2 进来了。那么就存在 线程1和线程2 同时在这段业务逻辑里执行代码,这当然是不合理的。
* 而且如果是这种情况,那么在解锁时系统会抛异常,因为解锁和加锁已经不是同一线程了,具体后面代码演示。
*/
1
2
3
4
5
6
7
8
9
10
11
12

所以这个时候看门狗就出现了,它的作用就是 线程1 业务还没有执行完,时间就过了,线程1 还想持有锁的话,就会启动一个 watch dog后台线程,不断的延长锁 key的生存时间。注意:正常这个看门狗线程是不启动的,还有就是这个看门狗启动后对整体性能也会有一定影响,所以不建议开启看门狗

# 3、为啥要用 lua脚本呢?

这个不用多说,主要是如果你的业务逻辑复杂的话,通过封装在 lua脚本中发送给redis,而且 redis是单线程的,这样就保证这段复杂业务逻辑执行的原子性

# 4、可重入加锁机制

Redisson可以实现可重入加锁机制的原因,我觉得跟两点有关:

1、Redis存储的数据类型是 Hash类型 2、Hash数据类型的key值包含了当前线程信息

下面是redis存储的数据
分布式锁

这里表面数据类型是 Hash类型,Hash类型相当于我们 java的 <key,<key1,value>> 类型,这里key是指 'redisson'

它的有效期还有9秒,我们再来看里们的 key值为 078e44a3-5f95-4e24-b6aa-80684655a15a:45它的组成是:

guid + 当前线程的ID。后面的 value是就和可重入加锁有关。

举图说明

分布式锁

上面这图的意思就是可重入锁的机制,它最大的优点就是相同线程不需要在等待锁,而是可以直接进行相应操作。

# 5、Redis分布式锁的缺点

Redis分布式锁会有个缺陷,就是在 Redis哨兵模式下:

客户端1 对某个master节点写入了 redisson锁,此时会异步复制给对应的 slave节点。但是这个过程中一旦发生 master节点宕机,主备切换,slave节点从变为了 master节点。

这时客户端2 来尝试加锁的时候,在新的 master节点上也能加锁,此时就会导致多个客户端对同一个分布式锁完成了加锁。

这时系统在业务语义上一定会出现问题,导致各种脏数据的产生

缺陷哨兵模式或者主从模式下,如果 master实例宕机的时候,可能导致多个客户端同时完成加锁

# 三、RLock接口

# 1、概念

public interface RLock extends Lock, RExpirable, RLockAsync
1

很明显 RLock是继承 Lock锁,所以他有 Lock锁的所有特性,比如lock、unlock、trylock等特性,同时它还有很多新特性:强制锁释放,带有效期的锁,。

# 2、RLock锁API

这里针对上面做个整理,这里列举几个常用的接口说明

public interface RRLock {
    //----------------------Lock接口方法-----------------------

    /**
     * 加锁 锁的有效期默认30秒
     */
    void lock();
    /**
     * tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false .
     */
    boolean tryLock();
    /**
     * tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,
     * 在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。
     *
     * @param time 等待时间
     * @param unit 时间单位 小时、分、秒、毫秒等
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    /**
     * 解锁
     */
    void unlock();
    /**
     * 中断锁 表示该锁可以被中断 假如A和B同时调这个方法,A获取锁,B为获取锁,那么B线程可以通过
     * Thread.currentThread().interrupt(); 方法真正中断该线程
     */
    void lockInterruptibly();

    //----------------------RLock接口方法-----------------------
    /**
     * 加锁 上面是默认30秒这里可以手动设置锁的有效时间
     *
     * @param leaseTime 锁有效时间
     * @param unit      时间单位 小时、分、秒、毫秒等
     */
    void lock(long leaseTime, TimeUnit unit);
    /**
     * 这里比上面多一个参数,多添加一个锁的有效时间
     *
     * @param waitTime  等待时间
     * @param leaseTime 锁有效时间
     * @param unit      时间单位 小时、分、秒、毫秒等
     */
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
    /**
     * 检验该锁是否被线程使用,如果被使用返回True
     */
    boolean isLocked();
    /**
     * 检查当前线程是否获得此锁(这个和上面的区别就是该方法可以判断是否当前线程获得此锁,而不是此锁是否被线程占有)
     * 这个比上面那个实用
     */
    boolean isHeldByCurrentThread();
    /**
     * 中断锁 和上面中断锁差不多,只是这里如果获得锁成功,添加锁的有效时间
     * @param leaseTime  锁有效时间
     * @param unit       时间单位 小时、分、秒、毫秒等
     */
    void lockInterruptibly(long leaseTime, TimeUnit unit);  
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

RLock相关接口,主要是新添加了 leaseTime 属性字段,主要是用来设置锁的过期时间,避免死锁。

# 四、RedissonLock实现类

public class RedissonLock extends RedissonExpirable implements RLock
1

RedissonLock实现了 RLock接口,所以实现了接口的具体方法。这里我列举几个方法说明下

# 1、void lock()方法

@Override
public void lock() {
    try {
        lockInterruptibly();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
1
2
3
4
5
6
7
8

发现 lock锁里面进去其实用的是lockInterruptibly(中断锁,表示可以被中断),而且捕获异常后用 Thread.currentThread().interrupt()来真正中断当前线程,其实它们是搭配一起使用的。

具体有关 lockInterruptibly()方法讲解推荐一个博客。博客Lock的lockInterruptibly() (opens new window)

接下来执行流程,这里理下关键几步

/**
 * 1、带上默认值调另一个中断锁方法
 */
@Override
public void lockInterruptibly() throws InterruptedException {
    lockInterruptibly(-1, null);
}
/**
 * 2、另一个中断锁的方法
 */
void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException 
/**
 * 3、这里已经设置了锁的有效时间默认为30秒  (commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()=30)
 */
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
/**
 * 4、最后通过lua脚本访问Redis,保证操作的原子性
 */
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

那么void lock(long leaseTime, TimeUnit unit)方法其实和上面很相似了,就是从上面第二步开始的。

# 2、tryLock(long waitTime, long leaseTime, TimeUnit unit)

接口的参数和含义上面已经说过了,现在我们开看下源码,这里只显示一些重要逻辑。

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    //1、 获取锁同时获取成功的情况下,和lock(...)方法是一样的 直接返回True,获取锁False再往下走
    if (ttl == null) {
        return true;
    }
    //2、如果超过了尝试获取锁的等待时间,当然返回false 了。
    time -= System.currentTimeMillis() - current;
    if (time <= 0) {
        acquireFailed(threadId);
        return false;
    }

    // 3、订阅监听redis消息,并且创建RedissonLockEntry,其中RedissonLockEntry中比较关键的是一个 Semaphore属性对象,用来控制本地的锁请求的信号量同步,返回的是netty框架的Future实现。
    final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    //  阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。
    //  只有await返回true,才进入循环尝试获取锁
    if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                @Override
                public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                    if (subscribeFuture.isSuccess()) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                }
            });
        }
        acquireFailed(threadId);
        return false;
    }

   //4、如果没有超过尝试获取锁的等待时间,那么通过While一直获取锁。最终只会有两种结果
    //1)、在等待时间内获取锁成功 返回true。2)等待时间结束了还没有获取到锁那么返回false。
    while (true) {
        long currentTime = System.currentTimeMillis();
        ttl = tryAcquire(leaseTime, unit, threadId);
        // 获取锁成功
        if (ttl == null) {
            return true;
        }
       //   获取锁失败
        time -= System.currentTimeMillis() - currentTime;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

重点 tryLock一般用于特定满足需求的场合,但不建议作为一般需求的分布式锁,一般分布式锁建议用 void lock(long leaseTime, TimeUnit unit)。因为从性能上考虑,在高并发情况下后者效率是前者的好几倍

# 3、unlock()

解锁的逻辑很简单。

@Override
public void unlock() {
    // 1.通过 Lua 脚本执行 Redis 命令释放锁
    Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE,
            RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end;" +
                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                    "end; " +
                    "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()),
            LockPubSub.unlockMessage, internalLockLeaseTime,
            getLockName(Thread.currentThread().getId()));
    // 2.非锁的持有者释放锁时抛出异常
    if (opStatus == null) {
        throw new IllegalMonitorStateException(
                "attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + Thread.currentThread().getId());
    }
    // 3.释放锁后取消刷新锁失效时间的调度任务
    if (opStatus) {
        cancelExpirationRenewal();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

使用 EVAL 命令执行 Lua 脚本来释放锁:

  1. key 不存在,说明锁已释放,直接执行 publish 命令发布释放锁消息并返回 1
  2. key 存在,但是 field 在 Hash 中不存在,说明自己不是锁持有者,无权释放锁,返回 nil
  3. 因为锁可重入,所以释放锁时不能把所有已获取的锁全都释放掉,一次只能释放一把锁,因此执行 hincrby 对锁的值减一
  4. 释放一把锁后,如果还有剩余的锁,则刷新锁的失效时间并返回 0;如果刚才释放的已经是最后一把锁,则执行 del 命令删除锁的 key,并发布锁释放消息,返回 1

注意这里有个实际开发过程中,容易出现很容易出现上面第二步异常,非锁的持有者释放锁时抛出异常。比如下面这种情况

//设置锁1秒过去
redissonLock.lock("redisson", 1);
/**
 * 业务逻辑需要咨询2秒
 */
redissonLock.release("redisson");
/**
* 线程1 进来获得锁后,线程一切正常并没有宕机,但它的业务逻辑需要执行2秒,这就会有个问题,在 线程1 执行1秒后,这个锁就自动过期了,
* 那么这个时候 线程2 进来了。在线程1去解锁就会抛上面这个异常(因为解锁和当前锁已经不是同一线程了)
*/
1
2
3
4
5
6
7
8
9
10

# 五、Redisson 项目落地

# 1、技术架构

项目总体技术选型

SpringBoot2.1.5 + Maven3.5.4 + Redisson3.5.4 + lombok(插件)
1

# 2、加锁方式

该项目支持 自定义注解加锁常规加锁 两种模式

自定义注解加锁

@DistributedLock(value="goods", leaseTime=5)
public String lockDecreaseStock(){
   //业务逻辑
}
1
2
3
4

常规加锁

//1、加锁
redissonLock.lock("redisson", 10);
//2、业务逻辑
//3、解锁
redissonLock.unlock("redisson");
1
2
3
4
5

# 3、Redis部署方式

该项目支持四种 Redis部署方式

TIP

1、单机模式部署
2、集群模式部署
3、主从模式部署
4、哨兵模式部署

该项目已经实现支持上面四种模式,你要采用哪种只需要修改配置文件 application.properties,项目代码不需要做任何修改。

# 4、项目整体结构

redis-distributed-lock-core # 核心实现
|
---src
      |
      ---com.jincou.redisson
                           |# 通过注解方式 实现分布式锁
                           ---annotation
                           |# 配置类实例化RedissonLock
                           ---config
                           |# 放置常量信息
                           ---constant
                           |# 读取application.properties信息后,封装到实体
                           ---entity    
                           |# 支持单机、集群、主从、哨兵 代码实现
                           ---strategy

redis-distributed-lock-web-test # 针对上面实现类的测试类
|
---src
      |
      ---java
            |
            ---com.jincou.controller
                                 |# 测试 基于注解方式实现分布式锁
                                 ---AnnotatinLockController.java
                                 |# 测试 基于常规方式实现分布式锁
                                 ---LockController.java
      ---resources                
           | # 配置端口号 连接redis信息(如果确定部署类型,那么将连接信息放到core项目中)
            ---application.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 六、测试

模拟1秒内100个线程请求接口,来测试结果是否正确。同时测试3中不同的锁:lock锁、trylock锁、注解锁。

# 1、lock锁

/**
 * 模拟这个是商品库存
 */
public static volatile Integer TOTAL = 10;

@GetMapping("lock-decrease-stock")
public String lockDecreaseStock() throws InterruptedException {
    redissonLock.lock("lock", 10);
    if (TOTAL > 0) {
        TOTAL--;
    }
    Thread.sleep(50);
    log.info("======减完库存后,当前库存===" + TOTAL);
    //如果该线程还持有该锁,那么释放该锁。如果该线程不持有该锁,说明该线程的锁已到过期时间,自动释放锁
    if (redissonLock.isHeldByCurrentThread("lock")) {
       redissonLock.unlock("lock");
    }
    return "=================================";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

压测结果

分布式锁

没问题,不会超卖!

# 2、tryLock锁

/**
 * 模拟这个是商品库存
 */
public static volatile Integer TOTAL = 10;

@GetMapping("trylock-decrease-stock")
public String trylockDecreaseStock() throws InterruptedException {
    if (redissonLock.tryLock("trylock", 10, 5)) {
        if (TOTAL > 0) {
            TOTAL--;
        }
        Thread.sleep(50);
        redissonLock.unlock("trylock");
        log.info("====tryLock===减完库存后,当前库存===" + TOTAL);
    } else {
        log.info("[ExecutorRedisson]获取锁失败");
    }
    return "===================================";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

测试结果

分布式锁

没有问题 ,不会超卖!

# 3、注解锁

/**
 * 模拟这个是商品库存
 */
public static volatile Integer TOTAL = 10;

@GetMapping("annotatin-lock-decrease-stock")
@DistributedLock(value="goods", leaseTime=5)
public String lockDecreaseStock() throws InterruptedException {
    if (TOTAL > 0) {
        TOTAL--;
    }
    log.info("===注解模式=== 减完库存后,当前库存===" + TOTAL);
    return "=================================";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

测试结果

分布式锁

没有问题 ,不会超卖!

通过实验可以看出,通过这三种模式都可以实现分布式锁,然后呢?哪个最优。

# 七、三种锁的锁选择

#

观点 最完美的就是lock锁,因为

TIP

tryLock锁是可能会跳过减库存的操作 ,因为当过了等待时间还没有获取锁,就会返回false,这显然很致命!
注解锁只能用于方法上,颗粒度太大,满足不了方法内加锁。

# 1、lock PK tryLock 性能的比较

模拟5秒内1000个线程分别去压测这两个接口,看报告结果!

1)lock锁

压测结果 1000个线程平均响应时间为31324。吞吐量 14.7/sec

分布式锁

# 2)tryLock锁

压测结果 1000个线程平均响应时间为28628。吞吐量 16.1/sec

分布式锁

这里只是单次测试,有很大的随机性。从当前环境单次测试来看,tryLock稍微高点。

# 2、常见异常 attempt to unlock lock, not ······

在使用RedissonLock锁时,很容易报这类异常,比如如下操作

//设置锁1秒过去
redissonLock.lock("redisson", 1);
/**
 * 业务逻辑需要咨询2秒
 */
redissonLock.release("redisson");
1
2
3
4
5
6

上面在并发情况下就会这样

分布式锁 造成异常原因:
线程1 进来获得锁后,但它的业务逻辑需要执行2秒,在 线程1 执行1秒后,这个锁就自动过期了,那么这个时候 
线程2 进来了获得了锁。在线程1去解锁就会抛上面这个异常(因为解锁和当前锁已经不是同一线程了)
1
2

所以我们需要注意,设置锁的过期时间不能设置太小,一定要合理,宁愿设置大点。

正对上面的异常,可以通过 isHeldByCurrentThread()方法,

//如果为false就说明该线程的锁已经自动释放,无需解锁
if (redissonLock.isHeldByCurrentThread("lock")) {
        redissonLock.unlock("lock");
}
1
2
3
4

这里只是单次测试,有很大的随机性。从当前环境单次测试来看,tryLock稍微高点。

# 2、常见异常 attempt to unlock lock, not ······

在使用RedissonLock锁时,很容易报这类异常,比如如下操作

//设置锁1秒过去
redissonLock.lock("redisson", 1);
/**
 * 业务逻辑需要咨询2秒
 */
redissonLock.release("redisson");
1
2
3
4
5
6

线程1 进来获得锁后,但它的业务逻辑需要执行2秒,在 线程1 执行1秒后,这个锁就自动过期了,那么这个时候
线程2 进来了获得了锁。在线程1去解锁就会抛上面这个异常(因为解锁和当前锁已经不是同一线程了)

所以我们需要注意,设置锁的过期时间不能设置太小,一定要合理,宁愿设置大点。 正对上面的异常,可以通过 isHeldByCurrentThread()方法,

//如果为false就说明该线程的锁已经自动释放,无需解锁
if (redissonLock.isHeldByCurrentThread("lock")) {
        redissonLock.unlock("lock");
}
1
2
3
4
(adsbygoogle = window.adsbygoogle || []).push({});