Зачем нужна распределенная блокировка cp
Функции и требования распределенных замков у нас естьРаспределенная блокировка Redis: упрощенная версия распределенной блокировки на основе АОП и Redis.Краткое введение.
В настоящее время распределенная блокировка Redis собственной разработки может соответствовать большинству сценариев (недобросовестное + автоматическое продление + распределенная блокировка с повторным входом) и может использоваться в среде с одним компьютером в производственной среде. Однако, поскольку он основан на автономной среде Redis, его можно использовать только в сценариях с низким параллелизмом. С расширением бизнес-сценариев доступа автономный Redis стал ненадежным, поэтому у нас есть только два варианта: 1. Автономный Redis заменен на кластерный. 2. Используйте другие реализации, основанные на алгоритмах консенсуса.
У схемы 1 есть врожденный дефект, и кластер redis не может гарантировать проблему согласованности: в момент, когда мастер-нода выходит из строя, данные между мастер-нодами и ведомыми могут быть несогласованными. Это приведет к тому, что служба а получит блокировку а от главного узла, а затем главный узел выйдет из строя.Прежде чем подчиненный узел полностью синхронизирует данные главного узла, служба b успешно получит ту же блокировку а от подчиненного узла.
В других реализациях, основанных на алгоритмах консенсуса, zk и ectd являются хорошим выбором. Затем, учитывая, что zk довольно старый, мы выбрали восходящую звезду ectd.
Поскольку в сценарии с распределенными блокировками нас больше заботит непротиворечивость блокировок, а не их доступность, поэтому блокировки cp более надежны, чем блокировки ap.
Идеи дизайна
etcd вводит понятие аренды, нам сначала нужно предоставить аренду, а затем одновременно установить время действия аренды. Срок действия аренды может использоваться как срок действия блокировки.
Затем мы можем напрямую вызвать функцию блокировки etcd, чтобы заблокировать указанное имя блокировки в указанной аренде. Если никакой другой поток в настоящее время не удерживает блокировку, поток может напрямую удерживать блокировку. В противном случае нужно ждать. Здесь мы можем установить время ожидания как время ожидания блокировки, чтобы реализовать процесс борьбы за сбой блокировки и ожидания получения. Конечно, из-за колебаний сети и других проблем я рекомендую установить время ожидания не менее 500 мс (или значение, которое вы считаете разумным).
Затем в процессе разблокировки мы отказались от операции разблокировки etcd и напрямую использовали операцию отмены etcd. Причина, по которой операция разблокировки не используется, заключается в том, что параметр, необходимый для разблокировки, — это lockKey, возвращаемый предыдущей операцией блокировки, и мы не хотим поддерживать еще одно поле, а второе — потому что в конечном итоге мы выполним операцию отзыва, и операция отзыва сделает аренду. Все приведенные ниже ключи недействительны, потому что в настоящее время мы разрабатываем аренду, соответствующую блокировке, и нет ситуации, когда блокировки в других бизнес-сценариях будут сняты.
Кроме того, чтобы гарантировать, что срок аренды не истечет, пока поток ожидает получения блокировки, мы должны настроить поток демона для этого потока и запустить поток демона после того, как поток предоставит аренду, и периодически определять нужно ли его обновлять.
В отличие от распределенных блокировок redis, время действия распределенных блокировок redis — это время действия кэша, поэтому поток демона для обновления может быть запущен после успешного получения блокировки, в то время как время действия распределенных блокировок etcd — это время аренды. из , срок аренды может истечь во время ожидания получения блокировки, поэтому поток демона должен быть запущен после получения аренды. Это добавляет много сложности.
##Выполнение Нативный etcd написан на языке Go, и его немного сложно применить непосредственно в java-программе, поэтому мы напрямую используем jetcd в качестве etcd-клиента, чтобы java-программа могла использовать код для связи с etcd-сервером.
jetcd предоставляет LeaseClient, мы можем напрямую использовать функцию предоставления для завершения операции предоставления аренды.
public LockLeaseData getLeaseData(String lockName, Long lockTime) {
try {
LockLeaseData lockLeaseData = new LockLeaseData();
CompletableFuture<LeaseGrantResponse> leaseGrantResponseCompletableFuture = client.getLeaseClient().grant(lockTime);
Long leaseId = leaseGrantResponseCompletableFuture.get(1, TimeUnit.SECONDS).getID();
lockLeaseData.setLeaseId(leaseId);
CpSurvivalClam cpSurvivalClam = new CpSurvivalClam(Thread.currentThread(), leaseId, lockName, lockTime, this);
Thread survivalThread = threadFactoryManager.getThreadFactory().newThread(cpSurvivalClam);
survivalThread.start();
lockLeaseData.setCpSurvivalClam(cpSurvivalClam);
lockLeaseData.setSurvivalThread(survivalThread);
return lockLeaseData;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
return null;
}
}
Кроме того, как упоминалось выше, после получения аренды мы запускаем поток демона CpSurvivalClam для ее периодического продления. Реализация CpSurvivalClam примерно такая же, как наша реализация распределенных блокировок redis, с той лишь разницей, что операция expandLockTime изменена на keepAliveOnce в etcd. Метод expandLockTime выглядит следующим образом:
/**
* 重置锁的有效时间
*
* @param leaseId 锁的租约id
* @return 是否成功重置
*/
public Boolean expandLockTime(Long leaseId) {
try {
CompletableFuture<LeaseKeepAliveResponse> leaseKeepAliveResponseCompletableFuture = client.getLeaseClient().keepAliveOnce(leaseId);
leaseKeepAliveResponseCompletableFuture.get();
return Boolean.TRUE;
} catch (InterruptedException | ExecutionException e) {
return Boolean.FALSE;
}
}
Затем jetcd предоставляет LockClient, мы можем напрямую использовать функцию блокировки, передать в лизингId и lockName, и мы получим lockKey по аренде. Кроме того, чтобы гарантировать, что срок аренды не истечет после успешной блокировки. Мы добавили операцию timeToLive, чтобы определить, действует ли аренда после успешного получения блокировки. Если ttl не больше 0, считается, что блокировка не удалась.
/**
* 在指定的租约上加锁,如果租约过期,则算加锁失败。
*
* @param leaseId 锁的租约Id
* @param lockName 锁的名称
* @param waitTime 加锁过程中的的等待时间,单位ms
* @return 是否加锁成功
*/
public Boolean tryLock(Long leaseId, String lockName, Long waitTime) {
try {
CompletableFuture<LockResponse> lockResponseCompletableFuture = client.getLockClient().lock(ByteSequence.from(lockName, Charset.defaultCharset()), leaseId);
long timeout = Math.max(500, waitTime);
lockResponseCompletableFuture.get(timeout, TimeUnit.MILLISECONDS).getKey();
CompletableFuture<LeaseTimeToLiveResponse> leaseTimeToLiveResponseCompletableFuture = client.getLeaseClient().timeToLive(leaseId, LeaseOption.DEFAULT);
long ttl = leaseTimeToLiveResponseCompletableFuture.get(1, TimeUnit.SECONDS).getTTl();
if (ttl > 0) {
return Boolean.TRUE;
} else {
return Boolean.FALSE;
}
} catch (TimeoutException | InterruptedException | ExecutionException e) {
return Boolean.FALSE;
}
}
В процессе разблокировки мы можем напрямую использовать операцию отзыва в LeaseClient, чтобы снять блокировку по аренде при отзыве аренды.
/**
* 取消租约,并释放锁
*
* @param leaseId 租约id
* @return 是否成功释放
*/
public Boolean unLock(Long leaseId) {
try {
CompletableFuture<LeaseRevokeResponse> revokeResponseCompletableFuture = client.getLeaseClient().revoke(leaseId);
revokeResponseCompletableFuture.get(1, TimeUnit.SECONDS);
return Boolean.TRUE;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
return Boolean.FALSE;
}
}
Кроме того, существует унифицированный объект CpLock, который инкапсулирует процесс добавления и разблокировки и предоставляет только метод execute, чтобы пользователи не забыли шаги разблокировки.
public class CpLock {
private String lockName;
private LockEtcdClient lockEtcdClient;
/**
* 分布式锁的锁持有数
*/
private volatile int state;
private volatile transient Thread lockOwnerThread;
/**
* 当前线程拥有的lease对象
*/
private FastThreadLocal<LockLeaseData> lockLeaseDataFastThreadLocal = new FastThreadLocal<>();
/**
* 锁自动释放时间,单位s,默认为30
*/
private static Long LOCK_TIME = 30L;
/**
* 获取锁失败单次等待时间,单位ms,默认为300
*/
private static Integer SLEEP_TIME_ONCE = 300;
CpLock(String lockName, LockEtcdClient lockEtcdClient) {
this.lockName = lockName;
this.lockEtcdClient = lockEtcdClient;
}
private LockLeaseData getLockLeaseData(String lockName, long lockTime) {
if (lockLeaseDataFastThreadLocal.get() != null) {
return lockLeaseDataFastThreadLocal.get();
} else {
LockLeaseData lockLeaseData = lockEtcdClient.getLeaseData(lockName, lockTime);
lockLeaseDataFastThreadLocal.set(lockLeaseData);
return lockLeaseData;
}
}
final Boolean tryLock(long waitTime) {
final long startTime = System.currentTimeMillis();
final long endTime = startTime + waitTime * 1000;
final long lockTime = LOCK_TIME;
final Thread current = Thread.currentThread();
try {
do {
int c = this.getState();
if (c == 0) {
LockLeaseData lockLeaseData = this.getLockLeaseData(lockName, lockTime);
if (Objects.isNull(lockLeaseData)) {
return Boolean.FALSE;
}
Long leaseId = lockLeaseData.getLeaseId();
if (lockEtcdClient.tryLock(leaseId, lockName, endTime - System.currentTimeMillis())) {
log.info("线程获取重入锁成功,cp锁的名称为{}", lockName);
this.setLockOwnerThread(current);
this.setState(c + 1);
return Boolean.TRUE;
}
} else if (lockOwnerThread == Thread.currentThread()) {
if (c + 1 <= 0) {
throw new Error("Maximum lock count exceeded");
}
this.setState(c + 1);
log.info("线程重入锁成功,cp锁的名称为{},当前LockCount为{}", lockName, state);
return Boolean.TRUE;
}
int sleepTime = SLEEP_TIME_ONCE;
if (waitTime > 0) {
log.info("线程暂时无法获得cp锁,当前已等待{}ms,本次将再等待{}ms,cp锁的名称为{}", System.currentTimeMillis() - startTime, sleepTime, lockName);
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
log.info("线程等待过程中被中断,cp锁的名称为{}", lockName, e);
}
}
} while (System.currentTimeMillis() <= endTime);
if (waitTime == 0) {
log.info("线程获得cp锁失败,将放弃获取,cp锁的名称为{}", lockName);
} else {
log.info("线程获得cp锁失败,之前共等待{}ms,将放弃等待获取,cp锁的名称为{}", System.currentTimeMillis() - startTime, lockName);
}
this.stopKeepAlive();
return Boolean.FALSE;
} catch (Exception e) {
log.error("execute error", e);
this.stopKeepAlive();
return Boolean.FALSE;
}
}
/**
* 停止续约,并将租约对象从线程中移除
*/
private void stopKeepAlive() {
LockLeaseData lockLeaseData = lockLeaseDataFastThreadLocal.get();
if (Objects.nonNull(lockLeaseData)) {
lockLeaseData.getCpSurvivalClam().stop();
lockLeaseData.setCpSurvivalClam(null);
lockLeaseData.getSurvivalThread().interrupt();
lockLeaseData.setSurvivalThread(null);
}
lockLeaseDataFastThreadLocal.remove();
}
final void unLock() {
if (lockOwnerThread == Thread.currentThread()) {
int c = this.getState() - 1;
if (c == 0) {
this.setLockOwnerThread(null);
this.setState(c);
LockLeaseData lockLeaseData = lockLeaseDataFastThreadLocal.get();
this.stopKeepAlive();
//unLock操作必须在最后执行,避免其他线程获取到锁时的state等数据不正确
lockEtcdClient.unLock(lockLeaseData.getLeaseId());
log.info("重入锁LockCount-1,线程已成功释放锁,cp锁的名称为{}", lockName);
} else {
this.setState(c);
log.info("重入锁LockCount-1,cp锁的名称为{},剩余LockCount为{}", lockName, c);
}
}
}
public <T> T execute(Supplier<T> supplier, int waitTime) {
Boolean holdLock = Boolean.FALSE;
Preconditions.checkArgument(waitTime >= 0, "waitTime必须为自然数");
try {
if (holdLock = this.tryLock(waitTime)) {
return supplier.get();
}
return null;
} catch (Exception e) {
log.error("cpLock execute error", e);
return null;
} finally {
if (holdLock) {
this.unLock();
}
}
}
public <T> T execute(Supplier<T> supplier) {
return this.execute(supplier, 0);
}
}
Реализация CpLock примерно такая же, как у ApLock в предыдущей распределенной блокировке Redis. Основные отличия:
1. Поскольку мы запускаем поток демона в операции предоставления аренды, мы должны остановить поток демона для обновления в сценариях сбоя в борьбе за блокировки, исключения и блокировки блокировки. И поскольку это реентерабельный сценарий, мы надеемся сгенерировать аренду для конкуренции за блокировки только тогда, когда состояние равно 0. Поэтому, чтобы избежать множественных суждений, мы вводим FastThreadLocal lockLeaseDataFastThreadLocal для сохранения объекта Lease текущего потока.
2. Распределенные блокировки Redis В любом сценарии ожидание получения блокировки достигается за счет бездействующего опроса В сценарии etcd мы используем собственную логику ожидания etcd для завершения ожидания, когда состояние равно 0. Когда состояние не равно 0 сценарии ожидание по-прежнему реализуется посредством опроса во время сна. Поскольку могут быть случаи, когда состояние изменяется с не-0 на 0, наше значение waitTime равно endTime - System.currentTimeMillis(), а не исходному входящему waitTime. Это может приблизить время ожидания к тому, что мы ожидаем.
Примечания к выпуску
В этом обновлении мы реализовали распределенную блокировку cp на основе etcd, а также исправили скрытую проблему в распределенной блокировке redis.
Предыдущая операция setState выполняется после unLock, что вызовет проблемы в параллельных сценариях. Поток a и поток b конкурируют за получение блокировки a. В это время их соответствующие локальные переменные c и состояние равны 0. Затем поток a освобождает блокировку сразу после получения блокировки. В это время сначала выполняется unLock, состояние равно все еще 1, поток b. Блокировка успешно получена, состояние сбрасывается до c+1, что по-прежнему равно 1, а затем поток a выполняет setState и изменяет stete на 0. В это время, если поток b снимает блокировку, выполняет операцию stete-1 и становится -1. Эта проблема в основном связана с тем, что операции получения значения состояния и изменения значения состояния являются асинхронными, а в многопоточном сценарии распределенная блокировка управляется блокировкой, нам нужно только переместить операцию разблокировки после всех назначений, чтобы решить эту проблему.
план дальнейших действий
Реализованная в настоящее время версия распределенной блокировки cp может соответствовать большинству сценариев распределенных блокировок (недобросовестные + автоматическое продление + реентерабельные + строго согласованные распределенные блокировки) и может быть помещена в производственные кластеры, используемые в. В последующем плане блокировка AP и блокировка cp будут соответственно обновлены, а некоторые сценарии использования будут оптимизированы. Он также попытается решить проблему честных блокировок и проблему ожидания перехода в спящий режим для получения блокировок в цикле.
Приведенный выше план выполнен. Подробности о том, как добиться честной блокировкиРаспределенная блокировка Etcd (2): поддерживает справедливые блокировки, чтобы в некоторых сценариях потоки не могли получить блокировки в течение длительного времени.
Эта распределенная блокировка cp должна учитывать большое количество сценариев использования.В настоящее время проведено только небольшое тестирование.
Рекомендуемое чтение
1,Распределенная блокировка Redis: упрощенная версия распределенной блокировки на основе АОП и Redis.
2,Распределенная блокировка Redis (2): поддерживает обновление блокировки, чтобы избежать получения блокировок несколькими потоками после тайм-аута блокировки.
3.Распределенная блокировка Redis (3): поддержка повторного входа в блокировку, избежание взаимоблокировки при блокировке рекурсивного вызова
Что ж, увидимся в следующем выпуске, добро пожаловать, чтобы оставить сообщение и обсудить вместе. Также приветствую лайк~