Redis Series II — Реализация распределенных блокировок с помощью команд Redis и Lua

Redis

1 Обзор

В распределенных системах, если задействованы операции с одними и теми же ресурсами, часто используется распределенная блокировка. Redis — это однопроцессный и однопоточный режим.С помощью команды Redis SETNX GET может легко реализовать распределенные блокировки. В этой статье сначала реализуются распределенные блокировки с помощью команд Redis, представлена ​​основная бизнес-логика реализации и указаны ее недостатки. Затем с помощью lua-скриптов реализуются распределенные блокировки, чтобы компенсировать их недостатки. Наконец, ab используется для стресс-тестирования блокировок, реализованных двумя, чтобы сравнить их производительность.

2. Используйте команду redis для реализации распределенных блокировок

2.1. SETNX

Синтаксис: значение ключа SETNX

  • Если ключ не существует, сохраните значение (ключ:значение) и верните 1.
  • Если ключ больше не существует, операция не выполняется и возвращается 0

Из-за характера этой команды только один поток может изменить значение ключа, когда конкурируют несколько потоков. Используя это, можно реализовать функцию взаимного исключения блокировки.

2.2. ILock и DistributeLock

Определить блокировку: основной метод имеет две блокировки и разблокировки.

/**
 1. 定义锁
 2. @author hry
 3.  */
public interface ILock {
    /**
     * 获取锁
     * @param lock 锁名称
     */
    void lock(String lock);

    /**
     * 释放锁
     * @param lock 锁名称
     */
    void unlock(String lock);
}

Конкретный класс реализации ILock DistributeLock:

  1. ThreadLocal threadId: сохраните значение UUID каждой блокировки потока через threadId, который используется для определения того, принадлежит ли текущая блокировка себе, и значение блокировки также сохраняет это значение.
  2. Основная логика блокировки: Установить значение lockKey через setIfAbsent из BoundValueOperations (на самом деле setIfAbsent — это команда, которая инкапсулирует SETNX), если она возвращает true, значит, блокировка получена, если возвращает false, то входит в ожидание.
  3. Основная логика разблокировки: снимите блокировку с помощью redisTemplate.delete. Перед освобождением блокировки необходимо судить, что текущая блокировка принадлежит текущему потоку, если это так, выполнить освобождение блокировки, иначе она не будет выполняться.
  4. Избегайте взаимоблокировки: если поток A внезапно умирает после получения блокировки, прежде чем снять блокировку, другие потоки не могут снова получить блокировку, что приводит к взаимоблокировке. Чтобы избежать взаимоблокировки, после того, как мы получим блокировку, нам нужно установить срок действия блокировки, даже если владелец блокировки умрет, блокировка может быть автоматически снята.
  5. Блокировки могут быть реентерабельными: после того, как поток A получит блокировку, если он снова выполнит блокировку, он может получить блокировку снова, вместо того, чтобы стоять в очереди в ожидании блокировки; если текущий поток уже получил блокировку, он должен быть может получить блокировку, если он снова запросит блокировку.

Смотрите подробную реализацию кода:

/**
 * 通过redis实现分布锁
 * @author hry
 *
 */
public class DistributeLock implements ILock {
    private static final Logger logger  = LoggerFactory.getLogger(DistributeLock.class);

    private static final int LOCK_MAX_EXIST_TIME = 5;  // 单位s,一个线程持有锁的最大时间
    private static final String LOCK_PREX = "lock_"; // 作为锁的key的前缀

    private StringRedisTemplate redisTemplate;
    private String lockPrex; // 做为锁key的前缀
    private int lockMaxExistTime; // 单位s,一个线程持有锁的最大时间

    private ThreadLocal<String> threadId = new ThreadLocal<String>();  // 线程变量

    public DistributeLock(StringRedisTemplate redisTemplate){
        this(redisTemplate, LOCK_PREX, LOCK_MAX_EXIST_TIME);
    }

    public DistributeLock(StringRedisTemplate redisTemplate, String lockPrex, int lockMaxExistTime){
        this.redisTemplate = redisTemplate;
        this.lockPrex = lockPrex;
        this.lockMaxExistTime = lockMaxExistTime;
    }

    @Override
    public void lock(String lock){
        Assert.notNull(lock, "lock can't be null!");
        String lockKey = getLockKey(lock);
        BoundValueOperations<String,String> keyBoundValueOperations = redisTemplate.boundValueOps(lockKey);     
        while(true){
            // 如果上次拿到锁的是自己,则本次也可以拿到锁:实现可重入
            String value = keyBoundValueOperations.get();
            // 根据传入的值,判断用户是否持有这个锁
            if(value != null && value.equals(String.valueOf(threadId.get()))){
                // 重置过期时间
                keyBoundValueOperations.expire(lockMaxExistTime, TimeUnit.SECONDS);
                break;
            }

            if(keyBoundValueOperations.setIfAbsent(lockKey)){
                // 每次获取锁时,必须重新生成id值
                String keyUniqueId = UUID.randomUUID().toString(); // 生成key的唯一值
                threadId.set(keyUniqueId);
                // 显设置value,再设置过期日期,否则过期日期无效
                keyBoundValueOperations.set(String.valueOf(keyUniqueId));
                // 为了避免一个用户拿到锁后,进行过程中没有正常释放锁,这里设置一个默认过期实际,这段非常重要,如果没有,则会造成死锁
                keyBoundValueOperations.expire(lockMaxExistTime, TimeUnit.SECONDS);
                // 拿到锁后,跳出循环
                break;
            }else{
                try {
                    // 短暂休眠,nano避免出现活锁 
                    Thread.sleep(10, (int)(Math.random() * 500));
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }


    /**
     * 释放锁,同时要考虑当前锁是否为自己所有,以下情况会导致当前线程失去锁:线程执行的时间超过超时的时间,导致此锁被其它线程拿走; 此时用户不可以执行删除
     * 
     * 以上方法的缺陷:
     *  a. 在本线程获取值,判断锁本线程所有,但是在执行删除前,锁超时被释放同时被另一个线程获取,则本操作释放锁
     * 
     * 最终解决方案
     *  a. 使用lua脚本,保证检测和删除在同一事物中
     * 
     */
    @Override
    public void unlock(final String lock) {
        final String lockKey = getLockKey(lock);
        BoundValueOperations<String,String> keyBoundValueOperations = redisTemplate.boundValueOps(lockKey);
        String lockValue = keyBoundValueOperations.get();
        if(!StringUtils.isEmpty(lockValue) && lockValue.equals(threadId.get())){
            redisTemplate.delete(lockKey);
        }else{
            logger.warn("key=[{}]已经变释放了,本次不执行释放. 线程Id[{}] ", lock, lockValue);  
        }
    }

    /**
     * 生成key
     * @param lock
     * @return
     */
    private String getLockKey(String lock){
        StringBuilder sb = new StringBuilder();
        sb.append(lockPrex).append(lock);
        return sb.toString();
    }

}

2.3. ILockManager и SimpleRedisLockManager

ILockManager: инкапсулирует использование распределенных блокировок.

public interface ILockManager {
    /**
     * 通过加锁安全执行程序,无返回的数据
     * @param lockKeyName key名称
     * @param callback  
     */
    void lockCallBack(String lockKeyName, SimpleCallBack callback);
    /**
     * 通过加锁安全执行程序,有返回数据
     * @param lockKeyName
     * @param callback
     * @return
     */
    <T> T lockCallBackWithRtn(String lockKeyName, ReturnCallBack<T> callback);
}

SimpleRedisLockManager
Класс реализации ILockManager, инициализирующий блокировку, реализованную выше;
Этот класс инкапсулирует общий код, использующий блокировки, что упрощает использование распределенных блокировок.
Два метода обратного вызова определены для реализации реальной бизнес-логики пользователя.

  1. SimpleCallBack: функция обратного вызова без возвращаемого значения
  2. ReturnCallBack: функция обратного вызова с возвращаемыми данными
@Component
public class SimpleRedisLockManager implements ILockManager {   

    @Autowired
    protected StringRedisTemplate redisTemplate;

    protected ILock distributeLock; // 分布锁

    @PostConstruct
    public void init(){
        // 初始化锁
        distributeLock = new DistributeLock(redisTemplate, "mylock_", 5);
    }

    @Override
    public void lockCallBack(String lockKeyName, SimpleCallBack callback){
        Assert.notNull("lockKeyName","lockKeyName 不能为空");
        Assert.notNull("callback","callback 不能为空");
        try{
            // 获取锁
            distributeLock.lock(lockKeyName);
            callback.execute();
        }finally{
            // 必须释放锁
            distributeLock.unlock(lockKeyName);
        }
    }

    @Override
    public <T> T lockCallBackWithRtn(String lockKeyName, ReturnCallBack<T> callback){
        Assert.notNull("lockKeyName","lockKeyName 不能为空");
        Assert.notNull("callback","callback 不能为空");
        try{
            // 获取锁
            distributeLock.lock(lockKeyName);
            return callback.execute();
        }finally{
            // 必须释放锁
            distributeLock.unlock(lockKeyName);
        }
    }
}

/**
 * 无返回值的回调函数
 * @author hry
 *
 */
public interface SimpleCallBack {
    void execute();
}

/**
 * 有返回数据的回调函数
 * 
 * @author hry
 *
 * @param <T>
 */
public interface ReturnCallBack<T> {
    T execute();
}

2.4 Код, который реально использует блокировку TestCtrl

очень прост в использовании

@Autowired
private SimpleRedisLockManager simpleRedisLockManager;

 simpleRedisLockManager.lockCallBack("distributeLock" + ThreadLocalRandom.current().nextInt(1000), new SimpleCallBack() {
    @Override
    public void execute() {
        System.out.println("lockCallBack");
    }
});

2.5 Вышеупомянутая реализация блокировки все еще имеет недостатки

  1. Если поток A получает блокировку дольше указанного времени и не завершился, Redis автоматически снимет блокировку в это время. В это время поток B получает блокировку, затем поток A и поток B получают блокировку одновременно. В этом случае ее можно решить, установив разумный тайм-аут.
  2. Если уровень параллелизма высок, несколько потоков могут одновременно удерживать блокировки. Это связано с тем, что методы блокировки и разблокировки DistributeLock выполняют несколько операторов, и эти операторы не являются транзакционными. Например, когда поток A разблокирован, он узнает, что ему принадлежит блокировка, с помощью метода get, а затем выполняет операцию снятия блокировки. Между этими двумя операциями Redis обнаруживает, что срок действия блокировки истек, и автоматически удаляет блокировку.В это время поток B подает заявку на блокировку и получает ее. В это время поток A выполняет операцию удаления блокировки, и поток C также может получить блокировку. В это время потоки B и C получают блокировку одновременно. Эта ситуация может быть решена следующим методом lua

3. Используйте скрипт lua для реализации распределенной блокировки

Реализация вышеуказанной блокировки проблематична, ключ в том, что несколько выполняемых операторов не входят в транзакцию. И lua, представленный в этом разделе, как раз может решить эту проблему.
Версии после redis 2.6.0 стали поддерживать lua-скрипты. Подробнее об использовании lua в Redis см.здесь. При выполнении lua-скрипта в redis, redis выполнит весь скрипт как единое целое и не будет вставляться другими командами посередине, решая проблему нескольких команд.

3.1. Lua-скрипт блокировки

скрипт блокировки: lock.lua

-- Set a lock
--  如果获取锁成功,则返回 1
local key     = KEYS[1]
local content = KEYS[2]
local ttl     = ARGV[1]
local lockSet = redis.call('setnx', key, content)
if lockSet == 1 then
  redis.call('pexpire', key, ttl)
--  redis.call('incr', "count")
else 
  -- 如果value相同,则认为是同一个线程的请求,则认为重入锁
  local value = redis.call('get', key)
  if(value == content) then
    lockSet = 1;
    redis.call('pexpire', key, ttl)
  end
end
return lockSet

Скрипт разблокировки: unlock.lua

-- unlock key
local key     = KEYS[1]
local content = KEYS[2]
local value = redis.call('get', key)
if value == content then
--  redis.call('decr', "count")
  return redis.call('del', key);
end
return 0

3.2. LuaDistributeLock

Реализация интерфейса ILock LuaDistributeLock реализует ту же бизнес-логику, что и DistributeLock. Здесь при создании LuaDistributeLock будет вызван метод init для инициализации скриптов блокировки и разблокировки, и будут сгенерированы соответствующие объекты DefaultRedisScript.Эти два объекта можно использовать повторно.Нет необходимости инициализировать объект каждый раз при блокировке/разблокировке выполняется.

public class LuaDistributeLock implements ILock {
    private static final int LOCK_MAX_EXIST_TIME = 5;  // 单位s,一个线程持有锁的最大时间
    private static final String LOCK_PREX = "lock_"; // 作为锁的key的前缀

    private StringRedisTemplate redisTemplate;
    private String lockPrex; // 做为锁key的前缀
    private int lockMaxExistTime; // 单位s,一个线程持有锁的最大时间
    private DefaultRedisScript<Long> lockScript; // 锁脚本
    private DefaultRedisScript<Long> unlockScript; // 解锁脚本

    // 线程变量
    private ThreadLocal<String> threadKeyId = new ThreadLocal<String>(){
        @Override
        protected String initialValue() {
            return UUID.randomUUID().toString();
        }
    };  

    public LuaDistributeLock(StringRedisTemplate redisTemplate){
        this(redisTemplate, LOCK_PREX, LOCK_MAX_EXIST_TIME);
    }

    public LuaDistributeLock(StringRedisTemplate redisTemplate, String lockPrex, int lockMaxExistTime){
        this.redisTemplate = redisTemplate;
        this.lockPrex = lockPrex;
        this.lockMaxExistTime = lockMaxExistTime;
        // init
        init();
    }

    /**
     * 生成
     */
    public void init() {
        // Lock script
        lockScript = new DefaultRedisScript<Long>();
        lockScript.setScriptSource(
            new ResourceScriptSource(new ClassPathResource("com/hry/spring/redis/distributedlock/lock/lock.lua")));
        lockScript.setResultType(Long.class);
        // unlock script
        unlockScript = new DefaultRedisScript<Long>();
        unlockScript.setScriptSource(
            new ResourceScriptSource(new ClassPathResource("com/hry/spring/redis/distributedlock/lock/unlock.lua")));
        unlockScript.setResultType(Long.class);
    }

    @Override
    public void lock(String lock2){
        Assert.notNull(lock2, "lock2 can't be null!");
        String lockKey = getLockKey(lock2);
        while(true){
            List<String> keyList = new ArrayList<String>();
            keyList.add(lockKey);
            keyList.add(threadKeyId.get());
            if(redisTemplate.execute(lockScript, keyList, String.valueOf(lockMaxExistTime * 1000)) > 0){
                break;
            }else{
                try {
                    // 短暂休眠,nano避免出现活锁 
                    Thread.sleep(10, (int)(Math.random() * 500));
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }


    /**
     * 释放锁,同时要考虑当前锁是否为自己所有,以下情况会导致当前线程失去锁:线程执行的时间超过超时的时间,导致此锁被其它线程拿走; 此时用户不可以执行删除
     */
    @Override
    public void unlock(final String lock) {
        final String lockKey = getLockKey(lock);
        List<String> keyList = new ArrayList<String>();
        keyList.add(lockKey);
        keyList.add(threadKeyId.get());
        redisTemplate.execute(unlockScript, keyList);
    }

    /**
     * 生成key
     * @param lock
     * @return
     */
    private String getLockKey(String lock){
        StringBuilder sb = new StringBuilder();
        sb.append(lockPrex).append(lock);
        return sb.toString();
    }

}

3.3. LuaLockRedisLockManager

Унаследуйте вышеописанный SimpleRedisLockManager, перепишите init и инициализируйте только что написанную блокировку.

@Component
public class LuaLockRedisLockManager extends SimpleRedisLockManager {
    @PostConstruct
    public void init(){
        // 初始化锁
        distributeLock = new LuaDistributeLock(redisTemplate, "mylock_", 5);
    }
}

3.4 Код, который реально использует блокировку TestCtrl

То же использование, что и SimpleRedisLockManager

@Autowired
private LuaLockRedisLockManager luaLockRedisLockManager;

luaLockRedisLockManager.lockCallBack("distributeLock2" + ThreadLocalRandom.current().nextInt(1000), new SimpleCallBack() {
    @Override
    public void execute() {
        System.out.println("distributeLock2");
    }
});

4. Сравнение производительности

Давайте проведем стресс-тестирование этих двух реализаций с помощью инструмента стресс-тестирования ab: 100 одновременных потоков, отправив в общей сложности 1000 запросов.
простой RedisLockManager: ab -n 1000 -c 100http://192.168.188.4:8080/distributeLock2
luaLockRedisLockManager: ab -n 1000 -c 100http://192.168.188.4:8080/distributeLock

Подробные данные следующие
这里写图片描述
Анализ: lua-скрипт намного быстрее, чем реализация redis, а lua-скрипт в два раза быстрее, чем использование обычных команд. Чем более стрессовая ситуация, тем выше производительность lua.

5. Резюме

Для того, чтобы лучше использовать замок, рекомендуется выполнить следующие условия

  1. Распределенная блокировка с использованием lua
  2. Установите разумный тайм-аут в соответствии с бизнес-логикой
  3. Степень детализации блокировки настолько мала, насколько это возможно, чтобы уменьшить количество конфликтов.

6. Код

Подробности смотрите в коде