Автор писал о распределенных блокировках и раньше, но в то время исследование распределенных блокировок было не слишком глубоким, но теперь я обнаружил, что некоторые проблемы все еще существуют, поэтому я написал эту статью, чтобы заняться реализацией распределенных блокировок Redis. всесторонняя и расширенная разработка и краткое изложение помогут читателям получить более глубокое и объективное представление о распределенных блокировках Redis. Другие реализации более распределенных блокировок будут раскрыты в последующих статьях.
Мы по-прежнему используем классический силлогизм WWH (что, почему, как). Прежде всего, давайте разберемся, что такое распределенная блокировка, ее ограничения и общие методы реализации с точки зрения макросов.
Распределенная блокировка
В этой части в основном делается более полный обзор и сводка по распределенным блокировкам.
Что такое распределенная блокировка
Цитируя запись Du Niang, объяснение распределенных блокировок выглядит следующим образом:
Этот абзац является хорошим подведением итогов.Согласно обзору и нашему пониманию одномашинных блокировок, мы можем выделить и проанализировать несколько основных ограничений распределенных блокировок:
Ограничения распределенной блокировки
Функции | описывать |
---|---|
взаимная исключительность | То есть: в любой момент только один клиент может удерживать блокировку |
безопасность | То есть взаимоблокировки не будет.Когда клиент не может разблокировать блокировку из-за случайного сбоя в течение периода удержания блокировки, блокировка, которую он держит, также может быть правильно снята, и гарантируется, что другие клиенты также будут следовать костюм можно запирать; |
Доступность | То есть распределенные блокировки должны иметь определенную способность восстанавливаться после сбоев, а механизм высокой доступности может гарантировать, что услуги могут быть предоставлены в максимальной степени в случае сбоев, и нет единой точки риска. Например: обеспечить высокую доступность с помощью режима кластера и дозорного режима Redis; возможность выбора мастера кластера ETCD/zookeeper и т. д. |
симметрия | Для любой блокировки она должна быть заблокирована и разблокирована одним и тем же клиентом, то есть клиент A не может разблокировать блокировку, добавленную клиентом B. Это также известно как повторный вход в блокировку. |
Исходя из вышеперечисленных характеристик, общие методы реализации прямо приведены здесь. В предыдущих статьях автора также есть подробные описания этих распространенных методов реализации. Это просто обобщение и не будет расширяться. Заинтересованные студенты могут проверить историю блога по самих себя.
Общие реализации распределенных блокировок
категория | Пример |
---|---|
Реализовать через базу данных | Например: с использованием оптимистической блокировки, пессимистической блокировки или на основе реализации уникального ограничения первичного ключа. |
Служба блокировки на основе реализации распределенного кэша | Например: Redis и RedLock на основе Redis (Redisson предоставляет эталонную реализацию) |
Служба блокировки на основе алгоритма распределенной согласованности | Например: ZooKeeper, Chubby (реализация Google с закрытым исходным кодом) и т. д. |
После краткого изложения концепции распределенных блокировок мы перейдем к теме этой статьи и обсудим механизм Redis для реализации распределенных блокировок.
Принцип распределенной блокировки Redis
В этой части обсуждается, как Redis реализует распределенные блокировки.
Инструкция ядра распределенной блокировки Redis: блокировка
Поскольку это блокировка, основная операция — это не что иное, как блокировка и разблокировка.Во-первых, давайте посмотрим, какая инструкция Redis используется для выполнения операции блокировки.
SET lock_name my_random_value NX PX 30000
Смысл этой директивы в том, чтобы установить значение ключа, когда ключ "lock_name" не существует, со временем истечения 30 секунд. Мы можем получить функцию блокировки с помощью этой команды.
Вот более подробное объяснение команды.
Формат команды:
SET KEY VALUE [EX seconds] [PX milliseconds] [NX|XX]
- EX секунд — установите время истечения срока действия (в секундах).
- PX миллисекунды — установите время истечения в миллисекундах.
- NX - Установить ключ, только если ключ не существует.
- XX - Устанавливается только в том случае, если ключ уже существует.
Наша цель — сделать блокировку взаимоисключающей, поэтому с помощью параметра NX блокировка может быть успешно установлена только тогда, когда блокировка не существует.
Анализ параметров блокировки
Вернемся назад и посмотрим на полный пример блокировки:
SET lock_name my_random_value NX PX 30000
- lock_name — это имя распределенной блокировки.Для Redis lock_name — это ключ в ключе-значении и является уникальным.
- my_random_value — случайная строка, сгенерированная клиентом, которая гарантированно будет уникальной в течение достаточно длительного периода времени и среди всех запросов на получение блокировки от всех клиентов, чтобы однозначно идентифицировать владельца блокировки.
- NX означает, что SET может быть успешным только в том случае, если имя_лока (ключ) не существует, что гарантирует, что только один клиент может получить блокировку, а другие клиенты не могут получить блокировку, пока блокировка не будет снята.
- PX 30000 указывает, что этот узел блокировки имеет автоматическое время истечения срока действия, равное 30 секундам (цель состоит в том, чтобы предотвратить сбой клиента, удерживающего блокировку, и неспособность активно снять блокировку и вызвать взаимоблокировку, поэтому держатель блокировки должен находиться в пределах времени истечения срока действия. выполните соответствующие операции и снимите блокировку).
Инструкция ядра распределенной блокировки Redis: разблокировать
Разблокировку можно запустить с помощью команды del.Полная команда выглядит следующим образом:
del lock_name
Объяснение этой команды:
- Установите время истечения срока действия блокировки при блокировке.Когда время истечения срока действия наступит, Redis автоматически удалит соответствующий ключ-значение, чтобы избежать взаимоблокировки.
- Обратите внимание, что это время истечения необходимо установить в сочетании с всесторонней оценкой конкретного бизнеса, чтобы гарантировать, что владелец замка может выполнить соответствующие операции и снять блокировку в течение времени истечения срока действия.
- После завершения нормального выполнения время истечения блокировки еще не наступило, и блокировка активно снимается через del lock_name.
Выше приведены основные инструкции по реализации возможностей распределенной блокировки на основе Redis. Давайте рассмотрим распространенный случай реализации ошибки.
Распространенные случаи ошибок распределенных блокировок Redis: setNx
Сначала посмотрите на кусок кода Java:
Jedis jedis = jedisPool.getResource();
// 如果锁不存在则进行加锁
Long lockResult = jedis.setnx(lockName, myRandomValue);
if (lockResult == 1) {
// 设置锁过期时间,加锁和设置过期时间是两步完成的,非原子操作
jedis.expire(lockName, expireTime);
}
Роль метода setnx() — УСТАНОВИТЬ, ЕСЛИ НЕ СУЩЕСТВУЕТ, а метода expire() — добавить к блокировке время истечения срока действия. На первый взгляд этот код кажется правильным, но после тщательного изучения вы можете увидеть, что здесь действительно есть проблема: блокировка на самом деле использует две команды Redis, и эта комбинированная операция не является атомарной.
Если выполнение setNx успешно, возникает исключение при выполнении expire, и время истечения срока действия блокировки не может быть установлено, что приведет к тому, что у блокировки не будет срока действия. Если в последующем процессе выполнения возникает исключение бизнес-выполнения или FullGC, блокировка не будет снята последовательно, что приведет к взаимоблокировке.
Этот относительно простой метод реализации используется во многих блогах в Интернете, и ему не рекомендуется следовать.
Причина в том, что хотя setNx сам по себе может гарантировать атомарность установленного значения, при его использовании в сочетании с expire вся операция (блокировка и установка времени истечения) не является атомарной, что скрывает риск взаимоблокировки.
Элегантное решение для разблокировки
После разговора о блокировке, давайте поговорим о том, как выполнить элегантную и надежную разблокировку.
Здесь есть два варианта:
- Разблокировка через Lua-скрипт
- Используя функцию транзакций Redis, используя функцию транзакций Redis и используя команду Watch для отслеживания ключа, соответствующего замку, для обеспечения надежной разблокировки.
1. Используйте скрипт Lua для разблокировки
Давайте взглянем на объяснение атомарности скрипта на официальном сайте:
Давайте посмотрим на код разблокировки, реализованный сценарием Lua;
String script = "if redis.call('get', KEYS[1]) == ARGV[1]
then return redis.call('del', KEYS[1])
else return 0
end";
Некоторые читатели и друзья могут мало знать о Lua-скриптах, вот краткое введение в значение этого скрипта:
Мы используем Rediseval()Функция выполняет Lua-скрипт, где входной параметр lockName присваивается параметру KEYS[1], конкретное значение блокировки присваивается ARGV[1], а функция eval() передает Lua-скрипт на сервер Redis. для исполнения. Как видно из скриншота документа официального сайта Redis выше, когда код Lua выполняется через eval(), код Lua будет выполняться как команда (для обеспечения атомарности), а Redis не будет выполнять другие команды до тех пор, пока не будет выполнено eval(). команда выполняется. Следовательно, комбинируя сценарий Lua с функцией eval, атомарность операции разблокировки может быть реализована с научной точки зрения, чтобы избежать случайной разблокировки.
Код версии Java, реализованный с помощью Jedis, выглядит следующим образом:
Long unlock = 1L;
Jedis jedis = null;
// Lua脚本,用于校验并释放锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1]
then return redis.call('del', KEYS[1])
else
return 0 end";
try {
jedis = jedisPool.getResource();
// 通过 Redis 的 eval() 函数执行 Lua 脚本,
// 入参 lockName 赋值给参数 KEYS[1],myRandomValue 赋值给 ARGV[1],
// eval() 函数将 Lua 脚本交给 Redis 服务端执行。
Object result =
jedis.eval(script,
Collections.singletonList(lockName),
Collections.singletonList(myRandomValue));
// 注意:如果脚本顺利执行将返回1,
// 如果执行脚本时,其它的客户端对这个lockName对应的值进行了更改
// 则返回0
if (unlock.equals(result) {
return true;
}
}
catch (Exception e) {
throw e;
}
finally {
if (null != jedis) {
jedis.close();
}
}
return false;
2. Используйте транзакцию Redis для разблокировки
Во-первых, давайте взглянем на кодовую реализацию разблокировки с помощью транзакций Redis:
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
// 监控锁对应的Key,如果其它的客户端对这个Key进行了更改,那么本次事务会被取消。
jedis.watch(lockName);
// 成功获取锁,则操作公共资源执行自定义流程
// ...自定义流程代码省略...
// 校验是否持有锁
if (lockValue.equals(jedis.get(lockName))) {
// 开启事务功能,
Transaction multi = jedis.multi();
// 释放锁
multi.del(lockName);
// 执行事务(如果其它的客户端对这个Key进行了更改,那么本次事务会被取消,不会执行)
// 如果正常执行,由于只有一个删除操作,返回的list将只有一个对象。
List<Object> result = multi.exec();
if (RELEASE_SUCCESS.equals(result.size())) {
return true;
}
}
}
catch (Exception e) {
throw e;
}
finally {
if (null != jedis) {
jedis.unwatch();
jedis.close();
}
}
В соответствии с реализацией кода мы суммируем шаги по мониторингу и снятию блокировок с помощью функции транзакций Redis:
- сначала черезWatchКоманда отслеживает ключ (lockName), соответствующий замку. Если при открытии транзакции другие клиенты внесут изменения в Ключ, транзакция будет отменена и не будет выполнена.jedis.watch(lockName).
- Включите функцию транзакции, код:jedis.multi()
- Выполните операцию снятия блокировки. Когда транзакция открыта, операция снятия блокировки является элементом транзакции и принадлежит транзакции, код:multi.del(lockName);
- Выполните транзакцию, код:multi.exec();
- Наконец, ресурсы освобождены, кодjedis.unwatch();jedis.close();
Распространенный неправильный метод разблокировки
Здесь мы сосредоточимся на распространенном неправильном методе разблокировки для предупреждения.
Сначала посмотрите на реализацию кода:
Jedis jedis = jedisPool.getResource();
jedis.del(lockName);
Этот метод использует непосредственноjedis.del()Метод снимает блокировку и не проверяет. Этот грубый способ прямой разблокировки без проверки владельца блокировки приведет к тому, что существующая блокировка будет снята по ошибке, тем самым разрушив взаимное исключение (например, процесс напрямую разблокирует блокировку другого процесса через эту сторону))
Так как его оптимизировать? Один из способов — проверить перед разблокировкой, чтобы определить, являются ли заблокированный и разблокированный клиенты одним и тем же клиентом. код показывает, как показано ниже:
Jedis jedis = jedisPool.getResource();
if (lockValue.equals(jedis.get(lockName))) {
jedis.del(lockName);
}
По сравнению с грубым методом, описанным выше, этот метод разблокировки был значительно улучшен и проверен перед разблокировкой. Но проблема не решена, и весь процесс разблокировки по-прежнему состоит из двух независимых команд, а не атомарных операций.
Более важным моментом является то, что если клиент заблокирован из-за исключений (таких как исключения бизнес-кода, феномен остановки мира, вызванный FullGC и т. д.), когда выполняется операция разблокировки, а блокировки автоматически снимаются после истечения срока действия, текущий Клиент больше не удерживает блокировку.
Когда процесс возобновляет выполнение, он напрямую вызывает блокировку, не проверяя удержание блокировки (то есть процесс думает, что он все еще удерживает блокировку)del(lockName)Непосредственная разблокировка существующей блокировки, что приводит к ненормальному явлению, когда блокировки, удерживаемые другими процессами, разблокируются между процессами. Эта ситуация недопустима и нарушает принцип взаимного исключения.
резюме этапа
Выше мы узнали о принципе реализации распределенных блокировок на базе Redis, а также узнали о проблемах, которые необходимо решить для реализации распределенной блокировки Redis.
Мы чувствуем, что реализация надежной распределенной блокировки — непростая задача.
В дополнение к упомянутым выше явлениям, даже если наш код реализован очень надежно, при использовании кластера Redis с архитектурой master-slave все равно будут возникать аномалии:
Для режима архитектуры асинхронной репликации master-slave, когда главный узел не работает, данные подчиненного узла не были синхронизированы во времени.В это время процесс обращается к подчиненному компьютеру и определяет, что его можно заблокировать, поэтому блокировка получена, что приводит к нескольким процессам Аномалия получения блокировки.
Итак, существует ли более надежная, надежная и простая в использовании реализация блокировки Redis? Ответ очевиден, далее будет объяснена реализация распределенной блокировки Redisson.
О том, как инкапсулировать готовый компонент распределенной блокировки на основе Redisson, вы можете перейти к другой моей статье:«Напишите свой собственный распределенный замок на основе переиздания», В этой статье я делаю только глубокий анализ реализации распределенной блокировки Redisson. Пожалуйста, прочитайте мой пост в блоге, чтобы узнать о конкретном использовании и процессе упаковки.
По поводу распределенной блокировки Redisson более подробные официальные документы есть на github.Распределенные блокировки и синхронизаторы, Мы выбираем ключевые моменты, чтобы объяснить здесь.
Часть приведенного ниже кода цитируется из официальных документов, и здесь делается унифицированное заявление.
Распределенный замок Redisson
В этой части представлено более подробное введение в распределенные блокировки Redisson.
Распределенная блокировка Redisson — блокировка с повторным входом
Распределенная реентерабельная блокировка Redisson RLock Java-объект на основе Redis реализует интерфейс java.util.concurrent.locks.Lock. Он также предоставляет асинхронный (Async), отражающий (Reactive) и стандартный интерфейс RxJava2.
Обычное использование выглядит следующим образом:
RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();
Когда узел Redisson, на котором хранится распределенная блокировка, выходит из строя, и блокировка оказывается заблокированной, возникает тупиковая ситуация. Чтобы избежать взаимоблокировок такого типа, в Redisson предусмотрен внутренний сторожевой таймер, который отслеживает блокировки.Его функция заключается в предоставлении возможности продления блокировки и постоянном продлении срока действия блокировок до закрытия экземпляра Redisson.
По умолчанию время ожидания блокировки проверки сторожевого таймера составляет 30 секунд. Это конкретное значение можно указать, изменив Config.lockWatchdogTimeout.
Redisson также предоставляет интерфейс для явного указания времени истечения срока действия блокировки, по истечении которого блокировка будет автоматически разблокирована.Код выглядит следующим образом:
// 显式制定解锁时间,无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
Redisson также предоставляет асинхронный метод выполнения распределенной блокировки.Поскольку он мало используется, он не будет здесь повторяться.Заинтересованные студенты могут самостоятельно проверить официальные документы.
Здесь следует добавить, что одним из преимуществ реализации распределенной блокировки Redisson является то, что ее объект RLock полностью соответствует спецификации блокировки Java. RLock реализует интерфейс блокировки JUC. Причина, по которой она называется реентерабельной блокировкой, заключается в том, что только блокировка имеет блокировку. , Процесс может быть разблокирован только, и когда другие процессы разблокированы, будет выдана ошибка IllegalMonitorStateException.
Это видно из объявления исходного кода RLock
public interface RLock extends Lock, RLockAsync {
......
В следующей статье я предложу читателю более подробную интерпретацию исходного кода реализации RLock. Давайте сначала посмотрим на остальную часть реализации блокировки.
Распределенный замок Redisson — Fair Lock
Распределенная реентерабельная справедливая блокировка Redisson, основанная на Redis, также является объектом RLock, который реализует интерфейс java.util.concurrent.locks.Lock. Он также предоставляет асинхронный (Async), отражающий (Reactive) и стандартный интерфейс RxJava2. Это гарантирует, что когда несколько клиентских потоков Redisson запрашивают блокировки одновременно, приоритет отдается потоку, который первым выдал запрос. Все потоки запросов будут помещены в очередь в очереди.Когда поток не работает, Redisson будет ждать 5 секунд, прежде чем перейти к следующему потоку, то есть, если перед ними ожидают 5 потоков, то последний поток будет ждать не менее 25 секунд.
Обычное использование честной блокировки Redisson выглядит следующим образом:
RLock fairLock = redisson.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();
Реализация справедливой блокировки также имеет возможность автоматического продления контракта, что также реализуется сторожевым таймером, точно таким же, как и упомянутая выше повторная блокировка RLock. Типы блокировок, упомянутые ниже, также имеют эту возможность, поэтому я не буду их повторять, читатели просто должны помнить, что эти типы блокировок могут автоматически продлевать блокировку через сторожевой таймер, а сторожевой таймер проверяет время ожидания блокировки по умолчанию на 30 с. Параметр можно настроить, изменив Config.lockWatchdogTimeout.
Справедливые блокировки также могут явно указывать продолжительность блокировки:
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
fairLock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
fairLock.unlock();
Распределенный замок Redisson — Interlock (MultiLock)
Распределенный блокирующий объект Redisson MultiLock, основанный на Redis, может связывать несколько объектов RLock в блокировку, и каждый экземпляр объекта RLock может поступать из разных экземпляров Redisson.
Этот тип блокировки довольно интересен. Он предоставляет нам механизм многократной блокировки. Когда все блокировки успешно заблокированы, это считается успешным. Код вызова выглядит следующим образом. )
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();
Распределенный замок Redisson — Red Lock (RedLock)
Red lock — это высокодоступная распределенная реализация блокировки, реализованная Redisson, так что вот более подробная разработка red lock.
Redisson Red Lock Объект RedissonRedLock реализует алгоритм блокировки, представленный Redlock. Этот объект также можно использовать для связывания нескольких объектов RLock в качестве красного замка, и каждый экземпляр объекта RLock может исходить из другого экземпляра Redisson.
Основываясь на приведенном выше обзоре красного замка, мы можем знать, что красный замок является составным замком, и каждый экземпляр замка находится в другом экземпляре Redisson.
Взгляните на пример использования красного замка:
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();
Красный замок также может отображать назначенное время блокировки:
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
lock.lock(10, TimeUnit.SECONDS);
// 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
Вот пример скриншота реализации алгоритма красного замка на официальном сайте:
Из него мы можем извлечь ключевые моменты реализации красного замка:Если более половины узлов успешно получают блокировку, блокировка считается успешной, и если время ожидания узла истекает, он переходит к следующему, чтобы продолжить получение.
Вот общая идея для решения проблемы согласованности в распределенном домене:мнение большинства. Эта идея отражена в алгоритме Raft, алгоритме Zab и алгоритме Paxos.
Распределенная блокировка Redisson — ReadWriteLock (ReadWriteLock)
Redisson также реализует интерфейс java.util.concurrent.locks.ReadWriteLock, что позволяет считывать и записывать блокировки. Среди них как блокировки чтения, так и блокировки записи наследуют интерфейс RLock.
Как и вышеперечисленные блокировки, блокировки чтения-записи также распространяются.
Распределенные реентерабельные блокировки чтения-записи позволяют одновременно блокировать несколько блокировок чтения и одну блокировку записи.
Обычное использование выглядит следующим образом:
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
По соглашению, давайте посмотрим на метод вызова блокировки чтения-записи, который явно указывает продолжительность блокировки:
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
Redisson также реализует компоненты распределенного синхронизатора AQS, такие как: распределенный семафор (RSemaphore), распределенный семафор строки с истекающим сроком действия (RPermitExpirableSemaphore), распределенная защелка (RCountDownLatch) и т. д. Поскольку в этой статье в основном говорится о разблокировке связанного контента, поэтому дальнейшее введение не будет Заинтересованные студенты могут самостоятельно просмотреть официальные документы и исходный код.
Анализ исходного кода распределенной блокировки Redisson
В этой главе я сосредоточусь на обсуждении на уровне исходного кода механизма реализации повторной блокировки (RLock) в Redisson.
структура исходного кода
Мы загружаем последний код Redisson из официального репозитория Redisson на github и импортируем его в IDEA для просмотра Структура исходного кода выглядит следующим образом:
Исходный код части распределенной блокировки реализован по следующему пути
redisson-master
|-redisson
|-src
|-main
|-java
|-org.redisson
Мы можем просматривать исходный код ключа, расширяя его уровень за уровнем, поэтому не будем нести чушь, а просто посмотрим на код напрямую.
Анализ исходного кода
То, как автор смотрит на исходный код, тоже должно быть близко к мейнстриму.Я обычно начинаю с демки и читаю ее слой за слоем от входа в код.Сначала находим демку реентерабельной блокировки.
RLock lock = redisson.getLock(lockName);
boolean getLock = false;
try {
getLock = rLock.tryLock(0, expireSeconds, TimeUnit.SECONDS);
if (getLock) {
LOGGER.info("获取Redisson分布式锁[成功],lockName={}", lockName);
} else {
LOGGER.info("获取Redisson分布式锁[失败],lockName={}", lockName);
}
} catch (InterruptedException e) {
LOGGER.error("获取Redisson分布式锁[异常],lockName=" + lockName, e);
e.printStackTrace();
return false;
}
return getLock;
Этот код перехвачен из распакованного автором компонента распределенного замка Текущее количество звездочек 92.Адрес источника, Если вам интересно, вы можете дать мне звезду, ха-ха.
Во-первых, поredisson.getLock(lockName);Получите экземпляр блокировки RLock. LockName обычно представляет собой распределенный ключ блокировки с бизнес-идентификатором.
Получить экземпляр RLock
Давайте сначала посмотрим, как получить экземпляр RLock:
Войдите в класс Redisson.java и найдите следующий код:
@Override
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name, id);
}
Идентификатор здесь UUID.
protected final UUID id = UUID.randomUUID();
Вы можете видеть, что вызывается перегруженный метод, щелкните его и перейдите в RedissonLock.java. Через объявление класса вы можете увидеть, что класс реализует интерфейс RLock. Методы объявления и построения следующие:
public class RedissonLock extends RedissonExpirable implements RLock {
...省略部分代码...
protected static final LockPubSub PUBSUB = new LockPubSub();
final CommandAsyncExecutor commandExecutor;
public RedissonLock(CommandAsyncExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = id;
// 看门狗锁续约检查时间周期,默认30s
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
}
Экземпляр RedissonLock создается с помощью этого метода построения, где internalLockLeaseTime — это время ожидания контрольной блокировки сторожевого таймера, а значение по умолчанию — 30 с. Этот параметр можно указать с новым значением, изменив Config.lockWatchdogTimeout.
Логика блокировки tryLock
Когда экземпляр блокировки получен успешно, попробуйте операцию блокировки.Код выглядит следующим образом:
boolean getLock = rLock.tryLock(0, expireSeconds, TimeUnit.SECONDS);
Введите RedissonLock.java, чтобы увидеть реализацию.
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
// 申请锁,返回还剩余的锁过期时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
// 如果ttl为空,表示锁申请成功
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
// 订阅监听redis的消息,并创建RedissonLockEntry
// 其中,RedissonLockEntry中比较关键的是一个Semaphore
// 属性对象用来控制本地的锁的请求的信号量同步,返回Netty框架的Future
final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,
// 说明已经超过了客户端设置的最大的wait time,直接返回false,取消订阅,并且不会再继续申请锁
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (subscribeFuture.isSuccess()) {
unsubscribe(subscribeFuture, threadId);
}
}
});
}
acquireFailed(threadId);
return false;
}
try {
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
// 再次尝试申请一次锁
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
// 获得锁并返回
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
// 不等待申请锁并返回
acquireFailed(threadId);
return false;
}
// waiting for message
// 阻塞等待锁
currentTime = System.currentTimeMillis();
// 通过信号量(共享锁)进行阻塞,等待解锁消息
// 如果剩余时间 TTL 小于wait time,就在ttl时间内
// 从Entry的信号量获取一个许可(除非发生中断或者一直不存在可用的许可)
// 否则就在wait time时间范围内等待可以通过的信号量
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 更新等待时间,(最大等待时间-已经消耗的阻塞时间)
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
// 等待时间小于等于0,不等待申请锁并返回
acquireFailed(threadId);
return false;
}
}
} finally {
// 无论最终获取锁是否成功,都需要取消订阅解锁消息,防止死锁发生。
unsubscribe(subscribeFuture, threadId);
}
}
В приведенной выше логике основного кода мы фокусируемся на tryAcquire (длительное время аренды, блок TimeUnit), и логика блокировки вызова в основном находится в этой логике кода.
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
нажмите, чтобы посмотретьget(tryAcquireAsync(leaseTime, unit, threadId))
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
...省略部分逻辑...
}
ryAcquire(длинное время аренды, блок TimeUnit) выполняет соответствующую логику обработки переадресации только для различных параметров аренды.
Метод trylock без аргументов вызывается напрямуюget(tryLockInnerAsync(Thread.currentThread().getId()));
Давайте взглянем на ядро tryLockInnerAsyn, которое возвращает объект будущего для повышения пропускной способности системы за счет асинхронной обработки ввода-вывода.
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 检查key是否已被占用,如果没有则设置超时时间及唯一标识,初始化value=1
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 锁重入的情况,判断锁的key field,一致的话,value加1
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 返回剩余的过期时间
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
Вот объяснение конкретных параметров этого заблокированного скрипта Lua:
- KEYS[1] : ключ, который необходимо заблокировать, здесь он должен быть строкового типа.
- ARGV[1] : Тайм-аут блокировки для предотвращения взаимоблокировки.
- ARGV[2] : уникальный идентификатор блокировки, который представляет собой только что введенный идентификатор (UUID.randomUUID()) + ":" + threadId.
При выполнении этого Lua-скрипта он возвращает пустое значение, указывая на то, что блокировка была получена; если он возвращает длинное значение (возвращаемое значение команды pttl), это указывает на то, что блокировка была занята. снаружи может сделать некоторые выводы и корректировки времени ожидания логики.
tryLock(длинное время ожидания, длинное время аренды, блок TimeUnit) Метод приложения блокировки с параметром LeezeTime автоматически снимает блокировку в соответствии с временем аренды.
Для случаев, когда нет параметра арендного времени, например, tryLock() или tryLock(long waitTime, блок TimeUnit) и lock() всегда будут удерживать блокировку.
разблокироватьлогика разблокировки
Основная логика разблокировки также реализована через сценарии Lua.Видно, что Redisson также использует сценарии для обеспечения атомарности блокировки и разблокировки, что согласуется с нашим объяснением в начале статьи.
Давайте взглянем на основную логику метода unlock().
@Override
public void unlock() {
// 解锁核心逻辑
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
// 解锁返回空,抛出异常
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
// 解锁成功之后取消更新锁expire的时间的任务
cancelExpirationRenewal();
}
}
При успешной разблокировке вызывается cancelExpirationRenewal() для снятия задачи обновления времени истечения блокировки, то есть блокировки больше не существует, и продлевать время истечения блокировки нет необходимости. Просто посмотрите на его реализацию кода:
void cancelExpirationRenewal() {
Timeout task = expirationRenewalMap.remove(getEntryName());
if (task != null) {
task.cancel();
}
}
Введите метод unlockInnerAsync.
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果锁的key已经不存在,表明锁已经被解锁,直接发布redis消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// key和field不匹配,说明当前的客户端线程并没有持有锁,不能进行主动解锁操作。
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 将value减1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 如果counter>0表明锁进行了重入,不能删除key,也就是不进行解锁操作
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
// 否则删除key并发布解锁消息进行解锁
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
Видно, что разблокировка выполняется Lua-скриптом, поэтому разберем конкретное значение этого скрипта.
- KEYS[1] : ключ, который необходимо заблокировать, здесь он должен быть строкового типа.
- KEYS[2] : Имя канала сообщения redis, распределенная блокировка соответствует уникальному имени канала: "redisson_lock__channel__{" + getName() + "}"
- ARGV[1] : тело сообщения Reids, здесь требуется только один байтовый знак. Основной признак заключается в том, что ключ redis был разблокирован. В сочетании с подпиской на redis он может разбудить другие клиентские потоки, которые подписываются на разблокировку сообщений для подать заявку на замок.
- ARGV[2] : Тайм-аут блокировки для предотвращения взаимоблокировки.
- ARGV[3] : уникальный идентификатор блокировки, который представляет собой только что введенный идентификатор (UUID.randomUUID()) + ":" + threadId.
Комментарии к коду должны ясно понимать основной контекст разблокировки.
В качестве дополнительного упоминания мы можем видеть, что команда публикации используется в сценарии разблокировки lua, который работает следующим образом:
Публикуя сообщение о разблокировке на единственном канале блокировки, можно сократить время ожидания или бездействия других распределенных узлов, а также повысить эффективность блокировки в целом.
Мы смотрим, как Redisson обрабатывает сообщение о разблокировке.Содержание сообщения здесь: unlockMessage = 0L. Это соответствует содержимому публикации в методе разблокировки.
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
// 解锁消息
public static final Long UNLOCK_MESSAGE = 0L;
public static final Long READ_UNLOCK_MESSAGE = 1L;
...省略部分逻辑...
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
// 如果订阅的消息为解锁消息,UNLOCK_MESSAGE = 0L
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// 释放一个许可,并唤醒等待entry.
value.getLatch().release();
}
......
}
}
метод блокировки
В дополнение к методу tryLock, который может получать блокировки, Redisson также предоставляет метод блокировки для прямого получения блокировок.Давайте посмотрим, как он выполняет операции получения блокировок.
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
Взгляните на конкретную логику lockInterruptably.
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
Нажмите, чтобы увидеть перегрузку lockInterruptably (длительное время аренды, единица измерения TimeUnit).
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 锁获取成功
if (ttl == null) {
return;
}
// 通过异步方式订阅Redis的channel,阻塞方式获取订阅结果
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
// 通过循环判断,直到锁获取成功,经典写法。
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// 锁获取成功,跳出循环
if (ttl == null) {
break;
}
// 如果剩余时间 TTL 大于0,从Entry的信号量获取一个许可(除非发生中断或者一直不存在可用的许可)
// 否则就在wait time时间范围内等待可以通过的信号量
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
// 无论最终获取锁是否成功,都需要取消订阅解锁消息,防止死锁发生。
unsubscribe(future, threadId);
}
}
Эта логика кажется вам знакомой?Она очень похожа на логику tryLock, о которой мы упоминали выше. Конкретная логика написана четко в комментариях и повторяться не буду.
Это анализ исходного кода реентерабельной блокировки Redisson плюс базовая логика разблокировки, я считаю, что это поможет вам с умом.
Суммировать
В этой статье мы начнем с обзора распределенных блокировок и проведем более полный анализ принципа реализации распределенных блокировок в Redis. И сосредоточьтесь на подробном объяснении реализации распределенной блокировки Redisson, начиная с примера вызова автором библиотеки классов инкапсуляции Redisson и углубленного анализа исходного кода реентерабельной блокировки Redisson. После этой серии исследований я просто объяснил механизм реализации распределенных блокировок Redis/Redisson.
Более распределённая реализация блокировки и анализ исходного кода будут выпущены одна за другой, пожалуйста, подождите и посмотрите.
Ссылка на ссылку
Описание официального документа Redis по RedLock
Инструкции для Lua-скриптов в официальной документации Redis