[Распределенная блокировка] 01-Используйте Redisson для реализации принципа распределенной блокировки с повторным входом

распределенный

предисловие

Обычно существует три реализации основных распределенных блокировок:

  1. Оптимистическая блокировка базы данных
  2. Распределенная блокировка на основе Redis
  3. Распределенная блокировка на основе ZooKeeper

Я уже писал о конкретной схеме реализации распределенных блокировок mysql и redis в своем блоге:Блог woohoo.cn на.com/wang-meng/afraid…Это в основном начинается с принципа реализации.

В этой серии статей [Распределенная блокировка] в основном подробно рассматриваются принципы реализации двух распределенных блокировок клиента Redis, исходного кода Reddision и zk.

надежность

Во-первых, чтобы обеспечить доступность распределенных блокировок, мы должны как минимум убедиться, что реализация блокировки соответствует следующим четырем условиям:

  1. взаимная исключительность. В любой момент только один клиент может удерживать блокировку.
  2. Взаимной блокировки не произойдет. Даже если клиент выйдет из строя, удерживая блокировку, и не разблокирует ее активно, другие клиенты могут быть гарантированно заблокированы в будущем.
  3. Отказоустойчивой. Клиенты могут блокировать и разблокировать, пока большинство узлов Redis запущены и работают.
  4. Беда должна положить этому конец. Блокировку и разблокировку должен выполнять один и тот же клиент, и клиент не может разблокировать блокировки, добавленные другими.

Принцип блокировки Редиссона

redisson — очень мощная клиентская среда Redis с открытым исходным кодом, официальный адрес:redisson.org/

Он очень прост в использовании, настройке maven и информации о подключении, вот прямой взгляд на реализацию кода:

RLock lock = redisson.getLock("anyLock");

lock.lock();
lock.unlock();

Конкретная логика блокировки выполнения redisson дополняется сценарием lua, который может гарантировать атомарность.

Сначала посмотрите на код для инициализации RLock:

public class Redisson implements RedissonClient {
	
	@Override
    public RLock getLock(String name) {
        return new RedissonLock(connectionManager.getCommandExecutor(), name);
    }
}

public class RedissonLock extends RedissonExpirable implements RLock {
	public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    this.id = commandExecutor.getConnectionManager().getId();
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    this.entryName = id + ":" + name;
}

Первый взглядRedissonLockid возвращает объект UUID, каждая машина соответствует своему атрибуту id,idЗначение похоже на: "8743c9c0-0795-4907-87fd-6c719a6b4586"

затем оглянуться назадlock()Реализация кода:

public class RedissonLock extends RedissonExpirable implements RLock {
	@Override
	public void lock() {
	    try {
	        lockInterruptibly();
	    } catch (InterruptedException e) {
	        Thread.currentThread().interrupt();
	    }
	}

	@Override
	public void lockInterruptibly() throws InterruptedException {
	    lockInterruptibly(-1, null);
	}

	@Override
	public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
		// 获取当前线程id
	    long threadId = Thread.currentThread().getId();
	    Long ttl = tryAcquire(leaseTime, unit, threadId);
	    // lock acquired
	    if (ttl == null) {
	        return;
	    }

	    RFuture<RedissonLockEntry> future = subscribe(threadId);
	    commandExecutor.syncSubscription(future);

	    try {
	        while (true) {
	            ttl = tryAcquire(leaseTime, unit, threadId);
	            // lock acquired
	            if (ttl == null) {
	                break;
	            }

	            // waiting for message
	            if (ttl >= 0) {
	                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
	            } else {
	                getEntry(threadId).getLatch().acquire();
	            }
	        }
	    } finally {
	        unsubscribe(future, threadId);
	    }
	}

	<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
	    internalLockLeaseTime = unit.toMillis(leaseTime);

	    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
	              "if (redis.call('exists', KEYS[1]) == 0) then " +
	                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
	                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
	                  "return nil; " +
	              "end; " +
	              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
	                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
	                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
	                  "return nil; " +
	              "end; " +
	              "return redis.call('pttl', KEYS[1]);",
	                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
	}
}

Некоторые промежуточные коды здесь опущены, и здесь мы в основном рассматриваемtryAcquire()метод, переданное здесь время истечения -1, затем идентификатор текущего потока, а затем основной процесс выполнения сценария lua, давайте посмотрим, как он выполняется шаг за шагом:

"if (redis.call('exists', KEYS[1]) == 0) then " +
  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  "return nil; " +
"end; " +

KEYS[1]Параметры: "любой замок"ARGV[2]это: "id + ":" + threadId"

впервые использованныйexistsОпределить, существует ли текущий ключ в redis, если нет, то он равен 0, а затем выполнитьhsetИнструкция по сохранению «anyLock id: threadId 1» в Redis, окончательные данные, хранящиеся в Redis, аналогичны:

{
  "8743c9c0-0795-4907-87fd-6c719a6b4586:1":1
}

По секрету, последняя 1 предназначена для подсчета статистики повторного входа позже, что будет объяснено позже.

Затем посмотрите вниз и используйтеpexpireУстановите время истечения срока действия, которое используется по умолчанию.internalLockLeaseTimeна 30с. Окончательный возврат равен нулю, и мгновенная блокировка прошла успешно.

Принцип реентерабельности Редисона

Давайте посмотрим, как один и тот же поток на той же машине блокируется, когда ключ блокировки существует?

"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  "return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",

ARGV[2]это: "id + ":" + threadId" Если та же машина и тот же поток снова придут на запрос, здесь будет 1, а затем выполнитьhincrby, значение +1, заданное параметром hset, становится равным 2, а затем продолжайте устанавливать время истечения срока действия.

Точно так же после повторного входа потока значение - 1, когда он разблокирован.

Принцип Redisson watchDog

Если сценарий: A и B сейчас выполняют бизнес, A добавил распределенную блокировку, но производственная среда меняется по-разному, если время ожидания блокировки A истекло, но бизнес A все еще работает. В это время, поскольку блокировка A снимается с течением времени, B получает блокировку, а B выполняет бизнес-логику. Делает ли это распределенные блокировки бессмысленными?

Поэтому Redisson вводит концепцию сторожевого таймера.Когда A получает блокировку для выполнения, если срок действия блокировки не истек, фоновый поток автоматически продлевает время истечения блокировки, чтобы предотвратить истечение срока действия блокировки, поскольку бизнес не был выполнен. .

Давайте посмотрим на конкретную реализацию:

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

когда мыtryLockInnerAsyncПосле выполнения будет добавлен слушатель, чтобы увидеть конкретную реализацию в слушателе:

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()), 
        internalLockLeaseTime, getLockName(threadId));
}

Здесь задача планирования выполняется каждые 10 сек. Сценарий lua — это время истечения срока действия обновления, так что блокировка, удерживаемая текущим потоком, не будет аннулирована из-за истечения времени истечения срока действия.

01_redisson watchdog_.png

Принцип взаимного исключения Redisson

Или посмотрите на lua-скрипт, который выполняет блокировку выше, и, наконец, он выполнится:

"return redis.call('pttl', KEYS[1]);",

Сколько времени действует блокировка возврата, давайте продолжим смотреть на код:

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // 返回ttl说明加锁成功,不为空则是加锁失败
    if (ttl == null) {
        return;
    }

    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);

    try {
    	// 死循环去尝试获取锁
        while (true) {
        	// 再次尝试加锁
            ttl = tryAcquire(leaseTime, unit, threadId);
            // 如果ttl=null说明抢占锁成功
            if (ttl == null) {
                break;
            }

            // ttl 大于0,抢占锁失败,这个里面涉及到Semaphore,后续会讲解
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
}

Принцип открытия замка Redisson

Посмотрите прямо на код lua:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
		// 判断锁key值是否存在
        "if (redis.call('exists', KEYS[1]) == 0) then " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; " +
        "end;" +
        // 判断当前机器、当前线程id对应的key是否存在
        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
            "return nil;" +
        "end; " +
        // 计数器数量-1 可重入锁
        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
        // 如果计数器大于0,说明还在持有锁
        "if (counter > 0) then " +
            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
            "return 0; " +
        "else " +
        	// 使用del指令删除key
            "redis.call('del', KEYS[1]); " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; "+
        "end; " +
        "return nil;",
        Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}

Суммировать

Резюме картины:

01_redission 可重入锁实现原理.jpg

объявить

Эта статья была впервые опубликована в моем публичном аккаунте:Считается ли цветок романтичным?, если перепечатано, просьба указывать источник!

Заинтересованные друзья могут обратить внимание на личный публичный номер: Считается ли цветок романтичным?

22.jpg