предисловие
Недавно, когда Сяобай работал над системной функцией, он обнаружил, что есть метод, который необходимо синхронизировать.Однако развертывание проекта в производственной среде кластеризовано несколькими котами, и простое использование синхронизированной блокировки для одного и того же процесса JVM.Реализуется многопоточная синхронизация, и цель единой блокировки не может быть достигнута для синхронизации между процессами. Итак, Xiaobai подумал о распределенных замках. Некоторое время назад я видел интересный мультфильм, в котором упоминалось, что первоначальное намерение Zookeeper — использовать временные последовательные узлы для простой реализации распределенных блокировок, поэтому я изучил использование zk для реализации распределенных блокировок. В этой статье рассматриваются только основные характеристики zk и использование java для реализации простой распределенной блокировки.Если есть ошибка, добро пожаловать в кирпичи.Кроме того, если вам это не нравится, не брызгайте.
гипотетический фон
Предположим, что 2 кота (t1 и t2) развернуты в производственной среде системы Xiaobai, и в то же время запросы пользователей A и B обрабатываются t1 и t2 соответственно, а запросы пользователей A и B должны вызывать метод m для выполнения Для связанной обработки (обработки общих данных), чтобы обеспечить точность данных, Сяобай надеется, что только один поток может выполнить метод m в определенный момент времени, то есть когда есть выполняющийся поток m в t1, другие потоки в t1 и t2 не могут выполнить m до тех пор, пока этот поток не завершит вызов m.
План мышления
Как добиться синхронизации в автономной среде? Это может быть реализовано с помощью synchronized или ReentrantLock. Принцип заключается в том, что есть переменная флага блокировки. Каждый раз, когда поток хочет выполнить код синхронизации, сначала проверьте, не занят ли флаг другими потоками. Если да, блокируйте и ждите другие потоки, чтобы снять блокировку, если нет, то Execute после установки флага (здесь просто краткое описание, конкретный принцип обширен и глубок).
Почему он не может работать через процессы? Потому что в рамках одного и того же процесса к блокировке могут получить доступ все потоки этого процесса, но не могут получить доступ потоки других процессов. Хорошо, тогда, если вы предоставите флаг блокировки, видимый для всех потоков в процессе, проблема будет решена. Таким образом, zookeeper может выступать в качестве стороннего процесса и открывать права доступа к процессам, которыми необходимо управлять.Все коды, которые необходимо синхронизировать между процессами, должны приходить в мой большой zk, чтобы увидеть, могут ли они быть выполнены, прежде чем они будут казнен.
1. Задайте еще несколько вопросов, прежде чем начать
Почему zookeeper может реализовывать распределенные блокировки?
Есть потоки, выполняющие метод m в нескольких процессах одновременно, и есть только одна блокировка. Если вы получите блокировку для выполнения, я должен быть заблокирован. Тогда кто разбудит меня после того, как вы ее выполните? Вы не знаете, что я заблокирован, поэтому вы не можете сказать мне: «Эй, Сяобай, у меня заканчивается, можешь использовать». Все, что вы можете сделать, это установить флаг блокировки, когда вам это нужно, и отменить установленный флаг, когда закончите. Я должен проявлять инициативу, чтобы время от времени просматривать его, когда он заблокирован, но это всегда немного хлопотно, и лучше, чтобы кто-нибудь уведомил меня, что его можно выполнить. Zookeeper предоставляет функцию уведомления о событиях для слушателей своих собственных узлов, похоже ли это на отправку древесного угля в снег?
Что такое узел?Узлы — это базовая структура хранения данных в zookeeper.Все в zk является узлом, так же как все в java является объектом. Модель данных zk представляет собой древовидную структуру, основанную на множестве узлов, но zk предусматривает, что правило ссылки каждого узлассылка на путь. Каждый узел состоит из четырех частей: ссылка на дочерний узел, данные хранилища, права доступа и метаданные узла.
Существуют ли типы узлов в zk?имеют. В zk предусмотрено четыре типа узлов, различные типы узлов и их отличия заключаются в следующем:
- Постоянный узел (PERSISTENT): после создания узла он всегда будет существовать до тех пор, пока не будет выполнена операция удаления для активной очистки узла.
- Постоянный узел последовательности (PERSISTENT_SEQUENTIAL): сохраняет характеристики постоянных узлов.Дополнительная функция заключается в том, что каждый узел будет поддерживать последовательность для своих дочерних узлов первого уровня, записывать порядок, в котором создается каждый дочерний узел, и ZK автоматически назначит имя узла к заданному узлу. Добавьте числовой суффикс (самоувеличивающийся) в качестве имени нового узла.
- Временный узел (EPHEMERAL): В отличие от постоянных узлов, жизненный цикл временных узлов привязан к сеансу клиента, и, конечно же, его также можно активно удалить.
- Временные последовательные узлы (EPHEMERAL_SEQUENTIAL): сохраняют характеристики временных узлов и дополнительные характеристики, такие как характеристики постоянных последовательных узлов.
Как управлять узлом?Добавление, удаление и изменение узлов создаются\удаляются\setData\getData, существует для определения того, существует ли узел, а getChildren получает ссылки на все дочерние узлы.
Слушатель узла упоминался выше.Мы можем установить, будет ли текущий поток прослушивать запрошенный узел при запросе узла zk. getData, getChildren и exists — все операции запроса на узлах.Эти методы имеют логический параметр наблюдения, чтобы указать, следует ли отслеживать узел. Как только поток прослушивает узел, создание (создание нового дочернего узла под узлом), установка данных, удаление (удаление самого узла или удаление одного из его дочерних узлов) на этом узле вызовет zk для уведомления монитора узла нить. Тем не менее, следует отметить, что мониторинг установки узла потоком является одноразовым, то есть после того, как zk уведомит поток мониторинга, поток необходимо изменить, чтобы снова установить узел мониторинга, иначе узел будет не получать уведомления снова, если узел снова будет изменен.
Zookeeper имеет базовые условия для реализации распределенных блокировок: совместное использование несколькими процессами, может хранить информацию о блокировках и иметь механизм активного уведомления.
Как использовать zookeeper для реализации распределенной блокировки?
Распределенные блокировки также являются блокировками, ничего особенного.Им также нужно имя, чтобы сообщить другим, какой ресурс синхронизации они управляют, и идентификатор, чтобы сообщить другим, простаивают они или используются. В zk необходимо создать специальный узел блокировки, а затем различные узлы блокировки используются в качестве дочерних узлов узла для облегчения управления, а имя узла используется для указания управляемых им ресурсов синхронизации. Что с идентификатором замка?
Вариант 1: Использовать область хранения данных в узле Размер хранилища данных узла в zk не может превышать 1M, но достаточно для хранения только одного идентификатора. Когда поток получает блокировку, он сначала проверяет, является ли идентификатор идентификатором без блокировки.Если его можно изменить на идентификатор занятия, он будет восстановлен до идентификатора без блокировки после использования.
Вариант 2: использовать дочерние узлы. Всякий раз, когда поток запрашивает блокировку, дочерний узел создается под узлом блокировки. Тип дочернего узла должен поддерживать порядок, а самовозрастающие порядковые номера дочерних узлов сортируются. По умолчанию всегда наименьший Поток, соответствующий дочернему узлу, получает блокировку, и соответствующий дочерний узел может быть удален при снятии блокировки.
тупиковый риск
Оба решения на самом деле осуществимы, но вы должны избегать взаимоблокировок при использовании блокировок. С первой схемой проблем вроде бы нет, при ее использовании устанавливайте флаг, а при ее использовании сбрасывайте флаг, однако если в цепочке удерживающей блокировку произойдет авария, то код снятия блокировки выполниться не сможет, т.к. блокировка не может быть снята, и другие потоки будут продолжать ждать блокировки. , соответствующий код синхронизации не может быть выполнен. Второе решение также имеет эту проблему, но второе решение может использовать узел временной последовательности zk для решения этой проблемы.Пока поток ненормальный и программа прервана, соединение с zk будет потеряно, и zk автоматически обнаружить, что ссылка отключена.Удалить временный узел, созданный ссылкой, так что даже если программа потока, занимающая блокировку, потерпит аварию, цель обеспечения нормального снятия блокировки может быть достигнута.
Что делать, если zk зависает? Грустно, если zk зависнет, то тут ничего не поделаешь, потому что потоки нельзя линковать к zk, не говоря уже о получении блокировок и выполнении синхронного кода. Однако при обычном развертывании, чтобы обеспечить высокую доступность zk, несколько zk будут использоваться для развертывания в качестве кластера.В кластере есть один главный и несколько подчиненных.Как только главный zk умирает, он будет заменен новый мастер zk через механизм выборов. Что делать, если завис кластер zk? Извините, кластер zk будет зависать только в том случае, если все zk зависнут одновременно, вероятность очень мала.
Во-вторых, начните это делать.
хотеть что-либо
- Требуется объект блокировки, и zk нужно подключать каждый раз при создании этого объекта блокировки (операция подключения также может быть размещена при блокировке);
- Объект блокировки должен предоставлять метод блокировки;
- Объект блокировки должен предоставить метод для снятия блокировки;
- Объект блокировки должен отслеживать узел zk и предоставлять метод обратного вызова для получения уведомлений zk.
Анализ реализации
- В конструкторе создайте zk-соединение и создайте корневой узел блокировки. Связанные API следующие:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
Создайте zk-соединение. Конструктор требует передачи трех параметров: ip: порт (строка), время ожидания сеанса и прослушиватель для этого соединения.
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
Создайте узлы. Параметры: путь узла, данные узла, политика разрешений, тип узла. -
замок, сначала нужно быть под корневым узлом блокировкиСоздать временный узел последовательности(Правила именования узлов унифицированы, а самоувеличивающийся порядковый номер объединяется с помощью zk), затем получите все дочерние узлы под корневым узлом, отсортируйте узлы в соответствии с самоувеличивающимся порядковым номером и оцените, является ли наименьший узел — это узел, созданный этой блокировкой, и если да, то добавить Если блокировка успешна, если нет, заблокировать текущий поток и дождаться снятия блокировки (можно использовать заблокированный поток). Соответствующие API следующие:
public List<String> getChildren(String path, boolean watch)
Получить все дочерние узлы узла. Параметры: путь к узлу, следует ли отслеживать узел -
разблокировать замокчас,Удалить дочерние узлы, созданные потоком, закрывая при этом соединение zk. Соответствующие API следующие:
public void delete(String path, int version)
Удалить указанный узел. Параметры: путь узла, номер версии данных
public synchronized void close()
отключить зк линк -
прослушивающий узел. Прежде всего, нам нужно указать, какой узел отслеживать.Мы можем отслеживать корневой узел блокировки, чтобы всякий раз, когда поток снимает блокировку для удаления соответствующего дочернего узла, zk уведомлял слушающий поток о снятии блокировки. В настоящее время нам нужно только получить все корневые узлы.Дочерний узел может определить, является ли соответствующий узел наименьшим в соответствии с самовозрастающим порядковым номером, чтобы узнать, может ли он получить блокировку. Но описанный выше подход явно не очень хорош: пока дочерняя нода будет удалена, zk будет повторно уведомлять все потоки, ожидающие блокировки. Получив уведомление, поток, который не может получить блокировку, обнаруживает, что ему все еще нужно ждать, и ему приходится сбрасывать монитор и снова ждать. Так как мы будем использовать временноеаккуратныйУзел, характеристики этого типа узла упорядочены, тогда вы можете отслеживать только предыдущий узел, то есть узел, ожидающий удаления, чтобы гарантировать, что при получении уведомления соответствующий дочерний узел будет наименьшим, и замок можно получить. При реализации распределенных блокировок, если поток не может сразу получить блокировку при блокировке, он будет заблокирован определенным образом.получить уведомлениекогдазамок можно получить, то соответствующая операция должна бытьвозобновить обсуждениеисполнение,разблокировать.
zk предоставляет интерфейс Watcher, и объект блокировки должен реализовать этот интерфейс, если ему необходимо отслеживать предыдущий узел в zk. Интерфейс Watcher содержит интерфейс Event, который инкапсулирует тип события и тип соединения, а также предоставляет единственный метод, который необходимо реализовать.
void process(WatchedEvent var1)
Этот метод является методом обратного вызова, используемым для получения уведомлений zk. Параметр — это событие, которое происходит на узле прослушивателя. Когда узел, отслеживаемый слушателем, изменяется, zk уведомляет слушателя, и в то же время выполняется метод, а параметром является информация, сообщаемая zk.
открытый код
Хотя это простая реализация распределенной блокировки, код немного длиннее. Друзьям, которые разбираются в реализации распределенных блокировок с нуля, как Xiaobai, рекомендуется сначала проанализировать большие шаги выше и вкратце подумать о конкретной реализации внутри каждого метода, а затем посмотреть код, который будет более впечатляющим и более простым для понимания. . Если у вас есть другие идеи, пожалуйста, оставьте сообщение для обсуждения. В методе оценки блокировки в коде строка-разделитель используется для различения блокировок каждого ресурса. В проекте есть критические ресурсы A и B, поэтому снята блокировка управления A или нет, не имеет ничего общего с потоком, удерживающим блокировку управления B. Конечно, для каждого типа блокировки также может быть установлен независимый корневой узел.
public class ZooKeeperLock implements Watcher {
private ZooKeeper zk = null;
private String rootLockNode; // 锁的根节点
private String lockName; // 竞争资源,用来生成子节点名称
private String currentLock; // 当前锁
private String waitLock; // 等待的锁(前一个锁)
private CountDownLatch countDownLatch; // 计数器(用来在加锁失败时阻塞加锁线程)
private int sessionTimeout = 30000; // 超时时间
// 1. 构造器中创建ZK链接,创建锁的根节点
public ZooKeeperLock(String zkAddress, String rootLockNode, String lockName) {
this.rootLockNode = rootLockNode;
this.lockName = lockName;
try {
// 创建连接,zkAddress格式为:IP:PORT
zk = new ZooKeeper(zkAddress,this.sessionTimeout,this);
// 检测锁的根节点是否存在,不存在则创建
Stat stat = zk.exists(rootLockNode,false);
if (null == stat) {
zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
// 2. 加锁方法,先尝试加锁,不能加锁则等待上一个锁的释放
public boolean lock() {
if (this.tryLock()) {
System.out.println("线程【" + Thread.currentThread().getName() + "】加锁(" + this.currentLock + ")成功!");
return true;
} else {
return waitOtherLock(this.waitLock, this.sessionTimeout);
}
}
public boolean tryLock() {
// 分隔符
String split = "_lock_";
if (this.lockName.contains("_lock_")) {
throw new RuntimeException("lockName can't contains '_lock_' ");
}
try {
// 创建锁节点(临时有序节点)
this.currentLock = zk.create(this.rootLockNode + "/" + this.lockName + split, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("线程【" + Thread.currentThread().getName()
+ "】创建锁节点(" + this.currentLock + ")成功,开始竞争...");
// 取所有子节点
List<String> nodes = zk.getChildren(this.rootLockNode, false);
// 取所有竞争lockName的锁
List<String> lockNodes = new ArrayList<String>();
for (String nodeName : nodes) {
if (nodeName.split(split)[0].equals(this.lockName)) {
lockNodes.add(nodeName);
}
}
Collections.sort(lockNodes);
// 取最小节点与当前锁节点比对加锁
String currentLockPath = this.rootLockNode + "/" + lockNodes.get(0);
if (this.currentLock.equals(currentLockPath)) {
return true;
}
// 加锁失败,设置前一节点为等待锁节点
String currentLockNode = this.currentLock.substring(this.currentLock.lastIndexOf("/") + 1);
int preNodeIndex = Collections.binarySearch(lockNodes, currentLockNode) - 1;
this.waitLock = lockNodes.get(preNodeIndex);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
private boolean waitOtherLock(String waitLock, int sessionTimeout) {
boolean islock = false;
try {
// 监听等待锁节点
String waitLockNode = this.rootLockNode + "/" + waitLock;
Stat stat = zk.exists(waitLockNode,true);
if (null != stat) {
System.out.println("线程【" + Thread.currentThread().getName()
+ "】锁(" + this.currentLock + ")加锁失败,等待锁(" + waitLockNode + ")释放...");
// 设置计数器,使用计数器阻塞线程
this.countDownLatch = new CountDownLatch(1);
islock = this.countDownLatch.await(sessionTimeout,TimeUnit.MILLISECONDS);
this.countDownLatch = null;
if (islock) {
System.out.println("线程【" + Thread.currentThread().getName() + "】锁("
+ this.currentLock + ")加锁成功,锁(" + waitLockNode + ")已经释放");
} else {
System.out.println("线程【" + Thread.currentThread().getName() + "】锁("
+ this.currentLock + ")加锁失败...");
}
} else {
islock = true;
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return islock;
}
// 3. 释放锁
public void unlock() throws InterruptedException {
try {
Stat stat = zk.exists(this.currentLock,false);
if (null != stat) {
System.out.println("线程【" + Thread.currentThread().getName() + "】释放锁 " + this.currentLock);
zk.delete(this.currentLock, -1);
this.currentLock = null;
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} finally {
zk.close();
}
}
// 4. 监听器回调
@Override
public void process(WatchedEvent watchedEvent) {
if (null != this.countDownLatch && watchedEvent.getType() == Event.EventType.NodeDeleted) {
// 计数器减一,恢复线程操作
this.countDownLatch.countDown();
}
}
}
Тестовый класс выглядит следующим образом:
public class Test {
public static void doSomething() {
System.out.println("线程【" + Thread.currentThread().getName() + "】正在运行...");
}
public static void main(String[] args) {
Runnable runnable = new Runnable() {
public void run() {
ZooKeeperLock lock = null;
lock = new ZooKeeperLock("10.150.27.51:2181","/locks", "test1");
if (lock.lock()) {
doSomething();
try {
Thread.sleep(1000);
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
for (int i = 0; i < 5; i++) {
Thread t = new Thread(runnable);
t.start();
}
}
}
Здесь запускается 5 потоков для проверки, и вывод выглядит следующим образом. Следует отметить, что порядок создания дочерних узлов должен быть от малого к большему, но случайность порядка создания, показанная в следующих выходных результатах, вызвана тем фактом, что узел создания и оператор вывода не являются атомарными операциями. Основное внимание уделяется получению и освобождению блокировок.Как видно из вывода, каждый поток может выполняться только после удаления предыдущего узла. ок, реализована простая распределенная блокировка на основе zk.
线程【Thread-3】创建锁节点(/locks/test1_lock_0000000238)成功,开始竞争...
线程【Thread-2】创建锁节点(/locks/test1_lock_0000000237)成功,开始竞争...
线程【Thread-1】创建锁节点(/locks/test1_lock_0000000236)成功,开始竞争...
线程【Thread-0】创建锁节点(/locks/test1_lock_0000000240)成功,开始竞争...
线程【Thread-4】创建锁节点(/locks/test1_lock_0000000239)成功,开始竞争...
线程【Thread-1】加锁(/locks/test1_lock_0000000236)成功!
线程【Thread-1】正在运行...
线程【Thread-3】锁(/locks/test1_lock_0000000238)加锁失败,等待锁(/locks/test1_lock_0000000237)释放...
线程【Thread-2】锁(/locks/test1_lock_0000000237)加锁失败,等待锁(/locks/test1_lock_0000000236)释放...
线程【Thread-0】锁(/locks/test1_lock_0000000240)加锁失败,等待锁(/locks/test1_lock_0000000239)释放...
线程【Thread-4】锁(/locks/test1_lock_0000000239)加锁失败,等待锁(/locks/test1_lock_0000000238)释放...
线程【Thread-1】释放锁 /locks/test1_lock_0000000236
线程【Thread-2】锁(/locks/test1_lock_0000000237)加锁成功,锁(/locks/test1_lock_0000000236)已经释放
线程【Thread-2】正在运行...
线程【Thread-2】释放锁 /locks/test1_lock_0000000237
线程【Thread-3】锁(/locks/test1_lock_0000000238)加锁成功,锁(/locks/test1_lock_0000000237)已经释放
线程【Thread-3】正在运行...
线程【Thread-3】释放锁 /locks/test1_lock_0000000238
线程【Thread-4】锁(/locks/test1_lock_0000000239)加锁成功,锁(/locks/test1_lock_0000000238)已经释放
线程【Thread-4】正在运行...
线程【Thread-4】释放锁 /locks/test1_lock_0000000239
线程【Thread-0】锁(/locks/test1_lock_0000000240)加锁成功,锁(/locks/test1_lock_0000000239)已经释放
线程【Thread-0】正在运行...
线程【Thread-0】释放锁 /locks/test1_lock_0000000240
3. Колеса чужого производства
Говорят, что zookeeper так долго процветал, и не так много влиятельных людей, чтобы открыть исходный код некоторых полезных инструментов, и нужно ли так много работать, чтобы написать реализацию распределенных замков? Да-да, вышеупомянутый Xiaobai просто пытается сделать простую реализацию, чтобы углубить свое понимание реализации распределенных блокировок zk. Отличный парень по имени Джордан Циммерман предоставил куратора, чтобы лучше управлять зоопарком.
Распределенная блокировка куратора
Куратор предоставляет четыре распределенных замка, а именно:
- InterProcessMutex: распределенная реентерабельная эксклюзивная блокировка
- InterProcessSemaphoreMutex: распределенная эксклюзивная блокировка
- InterProcessReadWriteLock: распределенная блокировка чтения-записи.
- InterProcessMultiLock: контейнер для управления несколькими блокировками как единым объектом.
пом зависимости:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
Здесь используется InterProcessMutex, то есть распределенная реентерабельная монопольная блокировка, которая используется следующим образом:
// 设置重试策略,创建zk客户端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("10.150.27.51:2181",retryPolicy);
// 启动客户端
client.start();
// 创建分布式可重入排他锁,监听客户端为client,锁的根节点为/locks
InterProcessMutex mutex = new InterProcessMutex(client,"/locks");
try {
// 加锁
if (mutex.acquire(3,TimeUnit.SECONDS)) {
// TODO-同步操作
//释放锁
mutex.release();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
client.close();
}
Интерпретация исходного кода InterProcessMutex
InterProcessMutex
Моддеров много, поэтому исходный код моддера здесь показываться не будет, рекомендуем заинтересованным друзьям посмотреть самим.InterProcessMutex
EстьConcurrentMap
ТипthreadData
Атрибут, этот атрибут будет использовать объект потока в качестве ключа и поток, соответствующийLcokData
Объект представляет собой значение, которое записывает информацию о каждой блокировке. в новомInterProcessMutex
например, его конструктор в основном дляthreadData
провестиMap
Инициализируйте, проверьте правильность корневого узла блокировки и используйтеbasePath
записи свойств, в дополнение к созданию экземпляраLockInternals
объект по свойствамinternals
Цитировать,LockInternals
даInterProcessMutex
Заблокированное ядро.
замок
// InterProcessMutex.class
public void acquire() throws Exception {
if (!this.internalLock(-1L, (TimeUnit)null)) {
throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
}
}
public boolean acquire(long time, TimeUnit unit) throws Exception {
return this.internalLock(time, unit);
}
private boolean internalLock(long time, TimeUnit unit) throws Exception {
Thread currentThread = Thread.currentThread();
InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
if (lockData != null) {
// 锁的可重入性
lockData.lockCount.incrementAndGet();
return true;
} else {
// 加锁并返回锁节点
String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
if (lockPath != null) {
InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
this.threadData.put(currentThread, newLockData);
return true;
} else {
return false;
}
}
}
Блокировка предоставляет два интерфейса, соответственно не устанавливая таймаут и устанавливая таймаут. Если тайм-аут не установлен, поток будет заблокирован в ожидании блокировки, пока блокировка не будет получена. Независимо от того, какой интерфейс блокировки вызывается, он называетсяinternalLock()
метод. Код в этом методе отражает повторный вход блокировки.InterProcessMutex
непосредственно изthreadData
получить его в соответствии с текущим потокомLockData
, Если LockData не пустой, это означает, что текущий поток владеет этим, и он возвращает true напрямую, добавляя единицу к количеству блокировок. Если пусто, пройтиinternals
атрибутattemptLock()
для борьбы за блокировки, этот метод возвращает путь к узлу, соответствующему блокировке. Если путь не пуст, это означает, что текущий поток получил блокировку, а затем создает соответствующую блокировку для текущего потока.LcokData
и записано вthreadData
середина.
блокировка конкуренции
// LockInternals.class
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
long startMillis = System.currentTimeMillis();
Long millisToWait = unit != null ? unit.toMillis(time) : null;
byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while(!isDone) {
isDone = true;
try {
// 创建锁节点
ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
// 竞争锁
hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
} catch (NoNodeException var14) {
if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++,
System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
throw var14;
}
isDone = false;
}
}
return hasTheLock ? ourPath : null;
}
Взгляните на этот метод, много определений переменных, все сначала игнорируются. Окончательное возвращаемое значение определяется выражениемhasTheLock
решить, вернуть, если верноourPath
.ourPath
Инициализируется нулем послеthis.driver.createsTheLock(this.client, this.path, localLockNodeBytes)
Назначение, щелкните этот метод, чтобы увидеть метод создания узла блокировки по умолчанию для класса, управляемого блокировкой.Видно, что здесь создается только узел блокировки. посмотри сноваhasTheLock
,заinternalLockLoop()
Возвращаемое значение метода, только когда метод возвращает true,attemptLock()
Путь узла блокировки будет возвращен, и блокировка будет успешной. Все в порядке, конкурирующая реализация блокировки даетсяinternalLockLoop
провести. Перехват исключения в приведенном выше цикле заключается в повторной попытке в соответствии со стратегией повторных попыток клиента.
// LockInternals.class
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
try {
if (this.revocable.get() != null) {
((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
}
while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
// 获取所有子节点
List<String> children = this.getSortedChildren();
// 获取当前锁节点
String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
// 使用锁驱动加锁
PredicateResults predicateResults = this.driver.getsTheLock(this.client, children,
sequenceNodeName, this.maxLeases);
if (predicateResults.getsTheLock()) {
haveTheLock = true;
} else {
// 阻塞等待上一个锁释放
String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
try {
((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
if (millisToWait == null) {
// 未设置超时一直阻塞
this.wait();
} else {
millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
// 根据时间设置阻塞时间
if (millisToWait > 0L) {
this.wait(millisToWait);
} else {
// 已经超时,设置删除节点标识
doDelete = true;
break;
}
}
} catch (NoNodeException var19) {
;
}
}
}
}
} catch (Exception var21) {
ThreadUtils.checkInterrupted(var21);
doDelete = true;
throw var21;
} finally {
if (doDelete) {
// 删除已超时的锁节点
this.deleteOurPath(ourPath);
}
}
return haveTheLock;
}
Ну и еще целая куча кода. Или возьмите его первым, возвращаемое значениеhaveTheLock
, Boolean, посмотрите на имя, чтобы знать, что эта переменная представляет успех блокировки соревнования. Присвоение переменной происходит внутри цикла, хорошо, смотрите цикл. Сначала получаются все дочерние узлы и имя текущего узла, затем класс драйвера выполняет соревнование блокировок, и результат соревнования инкапсулируется вPredicateResults
класс, содержащий логический идентификатор результатаgetsTheLock
и путь узла прослушиванияpathToWatch
. Наконец, решается, блокировать ли поток, чтобы дождаться освобождения узла блокировки мониторинга в соответствии с результатом соревнования. Следует отметить, что блок здесь использует механизм ожидания() объекта и в то же время определяет, блокирует ли поток время или удаляет узел тайм-аута в зависимости от того, установлено ли время тайм-аута и истекло ли время ожидания. Но конкретной реализации блокировки блокировки здесь нет, есть только подробная реализация блокировки блокировки.Curator
Класс драйвера блокировки по умолчанию:StandardLockInternalsDriver
.
// StandardLockInternalsDriver.class
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName,
int maxLeases) throws Exception {
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
boolean getsTheLock = ourIndex < maxLeases;
String pathToWatch = getsTheLock ? null : (String)children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
Сначала получите индекс положения текущего узла во всех дочерних узлах, а затем проверьте индекс.Внутренняя реализация должна определить, меньше ли он 0. Если он установлен, создается исключение NoNodeException. Это точно не 0. Возможность окончательного получения блокировки зависит от того, равен ли индекс позиции 0, то есть является ли текущий узел наименьшим (maxLeases устанавливается равным 1 при инициализации LockInternals в конструкторе InterProcessMutex).
Суммировать
Эта статья основана на идее и реализации ZK для реализации распределенных блокировок и принципиальном анализе распределенных реентерабельных монопольных блокировок Curator. Лично для меня ключевыми являются следующие моменты:
- Используйте временные узлы, чтобы избежать взаимоблокировок, вызванных аномальными клиентскими программами;
- Используйте упорядоченные узлы для установки правил получения блокировки;
- Используйте механизм синхронизации потоков внутри процесса, чтобы обеспечить распределенное ожидание блокировки между процессами.
Ну, это должно быть все, если Xiaobai что-то упустил, я добавлю это позже.