Распределенная блокировка Redis (3): поддержка повторного входа в блокировку, избежание взаимоблокировки при блокировке рекурсивного вызова

задняя часть

Статус использования

Базовое содержимое распределенных замков Redis мы ужеРаспределенная блокировка Redis: упрощенная версия распределенной блокировки на основе АОП и Redis.Эта статья говорила об этом, а также демонстрирует обычные методы блокировки и разблокировки в статье.

Почему распределенные блокировки Redis поддерживают обновление и как поддерживать методы обновления, мы такжеРаспределенная блокировка Redis (2): поддерживает обновление блокировки, чтобы избежать получения блокировок несколькими потоками после тайм-аута блокировки.

Спустя полгода после последнего обновления до возобновляемого распределенного замка эта простая версия распределенного замка собственной разработки все еще работает хорошо.

проблема найдена

Однако, когда я недавно проверял онлайн-журнал, я случайно обнаружил, что в бизнес-сценарии взаимоблокировка может возникнуть в распределенных блокировках.

После предварительного расследования мы обнаружили, что это было вызвано вызовом другого метода, который необходимо выполнить в методе, который уже удерживает блокировку.

Упрощенная функция, как показано ниже:

@Service
public class Lock1ServiceImpl implements Lock1Service {
    @Override
    @LockAnnotation(lockField = "test", lockKey = "1")
    public void test() {
    
    }
}

@Service
public class Lock2ServiceImpl implements Lock2Service {

    @Autowired
    private Lock1Service lock1Service;

    @Override
    @LockAnnotation(lockField = "test", lockKey = "1")
    public void test(){
        lock1Service.test();
    }
}

Процесс взаимоблокировки в основном выглядит следующим образом:

Тестовый метод Lock2Service должен быть заблокирован (ключ блокировки «test:1»), а затем внутри метода вызывается тестовый метод Lock1Service, и тестовый метод Lock1Service также должен добавить ту же блокировку.

Когда выполняется lock1Service.test(), тестовый метод Lock2Service не завершается, поэтому блокировка не будет снята, и lock1Service должен получить блокировку при выполнении тестового метода, и в это время возникает взаимоблокировка.

Этот код может продолжать работать только после того, как lock1Service добровольно сдастся и продолжит захват блокировки после ожидания в течение определенного периода времени. И тестовый метод lock1Service никогда не будет выполнен.

решение

Теперь, когда проблема возникла, следующее, что мы должны сделать, это найти способ избежать этой ситуации, насколько это возможно.

Мы быстро подумали о ReentrantLock, который поставляется с jdk, и мы можем реализовать реентерабельные распределенные блокировки по тому же принципу.

Принцип реализации повторного входа аналогичен принципу ReentrantLock.Отличие от ReentrantLock заключается в том, что распределенные блокировки должны использовать Redis для конкуренции за распределенные блокировки при получении блокировок в первый раз. Когда блокировка уже удерживается и требуется повторный вход, уровень распределенной блокировки будет понижен до локальной блокировки. Только ресурсы одного и того же потока имеют концепцию повторного входа.

Ниже приведены основные сведения о классе блокировки.Процессы блокировки и разблокировки следующие:

final Boolean tryLock(String lockValue, int waitTime) {
    long startTime = System.currentTimeMillis();
    long endTime = startTime + waitTime * 1000;
    try {
        do {
            final Thread current = Thread.currentThread();
            int c = this.getState();
            if (c == 0) {
                int lockTime = LOCK_TIME;
                if (lockRedisClient.setLock(lockKey, lockValue, lockTime)) {
                    lockOwnerThread = current;
                    this.setState(c + 1);
                    survivalClamProcessor = new SurvivalClamProcessor(lockKey, lockValue, lockTime, lockRedisClient);
                    (survivalThread = threadFactoryManager.getThreadFactory().newThread(survivalClamProcessor)).start();
                    log.info("线程获取重入锁成功,锁的名称为{}", lockKey);
                    return Boolean.TRUE;
                }
            } else if (lockOwnerThread == Thread.currentThread()) {
                if (c + 1 < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                this.setState(c + 1);
                log.info("线程重入锁成功,锁的名称为{},当前LockCount为{}", lockKey, state);
                return Boolean.TRUE;
            }
            int sleepTime = SLEEP_TIME_ONCE;
            if (waitTime > 0) {
                log.info("线程暂时无法获得锁,当前已等待{}ms,本次将再等待{}ms,锁的名称为{}", System.currentTimeMillis() - startTime, sleepTime, lockKey);
                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    log.info("线程等待过程中被中断,锁的名称为{}", lockKey, e);
                }
            }
        } while (System.currentTimeMillis() <= endTime);
        if (waitTime == 0) {
            log.info("线程获得锁失败,将放弃获取锁,锁的名称为{}", lockKey);
        } else {
            log.info("线程获得锁失败,之前共等待{}ms,将放弃等待获取锁,锁的名称为{}", System.currentTimeMillis() - startTime, lockKey);
        }
        return Boolean.FALSE;
    } catch (Exception e) {
        return Boolean.FALSE;
    }
}
final void unLock(String lockValue) {
    if (lockOwnerThread == Thread.currentThread()) {
        int c = this.getState() - 1;
        if (c == 0) {
            this.setLockOwnerThread(null);
            survivalClamProcessor.stop();
            survivalThread.interrupt();
            this.setSurvivalClamProcessor(null);
            this.setSurvivalThread(null);
            this.setState(c);
            lockRedisClient.delLock(lockKey, lockValue);
            log.info("重入锁LockCount-1,线程已成功释放锁,锁的名称为{}", lockKey);
        } else {
            this.setState(c);
            log.info("重入锁LockCount-1,锁的名称为{},剩余LockCount为{}", lockKey, c);
        }
    }
}

Затем, чтобы пользователи часто не забывали разблокировать или разблокировать нарушения, метод добавления и разблокировки не подвергается внешнему миру, только метод выполнения подвергается внешнему миру:

public <T> T execute(Supplier<T> supplier, int waitTime) {
    String randomValue = UUID.randomUUID().toString();
    Boolean holdLock = Boolean.FALSE;
    try {
        if (holdLock = this.tryLock(randomValue, waitTime)) {
            return supplier.get();
        }
        return null;
    } catch (Exception e) {
        log.error("execute error", e);
        return null;
    } finally {
        if (holdLock) {
            this.unLock(randomValue);
        }
    }
}

Кроме того, при реализации aop, поскольку тот же объект блокировки не может быть получен из контекста, необходимо получить объект блокировки через lockManager.getLock(lockField, lockKey). Если lockPrefix и lockKey совпадают, будет получен один и тот же объект блокировки, что реализует реентерабельную функцию.

public Lock getLock(String lockPrefix, String lockKey) {
    String finalLockKey = StringUtils.isEmpty(lockPrefix) ? lockKey : (lockPrefix.concat(":").concat(lockKey));
    if (lockMap.containsKey(finalLockKey)) {
        return lockMap.get(finalLockKey);
    } else {
        Lock lock = new Lock(finalLockKey, lockRedisClient, threadFactoryManager);
        Lock existLock = lockMap.putIfAbsent(finalLockKey, lock);
        return Objects.nonNull(existLock) ? existLock : lock;
    }
}

Примечания к выпуску

В этом обновлении, помимо реализации функции ReentRant распределенных замков, она также реализует программные распределенные блокировки через метод выполнения на основе аннотации @Clockannotation от декларативных распределенных замков.

Кроме того, поскольку предыдущая версия реализовала функцию продления, время блокировки на блоканотацию помечено как истекло, а время истечения замка равномерно изменяется на 30-е годы. Используйте функцию обновления для достижения функций, которые требуют длительного замка.

план дальнейших действий

В текущей реализованной версии можно выполнить большинство сценариев распределенных блокировок (недобросовестные + автообновляемые + реентерабельные распределенные блокировки), и их можно использовать в производственных средах. Тем не менее, он по-прежнему не поддерживает честные блокировки, а при сбое конкурентной блокировки для достижения блокировки потока используется метод спин + ожидание потока, который может быть оптимизирован в этих двух направлениях в будущем.

Что ж, увидимся в следующем выпуске, добро пожаловать, чтобы оставить сообщение и обсудить вместе. В то же время, добро пожаловать, чтобы поставить лайк, добро пожаловать, чтобы отправить маленькие звездочки~