предисловие
Обычно существует три реализации основных распределенных блокировок:
- Оптимистическая блокировка базы данных
- Распределенная блокировка на основе Redis
- Распределенная блокировка на основе ZooKeeper
Я уже писал о конкретной схеме реализации распределенных блокировок mysql и redis в своем блоге:Блог woohoo.cn на.com/wang-meng/afraid…Это в основном начинается с принципа реализации.
В этой серии статей [Распределенная блокировка] в основном подробно рассматриваются принципы реализации двух распределенных блокировок клиента Redis, исходного кода Reddision и zk.
надежность
Во-первых, чтобы обеспечить доступность распределенных блокировок, мы должны как минимум убедиться, что реализация блокировки соответствует следующим четырем условиям:
- взаимная исключительность. В любой момент только один клиент может удерживать блокировку.
- Взаимной блокировки не произойдет. Даже если клиент выйдет из строя, удерживая блокировку, и не разблокирует ее активно, другие клиенты могут быть гарантированно заблокированы в будущем.
- Отказоустойчивой. Клиенты могут блокировать и разблокировать, пока большинство узлов Redis запущены и работают.
- Беда должна положить этому конец. Блокировку и разблокировку должен выполнять один и тот же клиент, и клиент не может разблокировать блокировки, добавленные другими.
Принцип блокировки Редиссона
redisson — очень мощная клиентская среда Redis с открытым исходным кодом, официальный адрес:redisson.org/
Он очень прост в использовании, настройке maven и информации о подключении, вот прямой взгляд на реализацию кода:
RLock lock = redisson.getLock("anyLock");
lock.lock();
lock.unlock();
Конкретная логика блокировки выполнения redisson дополняется сценарием lua, который может гарантировать атомарность.
Сначала посмотрите на код для инициализации RLock:
public class Redisson implements RedissonClient {
@Override
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
}
public class RedissonLock extends RedissonExpirable implements RLock {
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}
Первый взглядRedissonLock
id возвращает объект UUID, каждая машина соответствует своему атрибуту id,id
Значение похоже на: "8743c9c0-0795-4907-87fd-6c719a6b4586"
затем оглянуться назадlock()
Реализация кода:
public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 获取当前线程id
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
}
Некоторые промежуточные коды здесь опущены, и здесь мы в основном рассматриваемtryAcquire()
метод, переданное здесь время истечения -1, затем идентификатор текущего потока, а затем основной процесс выполнения сценария lua, давайте посмотрим, как он выполняется шаг за шагом:
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
KEYS[1]
Параметры: "любой замок"ARGV[2]
это: "id + ":" + threadId"
впервые использованныйexists
Определить, существует ли текущий ключ в redis, если нет, то он равен 0, а затем выполнитьhset
Инструкция по сохранению «anyLock id: threadId 1» в Redis, окончательные данные, хранящиеся в Redis, аналогичны:
{
"8743c9c0-0795-4907-87fd-6c719a6b4586:1":1
}
По секрету, последняя 1 предназначена для подсчета статистики повторного входа позже, что будет объяснено позже.
Затем посмотрите вниз и используйтеpexpire
Установите время истечения срока действия, которое используется по умолчанию.internalLockLeaseTime
на 30с. Окончательный возврат равен нулю, и мгновенная блокировка прошла успешно.
Принцип реентерабельности Редисона
Давайте посмотрим, как один и тот же поток на той же машине блокируется, когда ключ блокировки существует?
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
ARGV[2]
это: "id + ":" + threadId"
Если та же машина и тот же поток снова придут на запрос, здесь будет 1, а затем выполнитьhincrby
, значение +1, заданное параметром hset, становится равным 2, а затем продолжайте устанавливать время истечения срока действия.
Точно так же после повторного входа потока значение - 1, когда он разблокирован.
Принцип Redisson watchDog
Если сценарий: A и B сейчас выполняют бизнес, A добавил распределенную блокировку, но производственная среда меняется по-разному, если время ожидания блокировки A истекло, но бизнес A все еще работает. В это время, поскольку блокировка A снимается с течением времени, B получает блокировку, а B выполняет бизнес-логику. Делает ли это распределенные блокировки бессмысленными?
Поэтому Redisson вводит концепцию сторожевого таймера.Когда A получает блокировку для выполнения, если срок действия блокировки не истек, фоновый поток автоматически продлевает время истечения блокировки, чтобы предотвратить истечение срока действия блокировки, поскольку бизнес не был выполнен. .
Давайте посмотрим на конкретную реализацию:
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
когда мыtryLockInnerAsync
После выполнения будет добавлен слушатель, чтобы увидеть конкретную реализацию в слушателе:
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
Здесь задача планирования выполняется каждые 10 сек. Сценарий lua — это время истечения срока действия обновления, так что блокировка, удерживаемая текущим потоком, не будет аннулирована из-за истечения времени истечения срока действия.
Принцип взаимного исключения Redisson
Или посмотрите на lua-скрипт, который выполняет блокировку выше, и, наконец, он выполнится:
"return redis.call('pttl', KEYS[1]);",
Сколько времени действует блокировка возврата, давайте продолжим смотреть на код:
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 返回ttl说明加锁成功,不为空则是加锁失败
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
// 死循环去尝试获取锁
while (true) {
// 再次尝试加锁
ttl = tryAcquire(leaseTime, unit, threadId);
// 如果ttl=null说明抢占锁成功
if (ttl == null) {
break;
}
// ttl 大于0,抢占锁失败,这个里面涉及到Semaphore,后续会讲解
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
}
Принцип открытия замка Redisson
Посмотрите прямо на код lua:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 判断锁key值是否存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// 判断当前机器、当前线程id对应的key是否存在
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 计数器数量-1 可重入锁
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 如果计数器大于0,说明还在持有锁
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 使用del指令删除key
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
Суммировать
Резюме картины:
объявить
Эта статья была впервые опубликована в моем публичном аккаунте:Считается ли цветок романтичным?, если перепечатано, просьба указывать источник!
Заинтересованные друзья могут обратить внимание на личный публичный номер: Считается ли цветок романтичным?