Статус использования
Базовое содержимое распределенных замков Redis мы ужеРаспределенная блокировка Redis: упрощенная версия распределенной блокировки на основе АОП и Redis.Эта статья говорила об этом, а также демонстрирует обычные методы блокировки и разблокировки в статье.
Почему распределенные блокировки Redis поддерживают обновление и как поддерживать методы обновления, мы такжеРаспределенная блокировка Redis (2): поддерживает обновление блокировки, чтобы избежать получения блокировок несколькими потоками после тайм-аута блокировки.
Спустя полгода после последнего обновления до возобновляемого распределенного замка эта простая версия распределенного замка собственной разработки все еще работает хорошо.
проблема найдена
Однако, когда я недавно проверял онлайн-журнал, я случайно обнаружил, что в бизнес-сценарии взаимоблокировка может возникнуть в распределенных блокировках.
После предварительного расследования мы обнаружили, что это было вызвано вызовом другого метода, который необходимо выполнить в методе, который уже удерживает блокировку.
Упрощенная функция, как показано ниже:
@Service
public class Lock1ServiceImpl implements Lock1Service {
@Override
@LockAnnotation(lockField = "test", lockKey = "1")
public void test() {
}
}
@Service
public class Lock2ServiceImpl implements Lock2Service {
@Autowired
private Lock1Service lock1Service;
@Override
@LockAnnotation(lockField = "test", lockKey = "1")
public void test(){
lock1Service.test();
}
}
Процесс взаимоблокировки в основном выглядит следующим образом:
Тестовый метод Lock2Service должен быть заблокирован (ключ блокировки «test:1»), а затем внутри метода вызывается тестовый метод Lock1Service, и тестовый метод Lock1Service также должен добавить ту же блокировку.
Когда выполняется lock1Service.test(), тестовый метод Lock2Service не завершается, поэтому блокировка не будет снята, и lock1Service должен получить блокировку при выполнении тестового метода, и в это время возникает взаимоблокировка.
Этот код может продолжать работать только после того, как lock1Service добровольно сдастся и продолжит захват блокировки после ожидания в течение определенного периода времени. И тестовый метод lock1Service никогда не будет выполнен.
решение
Теперь, когда проблема возникла, следующее, что мы должны сделать, это найти способ избежать этой ситуации, насколько это возможно.
Мы быстро подумали о ReentrantLock, который поставляется с jdk, и мы можем реализовать реентерабельные распределенные блокировки по тому же принципу.
Принцип реализации повторного входа аналогичен принципу ReentrantLock.Отличие от ReentrantLock заключается в том, что распределенные блокировки должны использовать Redis для конкуренции за распределенные блокировки при получении блокировок в первый раз. Когда блокировка уже удерживается и требуется повторный вход, уровень распределенной блокировки будет понижен до локальной блокировки. Только ресурсы одного и того же потока имеют концепцию повторного входа.
Ниже приведены основные сведения о классе блокировки.Процессы блокировки и разблокировки следующие:
final Boolean tryLock(String lockValue, int waitTime) {
long startTime = System.currentTimeMillis();
long endTime = startTime + waitTime * 1000;
try {
do {
final Thread current = Thread.currentThread();
int c = this.getState();
if (c == 0) {
int lockTime = LOCK_TIME;
if (lockRedisClient.setLock(lockKey, lockValue, lockTime)) {
lockOwnerThread = current;
this.setState(c + 1);
survivalClamProcessor = new SurvivalClamProcessor(lockKey, lockValue, lockTime, lockRedisClient);
(survivalThread = threadFactoryManager.getThreadFactory().newThread(survivalClamProcessor)).start();
log.info("线程获取重入锁成功,锁的名称为{}", lockKey);
return Boolean.TRUE;
}
} else if (lockOwnerThread == Thread.currentThread()) {
if (c + 1 < 0) {
throw new Error("Maximum lock count exceeded");
}
this.setState(c + 1);
log.info("线程重入锁成功,锁的名称为{},当前LockCount为{}", lockKey, state);
return Boolean.TRUE;
}
int sleepTime = SLEEP_TIME_ONCE;
if (waitTime > 0) {
log.info("线程暂时无法获得锁,当前已等待{}ms,本次将再等待{}ms,锁的名称为{}", System.currentTimeMillis() - startTime, sleepTime, lockKey);
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
log.info("线程等待过程中被中断,锁的名称为{}", lockKey, e);
}
}
} while (System.currentTimeMillis() <= endTime);
if (waitTime == 0) {
log.info("线程获得锁失败,将放弃获取锁,锁的名称为{}", lockKey);
} else {
log.info("线程获得锁失败,之前共等待{}ms,将放弃等待获取锁,锁的名称为{}", System.currentTimeMillis() - startTime, lockKey);
}
return Boolean.FALSE;
} catch (Exception e) {
return Boolean.FALSE;
}
}
final void unLock(String lockValue) {
if (lockOwnerThread == Thread.currentThread()) {
int c = this.getState() - 1;
if (c == 0) {
this.setLockOwnerThread(null);
survivalClamProcessor.stop();
survivalThread.interrupt();
this.setSurvivalClamProcessor(null);
this.setSurvivalThread(null);
this.setState(c);
lockRedisClient.delLock(lockKey, lockValue);
log.info("重入锁LockCount-1,线程已成功释放锁,锁的名称为{}", lockKey);
} else {
this.setState(c);
log.info("重入锁LockCount-1,锁的名称为{},剩余LockCount为{}", lockKey, c);
}
}
}
Затем, чтобы пользователи часто не забывали разблокировать или разблокировать нарушения, метод добавления и разблокировки не подвергается внешнему миру, только метод выполнения подвергается внешнему миру:
public <T> T execute(Supplier<T> supplier, int waitTime) {
String randomValue = UUID.randomUUID().toString();
Boolean holdLock = Boolean.FALSE;
try {
if (holdLock = this.tryLock(randomValue, waitTime)) {
return supplier.get();
}
return null;
} catch (Exception e) {
log.error("execute error", e);
return null;
} finally {
if (holdLock) {
this.unLock(randomValue);
}
}
}
Кроме того, при реализации aop, поскольку тот же объект блокировки не может быть получен из контекста, необходимо получить объект блокировки через lockManager.getLock(lockField, lockKey). Если lockPrefix и lockKey совпадают, будет получен один и тот же объект блокировки, что реализует реентерабельную функцию.
public Lock getLock(String lockPrefix, String lockKey) {
String finalLockKey = StringUtils.isEmpty(lockPrefix) ? lockKey : (lockPrefix.concat(":").concat(lockKey));
if (lockMap.containsKey(finalLockKey)) {
return lockMap.get(finalLockKey);
} else {
Lock lock = new Lock(finalLockKey, lockRedisClient, threadFactoryManager);
Lock existLock = lockMap.putIfAbsent(finalLockKey, lock);
return Objects.nonNull(existLock) ? existLock : lock;
}
}
Примечания к выпуску
В этом обновлении, помимо реализации функции ReentRant распределенных замков, она также реализует программные распределенные блокировки через метод выполнения на основе аннотации @Clockannotation от декларативных распределенных замков.
Кроме того, поскольку предыдущая версия реализовала функцию продления, время блокировки на блоканотацию помечено как истекло, а время истечения замка равномерно изменяется на 30-е годы. Используйте функцию обновления для достижения функций, которые требуют длительного замка.
план дальнейших действий
В текущей реализованной версии можно выполнить большинство сценариев распределенных блокировок (недобросовестные + автообновляемые + реентерабельные распределенные блокировки), и их можно использовать в производственных средах. Тем не менее, он по-прежнему не поддерживает честные блокировки, а при сбое конкурентной блокировки для достижения блокировки потока используется метод спин + ожидание потока, который может быть оптимизирован в этих двух направлениях в будущем.
Что ж, увидимся в следующем выпуске, добро пожаловать, чтобы оставить сообщение и обсудить вместе. В то же время, добро пожаловать, чтобы поставить лайк, добро пожаловать, чтобы отправить маленькие звездочки~