Реализация распределенной блокировки (1): Redis

Redis задняя часть сервер Lua

предисловие

单机环境下我们可以通过JAVA的Synchronized和Lock来实现进程内部的锁,但是随着分布式应用和集群环境的出现,系统资源的竞争从单进程多线程的竞争变成了多进程的竞争,这时候就需要分布式锁来保证。
实现分布式锁现在主流的方式大致有以下三种
1. 基于数据库的索引和行锁
2. 基于Redis的单线程原子操作:setNX
3. 基于Zookeeper的临时有序节点

这篇文章我们用Redis来实现,会基于现有的各种锁实现来分析,最后分享Redission的锁源码分析来看下分布式锁的开源实现

Реализация дизайна

замок

один,Реализовано через setNx и getSet

Это метод реализации большинства версий в Интернете, таким же образом реализованы и распределенные блокировки, использовавшиеся в предыдущих проектах автора.

public boolean lock(Jedis jedis, String lockName, Integer expire) {

     //返回是否设置成功
     //setNx加锁
     long now = System.currentTimeMillis();
     boolean result = jedis.setnx(lockName, String.valueOf(now + expire * 1000)) == 1;

     if (!result) {
         //防止死锁的容错
         String timestamp = jedis.get(lockName);
         if (timestamp != null && Long.parseLong(timestamp) < now) {
             //不通过del方法来删除锁。而是通过同步的getSet
             String oldValue = jedis.getSet(lockName, String.valueOf(now + expire));
             if (oldValue != null && oldValue.equals(timestamp)) {
                 result = true;
                 jedis.expire(lockName, expire);
             }
         }
     }
     if (result) {
         jedis.expire(lockName, expire);
     }
     return result;
 }

анализ кода:

  1. Атомарность операции гарантируется командой setNx, блокировка приобретается, а время истечения устанавливается равным значению

  2. Время истечения устанавливается методом expire.Если время истечения не может быть установлено, временная метка значения сравнивается с текущей временной меткой, чтобы предотвратить взаимоблокировку.

  3. С помощью команды getSet, когда обнаруживается, что срок действия блокировки истек и она не была снята, она избегает удаления блокировки, которая может быть получена другими потоками в этом процессе.

Существует проблема

  1. Решение для предотвращения взаимоблокировки определяется текущим системным временем, но системное время онлайн-сервера, как правило, одинаково, это не является серьезной проблемой.
  2. Когда блокировка истечет, несколько потоков могут выполнить команду getSet.В случае конкуренции временная метка значения будет изменена.Теоретически будут ошибки.
  3. У замка не может быть идентификатора клиента, и этот же ключ может быть удален другими клиентами при разблокировке.
  4. Хотя есть небольшие проблемы, в целом реализация этой распределенной блокировки в основном удовлетворяет требованиям, позволяет добиться взаимного исключения блокировок и избежать взаимоблокировок.

два,Атомарные команды через старшие версии Redis

Команда set jedis может вносить свои собственные сложные параметры, через которые может быть реализована команда атомарной распределенной блокировки.

 jedis.set(lockName, "", "NX", "PX", expireTime);

анализ кода

  1. Команда set redis может содержать сложные параметры, первый — это ключ блокировки, второй — значение, в котором может храниться идентификатор клиента, получившего блокировку, благодаря этому проверяется, получил ли текущий клиент блокировку, третий параметр — значение NX/XX, четвертый параметр EX|PX, пятый — время

  2. NX: установите этот ключ, если он не существует. XX: установите этот ключ, если он существует.

  3. EX: в секундах, PX: в миллисекундах

  4. Эта команда, по сути, является комбинацией наших предыдущих команд setNx и expire в команду атомарной операции, которая не требует от нас рассмотрения сбоя набора или сбоя срока действия.


разблокировать

один,Через команду Redis del

 	 public boolean unlock(Jedis jedis, String lockName) {
    	 jedis.del(lockName);
     	return true;
 }

анализ кода

Блокировка может быть удалена напрямую с помощью команды redis del, а блокировка, которая уже существует в других потоках, может быть удалена по ошибке.

два,проверка для Redis

public static void unlock2(Jedis jedis, String lockKey, String requestId) {
       
   // 判断加锁与解锁是不是同一个客户端
   if (requestId.equals(jedis.get(lockKey))) {
       // 若在此时,这把锁突然不是这个客户端的,则会误解锁
       jedis.del(lockKey);
   }

}

анализ кода

Добавлена ​​оценка идентификатора клиента requestId, но, поскольку это не атомарная операция, она не может гарантировать безопасность в случае одновременной конкуренции в нескольких процессах.

три,LUA-скрипт для Redis

public static boolean unlock3(Jedis jedis, String lockKey, String requestId) {

      String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
      Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(""));

      if (1L == (long) result) {
          return true;
      }
      return false;

  }

анализ кода

Чтобы обеспечить атомарность операции через скрипт Lua, на самом деле нужно объединить предыдущее суждение, а затем удалить в команду атомарного скрипта.Логика состоит в том, чтобы сначала решить, равно ли значение через get, и удалить, если оно равно, иначе вернуться напрямую.

Распределенный замок Редиссона

Redission是redis官网推荐的一个redis客户端,除了基于redis的基础的CURD命令以外,重要的是就是Redission提供了方便好用的分布式锁API

один,Основное использование

		RedissonClient redissonClient = RedissonTool.getInstance();

        RLock distribute_lock = redissonClient.getLock("distribute_lock");

        try {
            boolean result = distribute_lock.tryLock(3, 10, TimeUnit.SECONDS);
            
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (distribute_lock.isLocked()) {
                distribute_lock.unlock();
            }
        }

поток кода

  1. Получить экземпляр RLock через redissonClient
  2. Trylock пытается получить блокировку, первое время ожидания, второе время тайм-аута блокировки, третье единица времени
  3. После выполнения бизнес-логики блокировка, наконец, снимается.

два,Выполнение

Мы используем tryLock для анализа распределенной реализации редиссии.Метод блокировки похож на tryLock, за исключением того, что нет настройки самого длительного времени ожидания.Он будет вращаться и ждать снятия блокировки, пока блокировка не будет получена.

 		long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        //获取当前线程ID,用于实现可重入锁
        final long threadId = Thread.currentThread().getId();
        //尝试获取锁
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
        	//等待时间结束,返回获取失败
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        //订阅锁的队列,等待锁被其余线程释放后通知
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message,等待订阅的队列消息
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }

анализ кода

  1. Сначала tryAcquire пытается получить блокировку.Если возвращенный ttl равен нулю, это означает, что блокировка была получена.

  2. Определить, истекло ли время ожидания, и если оно истекло, возвратить ошибку получения блокировки напрямую.

  3. Подпишитесь на очередь прослушивания через канал Redis, подпишитесь внутренне через семафор семафора, а затем заблокируйте с помощью метода ожидания.Внутренне для достижения блокировки используется CountDownLatch, а результат асинхронного выполнения подписки получается для обеспечения успеха подписки , а затем определить, истекло ли время ожидания.

  4. Попробуйте еще раз подать заявку на блокировку и оцените время ожидания. Здесь цикл блокирует сообщение, ожидая снятия блокировки. RedissonLockEntry также поддерживает семафорный семафор

  5. Независимо от того, снята блокировка или нет, сообщение очереди в конечном итоге должно быть отписано.

  6. getEntityName внутри redisson — это имя блокировки идентификатора экземпляра клиента, чтобы гарантировать, что блокировки в нескольких экземплярах могут быть повторно введены.


tryAcquire получает блокировку

Основной код перераспределения для получения блокировки на самом деле является внутренним асинхронным вызовом, но он блокируется методом get.

 private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }
 private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }
  1. Внутренняя часть метода tryLockInnerAsync основана на скрипте Lua для получения блокировки.

    • Сначала определите, существует ли ключ, соответствующий KEYS[1] (имя блокировки), и блокировка не существует, hset устанавливает значение ключа, pexpire устанавливает время истечения срока действия и возвращает null, чтобы указать, что блокировка была получена.
    • Если он существует, блокировка занята, и hexists определяет, является ли она блокировкой текущего потока.Если да, hincrby увеличивает количество повторных входов, сбрасывает время истечения, а не блокировку текущего потока, и возвращает время истечения текущего замка.
     <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            internalLockLeaseTime = unit.toMillis(leaseTime);
    
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                      "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      "return redis.call('pttl', KEYS[1]);",
                        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
        }
    
  2. Решение Redission, чтобы избежать взаимоблокировки:

    Чтобы блокировка не была снята, Redisson применяет специальное решение: если время истечения не установлено, время истечения срока действия по умолчанию составляет 30 с. В то же время не исключено, что блокировка будет снята заранее. до того, как бизнес будет обработан Redisson получает блокировку Когда установлено время истечения срока действия по умолчанию, запланированная задача будет запущена внутри текущего клиента, а время истечения срока действия ключа будет обновляться каждый раз internalLockLeaseTime/3, что не только предотвращает раннее освобождение блокировки, но также предотвращает снятие блокировки в случае сбоя клиента. Время, которое сохраняется до 30 секунд, будет автоматически освобождено (запланированный процесс задачи, который обновляет время истечения срока действия, также не работает)

     			// lock acquired,获取到锁的时候设置定期更新时间的任务
                    if (ttlRemaining) {
                        scheduleExpirationRenewal(threadId);
                    }
                    
                    //expirationRenewalMap的并发安全MAP记录设置过的缓存,避免并发情况下重复设置任务,internalLockLeaseTime / 3的时间后重新设置过期时间
                       private void scheduleExpirationRenewal(final long threadId) {
            if (expirationRenewalMap.containsKey(getEntryName())) {
                return;
            }
    
            Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    
                    RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                "return 1; " +
                            "end; " +
                            "return 0;",
                              Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                    
                    future.addListener(new FutureListener<Boolean>() {
                        @Override
                        public void operationComplete(Future<Boolean> future) throws Exception {
                            expirationRenewalMap.remove(getEntryName());
                            if (!future.isSuccess()) {
                                log.error("Can't update lock " + getName() + " expiration", future.cause());
                                return;
                            }
                            
                            if (future.getNow()) {
                                // reschedule itself
                                scheduleExpirationRenewal(threadId);
                            }
                        }
                    });
                }
            }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
            if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
                task.cancel();
            }
        }
    

    разблокироватьразблокировать

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                    "end;" +
                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; "+
                    "end; " +
                    "return nil;",
                    Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
    
        }
    

    Разблокировка Redission также реализована на основе скрипта Lua.Внутренняя логика заключается в том, чтобы сначала определить, существует ли блокировка.Если она не существует, значит, она снята.После выпуска сообщения о снятии блокировки происходит возврат.После существования блокировки , он определяет, является ли текущий поток владельцем блокировки.Если нет, то нет Если право освобождено и возвращено, если оно разблокировано, количество повторных входов будет вычтено, а время истечения срока действия будет повторно обновлено.

    написать на обороте

    Он в основном основан на Redis для разработки и реализации распределенных блокировок. Он распространяется на реализацию Redission с помощью общих дизайнерских идей. Будь то дизайнерские идеи или надежность кода, дизайн Redission превосходен и заслуживает изучения. Следующий шаг объяснит распределенный дистрибутив Zookeeper.Lock реализация и соответствующий анализ исходного кода с открытым исходным кодом.