последовательность
В этой статье в основном изучается распределенная блокировка Redisson.
maven
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.8.1</version>
</dependency>
пример
@Test
public void testDistributedLock(){
Config config = new Config();
// config.setTransportMode(TransportMode.EPOLL);
config.useSingleServer()
.setAddress("redis://192.168.99.100:6379");
RedissonClient redisson = Redisson.create(config);
IntStream.rangeClosed(1,5)
.parallel()
.forEach(i -> {
executeLock(redisson);
});
executeLock(redisson);
}
public void executeLock(RedissonClient redisson){
RLock lock = redisson.getLock("myLock");
boolean locked = false;
try{
LOGGER.info("try lock");
locked = lock.tryLock();
// locked = lock.tryLock(1,2,TimeUnit.MINUTES);
LOGGER.info("get lock result:{}",locked);
if(locked){
TimeUnit.HOURS.sleep(1);
LOGGER.info("get lock and finish");
}
}catch (Exception e){
e.printStackTrace();
}finally {
LOGGER.info("enter unlock");
if(locked){
lock.unlock();
}
}
}
Анализ исходного кода
RedissonLock.tryLock
redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.java
@Override
public boolean tryLock() {
return get(tryLockAsync());
}
@Override
public RFuture<Boolean> tryLockAsync() {
return tryLockAsync(Thread.currentThread().getId());
}
@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
return tryAcquireOnceAsync(-1, null, threadId);
}
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Boolean ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
protected String getLockName(long threadId) {
return id + ":" + threadId;
}
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
task.cancel();
}
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
- Если лизингтайм здесь не установлен, значение по умолчанию равно -1, используется commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), а значение по умолчанию равно 30 секундам.
- tryLockInnerAsync использует lua-скрипт, который имеет 3 параметра, первый параметр — это массив KEYS, а последние параметры — это элементы массива ARGV.
- Значением ключа здесь является имя redissonLock, указанное вызывающей стороной, две переменные, первая — LeezeTime, вторая — имя блокировки, использующее идентификатор redissonLock + идентификатор потока.
- Первый метод сценария lua оценивает, существует ли хэш-карта redissonLock, если нет, то она создается.Хеш-карта имеет запись, ключ которой является именем блокировки, значение равно 1, а затем устанавливает время истечения хэш-карты как LeaseTime.
- Второй метод lua-скрипта — увеличить значение имени блокировки на 1, когда существует хэш-карта redissonLock, и установить время истечения срока действия в лизинг.
- Наконец, верните ttl ключа имени redissonLock.
- После успешного выполнения определите, имеет ли еще значение ttl, и если да, вызовите scheduleExpirationRenewal, чтобы предотвратить сбой блокировки до ее выполнения.
- scheduleExpirationRenewal предназначен для регистрации задачи задержки, которая запускается, когда internalLockLeaseTime / 3. Выполняется метод renewExpirationAsync, который сбрасывает время истечения блокировки обратно на internalLockLeaseTime.
- ScheduleExpirationRenewal добавляет прослушиватель к задаче scheduleExpirationRenewal.Если параметр выполнен успешно, он снова рекурсивно вызовет scheduleExpirationRenewal для повторной регистрации отложенной задачи.
- Метод tryAcquireAsync (длинное время аренды, модуль TimeUnit, final long threadId) — это метод, вызываемый, когда указано время автоматической разблокировки. Разница между ним и tryAcquireOnceAsync заключается в том, что он использует длинное значение для оценки квадратного возвращаемого значения ttl. Если он имеет значение null, расширение будет выполнено.Временная задача времени истечения срока действия, а метод tryAcquireOnceAsync использует BooleanNullReplayConvertor, если возвращаемое значение не является нулевым, оно возвращает true
RedissonLock.unlock
redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.java
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException)e.getCause();
} else {
throw e;
}
}
// Future<Void> future = unlockAsync();
// future.awaitUninterruptibly();
// if (future.isSuccess()) {
// return;
// }
// if (future.cause() instanceof IllegalMonitorStateException) {
// throw (IllegalMonitorStateException)future.cause();
// }
// throw commandExecutor.convertException(future);
}
@Override
public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
cancelExpirationRenewal(threadId);
result.tryFailure(future.cause());
return;
}
Boolean opStatus = future.getNow();
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
if (opStatus) {
cancelExpirationRenewal(null);
}
result.trySuccess(null);
}
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
String getChannelName() {
return prefixName("redisson_lock__channel", getName());
}
void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = expirationRenewalMap.get(getEntryName());
if (task != null && (threadId == null || task.getThreadId() == threadId)) {
expirationRenewalMap.remove(getEntryName());
task.getTimeout().cancel();
}
}
- unlockInnerAsync снимает блокировку с помощью сценария lua, lua использует два ключа, один из них — имя redissonLock, а другой — имя канала.
- В этом lua используются три переменные: одна — unlockMessage pubSub, по умолчанию — 0, другая — internalLockLeaseTime, по умолчанию — commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), а другая — имя блокировки.
- Если redissonLock не существует, напрямую опубликуйте сообщение о разблокировке и верните 1, если блокировки не существует, верните nil;
- Если блокировка существует, она будет считаться -1.Если счетчик больше 0, следующее время истечения будет сброшено и будет возвращено 0, если счетчик не больше 0, блокировка redissonLock будет удалена, unlockMessage будет выдано, и будет возвращено 1; если ни одно из вышеперечисленных условий не выполнено, верните nil
- UnlockAsync регистрирует FutureListener с помощью unlockInnerAsync, в основном для вызова cancelExpirationRenewal и отмены задачи scheduleExpirationRenewal.
LockPubSub
redisson-3.8.1-sources.jar!/org/redisson/pubsub/LockPubSub.java
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
public static final Long unlockMessage = 0L;
@Override
protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
return new RedissonLockEntry(newPromise);
}
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(unlockMessage)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
value.getLatch().release();
}
}
}
- При получении сообщения unlockMessage будет вызван слушатель RedissonLockEntry, а затем будет запущен выпуск защелки.
- Метод tryAcquireOnceAsync не создает LockPubSub по умолчанию и не указывает время автоматической разблокировки, запланированная задача будет продолжать продлевать срок действия, что может привести к риску того, что блокировка не будет снята.
резюме
Существуют следующие меры предосторожности при блокировке:
- Для блокировки необходимо установить тайм-аут, чтобы предотвратить тупиковые ситуации.
- При блокировке и установке тайм-аута необходимо гарантировать атомарность двух операций, поэтому лучше всего использовать сценарии lua или использовать метод set, поддерживающий NX и EX.
- При блокировке вам необходимо записать информацию о вызывающей стороне блокировки, такую как идентификатор потока, который необходимо использовать при разблокировке.
- Для задач с неопределенной продолжительностью блокировки, чтобы предотвратить выполнение задачи и освобождение тайм-аута, необходимо увеличить время аннулирования для задач, которые еще не завершили выполнение.
Разблокировка имеет следующие меры предосторожности:
- Разблокировать последовательность операций (
判断key是否存在,存在的话删除key等
) должен гарантировать атомарность, поэтому лучше использовать lua-скрипты - Разблокировка должна определить, соответствует ли вызывающий абонент записи при блокировке, чтобы предотвратить удаление блокировки по ошибке.
- Если есть отложенная задача, которая продлевает срок действия, ее необходимо завершить, когда она будет разблокирована.
doc
- Распределенные блокировки за одну минуту
- Это настоящая распределенная блокировка
- Комикс: Что такое распределенная блокировка?
- Правильная реализация распределенных блокировок Redis (Java Edition)
- Безопасна ли распределенная блокировка на основе Redis (часть 1)?
- Безопасна ли распределенная блокировка на основе Redis (ниже)?
- Правильная реализация распределенной блокировки Redis
- Анализ распределенной блокировки Redisson