Реализация распределенной блокировки (2): Zookeeper

задняя часть алгоритм ZooKeeper Открытый исходный код

[TOC]

предисловие

Следуя вышеизложенному:Реализация распределенной блокировки (1): Redis, В этой статье мы используем Zookeeper для проектирования и реализации распределенных блокировок, а также изучаем исходный код распределенной блокировки клиентского инструмента с открытым исходным кодом Curator.

Реализация дизайна

1. Основной алгоритм

1.在某父节点下创建临时有序节点
2.判断创建的节点是否是当前父节点下所有子节点中序号最小的
3.是序号最小的成功获取锁,否则监听比自己小的那个节点,进行watch,当该节点被删除的时候通知当前节点,重新获取锁
4.解锁的时候删除当前节点

2. Ключевые моменты

Временный упорядоченный узел

实现Zookeeper分布式锁关键就在于其[临时有序节点]的特性,在Zookeeper中有四种节点
1.PERSISTENT 持久,若不手动删除就永久存在
2.PERSISTENT_SEQUENTIAL 持久有序节点,zookeeper会为节点编号(保证有序)
3.EPHEMERAL 临时,一个客户端会话断开后会自动删除
4.EPHEMERAL_SEQUENTIAL 临时有序节点,zookeeper会为节点编号(保证有序)

монитор

Zookeeper提供事件监听机制,通过对节点、节点数据、子节点都提供了监听,我们通过这种监听watcher机制实现锁的等待

3. Реализация кода

我们基于ZkClient这个客户端来实现,当然也可以用原生Zookeeper API,大致是一样的
坐标如下:
  <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.2</version>
    </dependency>

код показывает, как показано ниже:

public class MyDistributedLock {


    private ZkClient zkClient;
    private String name;
    private String currentLockPath;
    private CountDownLatch countDownLatch;

    private static final String PARENT_LOCK_PATH = "/distribute_lock";

    public MyDistributedLock(ZkClient zkClient, String name) {
        this.zkClient = zkClient;
        this.name = name;
    }

	//加锁
    public void lock() {
    	//判断父节点是否存在,不存在就创建
        if (!zkClient.exists(PARENT_LOCK_PATH)) {
            try {
            	//多个线程只会成功建立一次
                zkClient.createPersistent(PARENT_LOCK_PATH);
            } catch (Exception ignored) {
            }
        }
        //创建当前目录下的临时有序节点
        currentLockPath = zkClient.createEphemeralSequential(PARENT_LOCK_PATH + "/", System.currentTimeMillis());
        //校验是否最小节点
        checkMinNode(currentLockPath);
    }

	//解锁
    public void unlock() {
        System.out.println("delete : " + currentLockPath);
        zkClient.delete(currentLockPath);
    }


    private boolean checkMinNode(String lockPath) {
		//获取当前目录下所有子节点
        List<String> children = zkClient.getChildren(PARENT_LOCK_PATH);
        Collections.sort(children);
        int index = children.indexOf(lockPath.substring(PARENT_LOCK_PATH.length() + 1));
        if (index == 0) {
            System.out.println(name + ":success");
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
            return true;
        } else {
            String waitPath = PARENT_LOCK_PATH + "/" + children.get(index - 1);
            //等待前一个节点释放的监听
            waitForLock(waitPath);
            return false;
        }
    }


    private void waitForLock(String prev) {
        System.out.println(name + " current path :" + currentLockPath + ":fail add listener" + " wait path :" + prev);
        countDownLatch = new CountDownLatch(1);
        zkClient.subscribeDataChanges(prev, new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {

            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                System.out.println("prev node is done");
                checkMinNode(currentLockPath);
            }
        });
        if (!zkClient.exists(prev)) {
            return;
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        countDownLatch = null;
    }
}

замок

  1. zkClient.exists сначала определяет, существует ли родительский узел, и создает его, если он не существует. Zookeeper может гарантировать, что создание будет успешным только один раз.

  2. Создайте временный упорядоченный узел в zkClient.createEphemeralSequential в текущем каталоге, а затем оцените, имеет ли узел в текущем каталоге наименьший порядковый номер.Если это так, успешно получите блокировку, в противном случае возьмите узел меньше, чем он сам, и отслеживайте его.

  3. waitForLock ожидает узел меньшего размера, чем он сам, subscribeDataChanges отслеживает изменение узла и снова оценивает checkMinNode в handleDataDeleted.

  4. После прослушивания оцените, существует ли снова узел, потому что в процессе мониторинга возможно, что предыдущий малый узел снова снимет блокировку.Если предыдущий узел не существует, здесь ждать не нужно. реализуется с помощью countDownLatch.

разблокировать

Разблокировка заключается в удалении текущего узла через удаление zkClient.

прецедент

Проверьте процесс блокировки и разблокировки, запустив несколько потоков, чтобы убедиться, что все в порядке.

public class MyDistributedLockTest {


    public static void main(String[] args) {

        ZkClient zk = new ZkClient("127.0.0.1:2181", 5 * 10000);

        for (int i = 0; i < 20; i++) {

            String name = "thread" + i;
            Thread thread = new Thread(() -> {
                MyDistributedLock myDistributedLock = new MyDistributedLock(zk, name);
                myDistributedLock.lock();
//                try {
//                    Thread.sleep(1 * 1000);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
                myDistributedLock.unlock();
            });
            thread.start();
        }

    }
}

Результаты выполнения следующие: в случае многопоточности блокировка/разблокировка и мониторинг проходят нормально:

thread1 current path :/distribute_lock2/0000000007:fail add listener wait path :/distribute_lock2/0000000006
thread6 current path :/distribute_lock2/0000000006:fail add listener wait path :/distribute_lock2/0000000005
thread3:success
delete : /distribute_lock2/0000000000
thread2 current path :/distribute_lock2/0000000005:fail add listener wait path :/distribute_lock2/0000000004
thread7 current path :/distribute_lock2/0000000004:fail add listener wait path :/distribute_lock2/0000000003
thread9 current path :/distribute_lock2/0000000009:fail add listener wait path :/distribute_lock2/0000000008
thread5 current path :/distribute_lock2/0000000008:fail add listener wait path :/distribute_lock2/0000000007
thread0 current path :/distribute_lock2/0000000001:fail add listener wait path :/distribute_lock2/0000000000
thread8 current path :/distribute_lock2/0000000002:fail add listener wait path :/distribute_lock2/0000000001
thread4 current path :/distribute_lock2/0000000003:fail add listener wait path :/distribute_lock2/0000000002
delete : /distribute_lock2/0000000001
prev node is done
thread8:success
delete : /distribute_lock2/0000000002
prev node is done
thread4:success
delete : /distribute_lock2/0000000003
prev node is done
thread7:success
delete : /distribute_lock2/0000000004
prev node is done
thread2:success
delete : /distribute_lock2/0000000005
prev node is done
thread6:success
delete : /distribute_lock2/0000000006
prev node is done
thread1:success
delete : /distribute_lock2/0000000007
prev node is done
thread5:success
delete : /distribute_lock2/0000000008
prev node is done
thread9:success
delete : /distribute_lock2/0000000009

Кураторский анализ исходного кода

1. Основное использование

 		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
        client.start();
        InterProcessMutex lock2 = new InterProcessMutex(client, "/test");

        try {
            lock.acquire();
            //业务
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.release();
        }
  1. CuratorFrameworkFactory.newClient получает клиент zookeeper, retryPolicy указывает политику повторных попыток и открывает клиент.

  2. Curator сам предоставляет множество реализаций блокировок.В качестве примера мы возьмем реентерабельную блокировку InterProcessMutex.Метод lock.acquire() получает блокировку, lock.release() снимает блокировку, а метод Acquire также предоставляет перегруженные параметры времени ожидания.

2. Анализ исходного кода

замок

Метод internalLock используется непосредственно внутри захвата, и передается время ожидания -1.

 public void acquire() throws Exception {
        if(!this.internalLock(-1L, (TimeUnit)null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
        }
    }

Метод internalLock сначала определяет, является ли это блокировкой с повторным входом, поддерживает потоки и атомарный счетчик через ConcurrentMap, и если это не блокировка с повторным входом, то получает блокировку через попыткуLock.

 private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread currentThread = Thread.currentThread();

        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }

tryLock выполняет здесь циклическое ожидание, создает методTheLock для создания узла и internalLockLoop, чтобы определить, является ли текущий узел наименьшим узлом.

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while ( !isDone )
        {
            isDone = true;

            try
            {
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }

        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }

createTheLock заключается в вызове инкапсулированного куратором API для создания временной упорядоченной ноды

   public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        if ( lockNodeBytes != null )
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }

Решение о блокировке InternalLockLoop, внутренним является driver.getsTheLock, чтобы определить, является ли он наименьшим узлом в текущем каталоге, если это так, вернуть успех получения блокировки, в противном случае отследить предыдущийSequencePath, а затем повторно оценить время ожидания после прослушивания действие завершено

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try 
                        {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e ) 
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

разблокировать

Код релиза относительно прост. Сначала нужно определить, есть ли счетчик блокировок текущего потока на карте. Если исключение не выброшено, если оно существует, выполнить операцию атомарного вычитания. Внутри releaseLock находится операция удаления узла. .удалить внутри

  public void release() throws Exception
    {
        /*
            Note on concurrency: a given lockData instance
            can be only acted on by a single thread so locking isn't necessary
         */

        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData == null )
        {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }

        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 )
        {
            return;
        }
        if ( newLockCount < 0 )
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try
        {
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {
            threadData.remove(currentThread);
        }
    }

постскриптум

分布式锁的实现目前主流比较常用的实现就是Redis和Zookeeper了,相比较自己的实现,Redission和Curator的设计实现更为优秀,也更值得我们借鉴和学习

千里之行,积于跬步;万里之船,成于罗盘,共勉。