Распределенная блокировка на основе Zookeeper

ZooKeeper

Эта статья требует всего 10 минут вашего времени.

В настоящее время популярны три схемы реализации распределенных блокировок, а именно схемы на основе базы данных, Redis и Zookeeper, по первым двум схемам в сети много информации для ознакомления, и в этой статье мы не будем их расширять. Давайте посмотрим, как реализовать распределенные блокировки с помощью Zookeeper.

Что такое зоозащитник?

Zookeeper (называемый в отрасли zk) — это централизованная служба, которая обеспечивает управление конфигурацией, распределенную совместную работу и именование.Эти функции являются очень низкоуровневыми и важными базовыми функциями в распределенных системах, но если вы реализуете эти функции самостоятельно и Достижение высокой пропускной способности и низкая задержка при сохранении согласованности и доступности на самом деле очень сложны. Поэтому zookeeper предоставляет эти функции, а разработчики строят свои собственные различные распределенные системы поверх zookeeper.

Хотя реализация zookeeper более сложна, абстракция модели, которую он предоставляет, очень проста. Zookeeper предоставляет многоуровневое пространство имен узлов (узлы называются znodes), каждый узел представлен путем, разделенным косой чертой (/), и каждый узел имеет родительский узел (кроме корневого узла), очень похожий на файловую систему. Например, /foo/doo представляет znode, чьим родителем является /foo, чьим родителем является /, а / является корневым узлом без родителя. В отличие от файловой системы, эти узлы могут устанавливать связанные данные, а в файловой системе только файловый узел может хранить данные, а узел каталога не может. Чтобы обеспечить высокую пропускную способность и низкую задержку, Zookeeper поддерживает эту древовидную структуру каталогов в памяти, что делает Zookeeper неспособным хранить большой объем данных, а верхний предел хранения данных для каждого узла составляет 1 МБ.

Чтобы обеспечить высокую доступность, zookeeper необходимо развернуть в виде кластера, чтобы, пока большинство машин в кластере были доступны (что может допускать определенные сбои машин), сам zookeeper оставался доступным. Когда клиент использует zookeeper, ему необходимо знать список машин кластера и использовать службу, устанавливая TCP-соединение с машиной в кластере.Клиент использует это TCP-соединение для отправки запросов, получения результатов, получения событий прослушивания и отправлять пакеты сердцебиения. Если соединение аварийно разорвано, клиент может подключиться к другому компьютеру.

Схема архитектуры выглядит следующим образом:

zk-framework

Запрос клиента на чтение может быть обработан любой машиной в кластере.Если у запроса на чтение есть прослушиватель, зарегистрированный на узле, прослушиватель также обрабатывается подключенной машиной zookeeper. Для запросов на запись эти запросы будут отправлены на другие компьютеры zookeeper одновременно, и запрос не будет возвращать успех, пока не будет достигнуто соглашение. Следовательно, по мере увеличения числа компьютеров кластера zookeeper пропускная способность запросов на чтение будет увеличиваться, но пропускная способность запросов на запись будет уменьшаться.

Порядок — очень важная функция в zookeeper.Все обновления упорядочены глобально, и каждое обновление имеет уникальную отметку времени, которая называется zxid (идентификатор транзакции Zookeeper). Запрос на чтение будет упорядочен только относительно обновления, то есть возвращаемый результат запроса на чтение будет иметь последний zxid зоопарка.

Как реализовать распределенную блокировку с помощью zookeeper?

Перед описанием алгоритма рассмотрим несколько интересных свойств узлов в zookeeper:

  1. Упорядоченные узлы: если в настоящее время существует родительский узел /lock, мы можем создать дочерние узлы под этим родительским узлом; zookeeper предоставляет дополнительную упорядоченную функцию, например, мы можем создавать дочерние узлы «/lock/node-» и указывает порядок, тогда zookeeper автоматически добавит целочисленный серийный номер в соответствии с текущим количеством дочерних узлов при создании дочерних узлов, то есть, если это первый созданный дочерний узел, то сгенерированный дочерний узел будет /lock/node-0000000000, следующий узел /lock/node-0000000001 и так далее.
  2. Временный узел: клиент может установить временный узел, который будет автоматически удален zookeeper после завершения сеанса или истечения времени сеанса.
  3. Мониторинг событий: при чтении данных мы можем одновременно установить мониторинг событий на узле.Когда данные или структура узла изменяются, zookeeper уведомит клиента. В настоящее время у zookeeper есть следующие четыре события: 1) создание узла, 2) удаление узла, 3) изменение данных узла, 4) изменение дочернего узла.

Ниже описан алгоритм использования zookeeper для реализации распределенных блокировок, предполагая, что корневым узлом пространства блокировки является /lock:

  1. Клиент подключается к zookeeper и создает его в /lockвременныйа такжеЗаказалДочерние узлы, дочерний узел, соответствующий первому клиенту, — /lock/lock-0000000000, второму — /lock/lock-0000000001 и так далее.
  2. Клиент получает список дочерних узлов в /lock и определяет, находится ли дочерний узел, созданный им самим, в текущем списке дочерних узлов.Минимальный серийный номерЕсли это так, считается, что блокировка получена, в противном случае он отслеживает сообщение об изменении дочернего узла /lock и повторяет этот шаг после получения уведомления об изменении дочернего узла, пока блокировка не будет получена;
  3. Выполнить бизнес-код;
  4. После завершения бизнес-процесса удалите соответствующий дочерний узел, чтобы снять блокировку.

Временный узел, созданный на шаге 1, может гарантировать снятие блокировки в случае сбоя.Рассмотрите следующий сценарий: если дочерний узел, созданный в настоящее время клиентом a, является узлом с наименьшим серийным номером, машина, на которой находится клиент, после получения блокировки клиент не удаляет дочерние узлы активно; если создается постоянный узел, блокировка никогда не будет снята, что приводит к тупиковой ситуации; поскольку создается временный узел, после того, как клиент уходит не работает, zookeeper не получает его по истечении определенного периода времени Пакет пульса клиента определяет, что сеанс недействителен, и удаляет временный узел, чтобы снять блокировку.

Кроме того, осторожные друзья могут подумать об атомарной проблеме получения списка дочерних узлов и настройки мониторинга на шаге 2. Рассмотрим следующий сценарий: соответствующий дочерний узел клиента a — это /lock/lock-0000000000, client b Соответствующий дочерний узел это /lock/lock-0000000001.Когда клиент b получает список дочерних узлов, он обнаруживает, что это не узел с наименьшим серийным номером, а клиент a завершает бизнес-процесс и удаляет дочерний узел /lock/lock-0000000000 перед установкой прослушивателя. Разве прослушиватель, установленный end b, не потерял это событие и не заставил его ждать вечно? Этой проблемы не существует. Поскольку операция и операция чтения установки прослушивателя в API, предоставляемом zookeeper,Атомное исполнениеТо есть прослушиватель устанавливается одновременно с чтением списка дочерних узлов, чтобы гарантировать, что события не будут потеряны.

Наконец, у этого алгоритма есть отличный момент оптимизации: если в данный момент 1000 узлов ожидают блокировки, если клиент, получивший блокировку, снимает блокировку, эти 1000 клиентов будут разбужены.Эта ситуация называется эффектом «стада». "; в этом стадном эффекте zookeeper должен уведомить 1000 клиентов, что заблокирует другие операции, и в лучшем случае он должен разбудить только клиента, соответствующего новому наименьшему узлу. Что я должен делать? При настройке прослушивателей событий каждый клиент должен устанавливать прослушиватели событий для дочерних узлов непосредственно перед ним, например, список дочерних узлов /lock/lock-0000000000, /lock/lock-0000000001, /lock/lock-0000000002, serial number Клиент с порядковым номером 1 прослушивает сообщение об удалении дочернего узла с порядковым номером 0, а клиент с порядковым номером 2 прослушивает сообщение об удалении дочернего узла с порядковым номером 1.

Таким образом, процесс скорректированного алгоритма распределенной блокировки выглядит следующим образом:

  1. Клиент подключается к zookeeper и создает его в /lockвременныйа такжеЗаказалДочерние узлы, дочерний узел, соответствующий первому клиенту, — /lock/lock-0000000000, второму — /lock/lock-0000000001 и так далее.
  2. Клиент получает список дочерних узлов в /lock и определяет, находится ли дочерний узел, созданный им самим, в текущем списке дочерних узлов.Минимальный серийный номерДочерний узел , если он есть, считается, что он получил блокировку, в противном случаеПрослушайте сообщение об удалении дочернего узла непосредственно перед ним, повторяйте этот шаг после получения уведомления об изменении дочернего узла, пока не будет получена блокировка;
  3. Выполнить бизнес-код;
  4. После завершения бизнес-процесса удалите соответствующий дочерний узел, чтобы снять блокировку.

Анализ исходного кода куратора

Хотя API, предоставляемый собственным клиентом zookeeper, очень прост, реализовать распределенную блокировку по-прежнему довольно проблематично... Мы можем использовать его напрямую.curatorРеализация распределенной блокировки zookeeper, предоставляемая этим проектом с открытым исходным кодом.

Нам просто нужно импортировать следующий пакет (на основе maven):

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>

Тогда вы можете использовать его! код показывает, как показано ниже:

public static void main(String[] args) throws Exception {
    //创建zookeeper的客户端
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.newClient("10.21.41.181:2181,10.21.42.47:2181,10.21.49.252:2181", retryPolicy);
    client.start();

    //创建分布式锁, 锁空间的根节点路径为/curator/lock
    InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
    mutex.acquire();
    //获得了锁, 进行业务流程
    System.out.println("Enter mutex");
    //完成业务流程, 释放锁
    mutex.release();
    
    //关闭客户端
    client.close();
}

Вы можете видеть, что единственными ключевыми основными операциями являются mutex.acquire() и mutex.release(), что так удобно!

Давайте проанализируем реализацию получения блокировок в исходном коде. Способ получения следующий:

/*
 * 获取锁,当锁被占用时会阻塞等待,这个操作支持同线程的可重入(也就是重复获取锁),acquire的次数需要与release的次数相同。
 * @throws Exception ZK errors, connection interruptions
 */
@Override
public void acquire() throws Exception
{
    if ( !internalLock(-1, null) )
    {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

Здесь следует отметить одну вещь: когда возникает исключение при обмене данными с zookeeper, функция Acquisition напрямую выдает исключение, и пользователю необходимо выполнить стратегию повторных попыток. В коде вызывается InternalLock(-1, null), и параметр указывает, что блокировка постоянно заблокирована в ожидании, пока блокировка занята. Код внутренней блокировки выглядит следующим образом:

private boolean internalLock(long time, TimeUnit unit) throws Exception
{

    //这里处理同线程的可重入性,如果已经获得锁,那么只是在对应的数据结构中增加acquire的次数统计,直接返回成功
    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // re-entering
        lockData.lockCount.incrementAndGet();
        return true;
    }

    //这里才真正去zookeeper中获取锁
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        //获得锁之后,记录当前的线程获得锁的信息,在重入时只需在LockData中增加次数统计即可
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }

    //在阻塞返回时仍然获取不到锁,这里上下文的处理隐含的意思为zookeeper通信异常
    return false;
}

В код добавлены специальные комментарии без расширения. Взгляните на конкретную реализацию блокировки блокировок zookeeper:

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    //参数初始化,此处省略
    //...
   
    //自旋获取锁
    while ( !isDone )
    {
        isDone = true;

        try
        {
            //在锁空间下创建临时且有序的子节点
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            //判断是否获得锁(子节点序号最小),获得锁则直接返回,否则阻塞等待前一个子节点删除通知
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            //对于NoNodeException,代码中确保了只有发生session过期才会在这里抛出NoNodeException,因此这里根据重试策略进行重试
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
            {
                isDone = false;
            }
            else
            {
                throw e;
            }
        }
    }

    //如果获得锁则返回该子节点的路径
    if ( hasTheLock )
    {
        return ourPath;
    }

    return null;
}

В приведенном выше коде есть два основных шага:

  • driver.createsTheLock: создание временных и упорядоченных дочерних узлов, которые относительно просты в реализации и не расширяются, в основном сосредоточены на нескольких режимах узлов: 1) PERSISTENT (постоянный); 2) PERSISTENT_SEQUENTIAL (постоянный и упорядоченный); 3) EPHEMERAL (временный). ); 4) EPHEMERAL_SEQUENTIAL (временная и упорядоченная).
  • internalLockLoop: блоки, ожидающие получения блокировки.

Давайте посмотрим, как internalLockLoop оценивает блокировки и ожидание блокировки.Некоторый ненужный код здесь удален, и сохранен только основной процесс:

//自旋直至获得锁
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
    //获取所有的子节点列表,并且按序号从小到大排序
    List<String>        children = getSortedChildren();
    
    //根据序号判断当前子节点是否为最小子节点
    String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
    PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
    if ( predicateResults.getsTheLock() )
    {
        //如果为最小子节点则认为获得锁
        haveTheLock = true;
    }
    else
    {
        //否则获取前一个子节点
        String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

        //这里使用对象监视器做线程同步,当获取不到锁时监听前一个子节点删除消息并且进行wait(),当前一个子节点删除(也就是锁释放)时,回调会通过notifyAll唤醒此线程,此线程继续自旋判断是否获得锁
        synchronized(this)
        {
            try 
            {
                //这里使用getData()接口而不是checkExists()是因为,如果前一个子节点已经被删除了那么会抛出异常而且不会设置事件监听器,而checkExists虽然也可以获取到节点是否存在的信息但是同时设置了监听器,这个监听器其实永远不会触发,对于zookeeper来说属于资源泄露
                client.getData().usingWatcher(watcher).forPath(previousSequencePath);

                //如果设置了阻塞等待的时间
                if ( millisToWait != null )
                {
                    millisToWait -= (System.currentTimeMillis() - startMillis);
                    startMillis = System.currentTimeMillis();
                    if ( millisToWait <= 0 )
                    {
                        doDelete = true;    // 等待时间到达,删除对应的子节点
                        break;
                    }
                    
                    //等待相应的时间
                    wait(millisToWait);
                }
                else
                {
                   //永远等待
                    wait();
                }
            }
            catch ( KeeperException.NoNodeException e ) 
            {
                //上面使用getData来设置监听器时,如果前一个子节点已经被删除那么会抛出NoNodeException,只需要自旋一次即可,无需额外处理
            }
        }
    }
}

Конкретная логика показана в примечаниях и здесь повторяться не будет. Прослушиватель событий, установленный в коде, — это просто notifyAll, чтобы разбудить текущий поток для повторного определения, когда происходит обратный вызов события, что относительно просто и больше не расширяется.

над.