Zookeeper раздал блокировку мертвой серии синхронизации java

Java

проблема

(1) Как zookeeper реализует распределенные блокировки?

(2) Каковы преимущества распределенных замков zookeeper?

(3) Каковы недостатки распределенных замков zookeeper?

Введение

zooKeeper — это распределенная служба координации распределенных приложений с открытым исходным кодом. Она может предоставлять согласованные услуги для распределенных приложений. Она является важным компонентом Hadoop и Hbase. Ее также можно использовать в качестве центра конфигурации и центра регистрации в микросервисах. система.

В этой главе мы познакомим вас с тем, как zookeeper реализует распределенные блокировки в распределенных системах.

Базовые знания

Что такое зноды?

ZooKeeper управляет и поддерживает узлы данных, называемые znodes, которые управляются иерархической древовидной структурой, похожей на файловую систему.Если узлы znode содержат данные, они хранятся в виде массивов байтов.

Более того, когда несколько клиентов на одном и том же узле создаются одновременно [эта статья изначально была создана общедоступной учетной записью «Tongge Reading Source Code»], только один клиент будет успешным, а другие клиенты не будут созданы при создании.

zooKeeper

Тип узла

Существует четыре типа znodes:

  • стойкий (неупорядоченный)
  • Постоянный заказ
  • Временный (незаказанный)
  • временный порядок

Среди них постоянный узел будет существовать всегда, если его не удалить вручную, а временный узел автоматически удалит узел при сбое сеанса клиента.

Что такое наблюдатель?

Watcher (прослушиватель событий) — очень важная функция в zookeeper.

zookeeper позволяет пользователям регистрировать некоторых наблюдателей на указанных узлах, и когда происходят определенные события, сервер zookeeper уведомляет заинтересованных клиентов о событиях.Этот механизм заключается в том, что Zookeeper реализует службы распределенной координации.Важные особенности.

KeeperState EventType Условия срабатывания инструкция действовать
СинкКоннектед (3) Нет (-1) Клиент и сервер успешно устанавливают соединение В этот момент клиент и сервер подключены -
То же NodeCreated (1) Создается соответствующий узел данных, отслеживаемый Watcher. То же Create
То же Узелудален (2) Соответствующий узел данных, отслеживаемый Наблюдателем, удаляется. То же Delete/znode
То же NodeDataChanged(3) Содержимое данных соответствующего узла данных, отслеживаемого Наблюдателем, изменяется. То же setDate/znode
То же NodeChildChanged (4) Список дочерних узлов соответствующего узла данных, отслеживаемого Wather, изменился То же Create/child
Отключено (0) Нет (-1) Клиент отключается от сервера ZooKeeper В этот момент клиент и сервер находятся в отключенном состоянии. -
Просрочено (-112) Нет (-1) время ожидания сеанса В это время клиентский сеанс недействителен, и он обычно также подвергается исключению SessionExpiredException. -
Ошибка авторизации (4) Нет (-1) Обычно бывает два случая: 1: используется неправильная схема для проверки разрешений 2: проверка разрешений SASL завершается неудачно Обычно одновременно получают исключение AuthFailedException. -

Принципиальный анализ

Вариант первый

Так как один и тот же узел может быть создан только один раз, то при блокировке проверяйте, существует ли узел, создайте его, если он не существует, и прослушивайте событие удаления этого узла, если он существует или создание не удается, чтобы при блокировке отпускается, прослушивающий клиент снова начинает конкурировать за создание этого узла. В случае успеха блокировка будет получена. В случае неудачи узел снова будет отслеживаться.

zooKeeper

Например, есть три клиент-клиент1, Client2, Client3 приобретено одновременно / Locker / user_1 Замок, который будет работать следующие шаги:

(1) Трое пытаются одновременно создать узел /locker/user_1;

(2) client1 успешно создан, он получает блокировку;

(3) Создание client2 и client3 не удается, и они отслеживают событие удаления /locker/user_1;

(4) client1 выполняет бизнес-логику в замке;

(5) client1 снимает блокировку и удаляет узел /locker/user_1;

(6) И client2, и client3 фиксируют событие удаления узла /locker/user_1, и оба пробуждаются;

(7) client2 и client3 одновременно создают узел /locker/user_1;

(8) Вернитесь ко второму шагу и так далее [эта статья изначально была создана общедоступной учетной записью «Tong Ge Reading Source Code»];

Однако у этой схемы есть очень серьезный недостаток — шокирующий стадный эффект.

Если параллелизм высокий, несколько клиентов одновременно наблюдают за одним и тем же узлом, пробуждают такое количество клиентов одновременно при снятии блокировки, а затем снова конкурируют.В конце концов, только один может получить блокировку, а другие клиенты снова будут спать Пробуждение не имеет смысла и сильно тратит системные ресурсы, так что есть ли лучшее решение? Ответ да, конечно, смотрите второй вариант.

Вариант 2

Чтобы решить эффект шокирующего стада в решении 1, мы можем реализовать распределенные блокировки в виде упорядоченных дочерних узлов, а во избежание риска внезапного отключения после получения блокировки клиентом необходимо использовать временные упорядоченные узлы .

zooKeeper

Например, если три клиента client1, client2 и client3 одновременно получат блокировку /locker/user_1, они будут работать следующим образом:

(1) Три одновременно создают временные упорядоченные дочерние узлы в /locker/user_1/;

(2) Все три успешно созданы соответственно /locker/user1/0000000001, /locker/пользователь1/0000000003, /locker/user_1/0000000002;

(3) Проверить, является ли созданный вами узел наименьшим среди дочерних узлов;

(4) client1 оказывается наименьшим узлом и получает блокировку;

(5) client2 и client3 обнаруживают, что они не самые маленькие узлы, и они не могут получить блокировку;

(6) Узел, созданный client2, называется /locker/user.1/0000000003, который прослушивает предыдущий узел /locker/user1/0000000002 удалить событие;

(7) Узел, созданный client3, называется /locker/user.1/0000000002, который прослушивает предыдущий узел /locker/user1/0000000001 события удаления;

(8) client1 выполняет бизнес-логику в замке;

(9) client1 снимает блокировку и удаляет узел /locker/user_1/0000000001;

(10) client3 прослушивает событие удаления узла /locker/user_1/0000000001 и пробуждается;

(11) client3 снова проверяет, является ли он наименьшим узлом, и если он найден, он получает блокировку;

(12) client3 выполняет бизнес-логику в блокировке [Эта статья была первоначально создана публичной учетной записью «Tong Ge Reading Source Code»];

(13) client3 снимает блокировку и удаляет узел /locker/user_1/0000000002;

(14) client2 прослушивает событие удаления узла /locker/user_1/0000000002 и пробуждается;

(15) client2 выполняет бизнес-логику в замке;

(16) client2 снимает блокировку и удаляет узел /locker/user_1/0000000003;

(17) client2 проверяет /locker/user1/ Есть ли под ним какой-либо дочерний узел, если нет, удалите /locker/user1 узел;

(18) Процесс заканчивается;

По сравнению со схемой 1, эта схема пробуждает только одного клиента каждый раз, когда снимается блокировка, что снижает стоимость пробуждения потока и повышает эффективность.

Реализация собственного API зоопарка

файл pom

В pom вводятся следующие пакеты jar:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.5</version>
</dependency>

Интерфейс шкафчика

Определите интерфейс Locker, используя тот же интерфейс с тем же интерфейсом с распределенной блокировкой MYSQL.

public interface Locker {
    void lock(String key, Runnable command);
}

Реализация распределенной блокировки Zookeeper

Здесь связанные операции zookeeper обрабатываются внутренним классом ZkLockerWatcher, Следует отметить следующие моменты:

(1) Не выполняйте соответствующие операции до установления соединения zk, в противном случае будет сообщено об исключении ConnectionLoss Здесь LockSupport.park() блокирует поток подключения и пробуждает обработку в потоке прослушивания;

(2) Клиентский поток и прослушивающий поток не являются одним и тем же потоком, поэтому они могут быть обработаны с помощью LockSupport.park() и LockSupport.unpark(thread);;

(3) Многие шаги в середине не являются атомарными (ямами), поэтому их необходимо проверить еще раз, подробности см. в комментариях в коде;

@Slf4j
@Component
public class ZkLocker implements Locker {
    @Override
    public void lock(String key, Runnable command) {
        ZkLockerWatcher watcher = ZkLockerWatcher.conn(key);
        try {
            if (watcher.getLock()) {
                command.run();
            }
        } finally {
            watcher.releaseLock();
        }
    }

    private static class ZkLockerWatcher implements Watcher {
        public static final String connAddr = "127.0.0.1:2181";
        public static final int timeout = 6000;
        public static final String LOCKER_ROOT = "/locker";

        ZooKeeper zooKeeper;
        String parentLockPath;
        String childLockPath;
        Thread thread;

        public static ZkLockerWatcher conn(String key) {
            ZkLockerWatcher watcher = new ZkLockerWatcher();
            try {
                ZooKeeper zooKeeper = watcher.zooKeeper = new ZooKeeper(connAddr, timeout, watcher);
                watcher.thread = Thread.currentThread();
                // 阻塞等待连接建立完毕
                LockSupport.park();
                // 根节点如果不存在,就创建一个(并发问题,如果两个线程同时检测不存在,两个同时去创建必须有一个会失败)
                if (zooKeeper.exists(LOCKER_ROOT, false) == null) {
                    try {
                        zooKeeper.create(LOCKER_ROOT, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException e) {
                        // 如果节点已存在,则创建失败,这里捕获异常,并不阻挡程序正常运行
                        log.info("创建节点 {} 失败", LOCKER_ROOT);
                    }
                }
                // 当前加锁的节点是否存在
                watcher.parentLockPath = LOCKER_ROOT + "/" + key;
                if (zooKeeper.exists(watcher.parentLockPath, false) == null) {
                    try {
                        zooKeeper.create(watcher.parentLockPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException e) {
                        // 如果节点已存在,则创建失败,这里捕获异常,并不阻挡程序正常运行
                        log.info("创建节点 {} 失败", watcher.parentLockPath);
                    }
                }

            } catch (Exception e) {
                log.error("conn to zk error", e);
                throw new RuntimeException("conn to zk error");
            }
            return watcher;
        }

        public boolean getLock() {
            try {
                // 创建子节点【本篇文章由公众号“彤哥读源码”原创】
                this.childLockPath = zooKeeper.create(parentLockPath + "/", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                // 检查自己是不是最小的节点,是则获取成功,不是则监听上一个节点
                return getLockOrWatchLast();
            } catch (Exception e) {
                log.error("get lock error", e);
                throw new RuntimeException("get lock error");
            } finally {
//                System.out.println("getLock: " + childLockPath);
            }
        }

        public void releaseLock() {
            try {
                if (childLockPath != null) {
                    // 释放锁,删除节点
                    zooKeeper.delete(childLockPath, -1);
                }
                // 最后一个释放的删除锁节点
                List<String> children = zooKeeper.getChildren(parentLockPath, false);
                if (children.isEmpty()) {
                    try {
                        zooKeeper.delete(parentLockPath, -1);
                    } catch (KeeperException e) {
                        // 如果删除之前又新加了一个子节点,会删除失败
                        log.info("删除节点 {} 失败", parentLockPath);
                    }
                }
                // 关闭zk连接
                if (zooKeeper != null) {
                    zooKeeper.close();
                }
            } catch (Exception e) {
                log.error("release lock error", e);
                throw new RuntimeException("release lock error");
            } finally {
//                System.out.println("releaseLock: " + childLockPath);
            }
        }

        private boolean getLockOrWatchLast() throws KeeperException, InterruptedException {
            List<String> children = zooKeeper.getChildren(parentLockPath, false);
            // 必须要排序一下,这里取出来的顺序可能是乱的
            Collections.sort(children);
            // 如果当前节点是第一个子节点,则获取锁成功
            if ((parentLockPath + "/" + children.get(0)).equals(childLockPath)) {
                return true;
            }

            // 如果不是第一个子节点,就监听前一个节点
            String last = "";
            for (String child : children) {
                if ((parentLockPath + "/" + child).equals(childLockPath)) {
                    break;
                }
                last = child;
            }

            if (zooKeeper.exists(parentLockPath + "/" + last, true) != null) {
                this.thread = Thread.currentThread();
                // 阻塞当前线程
                LockSupport.park();
                // 唤醒之后重新检测自己是不是最小的节点,因为有可能上一个节点断线了
                return getLockOrWatchLast();
            } else {
                // 如果上一个节点不存在,说明还没来得及监听就释放了,重新检查一次
                return getLockOrWatchLast();
            }
        }

        @Override
        public void process(WatchedEvent event) {
            if (this.thread != null) {
                // 唤醒阻塞的线程(这是在监听线程,跟获取锁的线程不是同一个线程)
                LockSupport.unpark(this.thread);
                this.thread = null;
            }
        }
    }
}

тестовый код

Мы начинаем две партии нитей здесь, и одна партия получает пользователям1 Этот замок, пакет пользователей приобретения2 этот замок.

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class ZkLockerTest {

    @Autowired
    private Locker locker;

    @Test
    public void testZkLocker() throws IOException {
        for (int i = 0; i < 1000; i++) {
            new Thread(()->{
                locker.lock("user_1", ()-> {
                    try {
                        System.out.println(String.format("user_1 time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }, "Thread-"+i).start();
        }
        for (int i = 1000; i < 2000; i++) {
            new Thread(()->{
                locker.lock("user_2", ()-> {
                    try {
                        System.out.println(String.format("user_2 time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }, "Thread-"+i).start();
        }

        System.in.read();
    }
}

результат операции:

Видно, что результат печати двух замков стабилен на уровне около 500 мс.

user_1 time: 1568973299578, threadName: Thread-10
user_2 time: 1568973299579, threadName: Thread-1780
user_1 time: 1568973300091, threadName: Thread-887
user_2 time: 1568973300091, threadName: Thread-1542
user_1 time: 1568973300594, threadName: Thread-882
user_2 time: 1568973300594, threadName: Thread-1539
user_2 time: 1568973301098, threadName: Thread-1592
user_1 time: 1568973301098, threadName: Thread-799
user_1 time: 1568973301601, threadName: Thread-444
user_2 time: 1568973301601, threadName: Thread-1096
user_1 time: 1568973302104, threadName: Thread-908
user_2 time: 1568973302104, threadName: Thread-1574
user_2 time: 1568973302607, threadName: Thread-1515
user_1 time: 1568973302607, threadName: Thread-80
user_1 time: 1568973303110, threadName: Thread-274
user_2 time: 1568973303110, threadName: Thread-1774
user_1 time: 1568973303615, threadName: Thread-324
user_2 time: 1568973303615, threadName: Thread-1621

Реализация куратора

Над реализацией собственного API легче понять логику Zookeeper для достижения распределенной блокировки, но это неизбежно для обеспечения отсутствия проблем, таких как блокировка без повторного входа, не поддерживает блокировку чтения-записи.

Давайте посмотрим, как реализован существующий колесный куратор.

файл pom

В файл pom вводятся следующие пакеты jar:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>

Код

Ниже приведена реализация мьютекса:

@Component
@Slf4j
public class ZkCuratorLocker implements Locker {
    public static final String connAddr = "127.0.0.1:2181";
    public static final int timeout = 6000;
    public static final String LOCKER_ROOT = "/locker";

    private CuratorFramework cf;

    @PostConstruct
    public void init() {
        this.cf = CuratorFrameworkFactory.builder()
                .connectString(connAddr)
                .sessionTimeoutMs(timeout)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();

        cf.start();
    }

    @Override
    public void lock(String key, Runnable command) {
        String path = LOCKER_ROOT + "/" + key;
        InterProcessLock lock = new InterProcessMutex(cf, path);
        try {
            // 【本篇文章由公众号“彤哥读源码”原创】
            lock.acquire();
            command.run();
        } catch (Exception e) {
            log.error("get lock error", e);
            throw new RuntimeException("get lock error", e);
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                log.error("release lock error", e);
                throw new RuntimeException("release lock error", e);
            }
        }
    }
}

В дополнение к блокировкам мьютекса куратор также предоставляет такие реализации, как блокировки чтения-записи, множественные блокировки и семафоры, и они являются реентерабельными блокировками.

Суммировать

(1) Зоокольник узла Существует четыре типа: длительные, долговечные и упорядоченные, временные, временные и упорядоченные;

(2) zookeeper предоставляет очень важную функцию — механизм мониторинга, который можно использовать для отслеживания изменений в узлах;

(3) Распределенная блокировка zookeeper основана на временном упорядоченном узле + механизме мониторинга;

(4) Zookeeper создает временный упорядоченный узел под путем блокировки, когда распределенная блокировка заблокирована;

(5) Если вы первый узел, получите блокировку;

(6) Если вы не первый узел, следите за предыдущим узлом и блокируйте текущий поток;

(7) При прослушивании события удаления предыдущего узла разбудить поток текущего узла и снова проверить, является ли он первым узлом;

(8) Используйте временные упорядоченные узлы вместо долговременного упорядоченного узла, чтобы автоматически снимать блокировку, когда клиент открыт без причины.

пасхальные яйца

Каковы преимущества распределенных замков zookeeper?

Ответ: 1) сам zookeeper можно развернуть в кластере, что надежнее, чем одиночная точка mysql;

2) Не будет занимать количество соединений mysql и не будет увеличивать нагрузку на mysql;

3) Используйте механизм мониторинга, чтобы уменьшить количество переключений контекста потока;

4) Клиент отключен автоматически отпустите замок, очень безопасно;

5) можно использовать обычный куратор колеса;

6) реализация куратора является реентерабельной, а стоимость преобразования существующего кода невелика;

Каковы недостатки распределенных блокировок zookeeper?

Ответ: 1) Блокировка будет часто "записывать" зоокипера, увеличивая нагрузку на зоокипера;

2) При написании zookeeper будет синхронизироваться в кластере, чем больше нод, тем медленнее синхронизация и медленнее процесс получения блокировок;

3) Необходимо полагаться на zookeeper, а большинство сервисов не будут использовать zookeeper, что усложняет систему;

4) по сравнению с распределенным замком Redis, производительность немного хуже;

Рекомендуемое чтение

1,Открытие серии java-синхронизации мертвых приседаний

2,Небезопасный анализ мертвого магического класса Java

3.JMM (модель памяти Java) из мертвой серии синхронизации Java

4.Неустойчивый анализ мертвой серии синхронизации Java

5.Синхронный анализ мертвой серии синхронизации Java

6.Напишите блокировку самостоятельно в серии «Синхронизация Java»

7.Серия Death Java Synchronous AQS

8,ReentrantLock анализ исходного кода тупиковой серии синхронизации Java (1) - справедливая блокировка, нечестная блокировка

9,ReentrantLock Анализ исходного кода мертвой серии синхронизации Java (2) — условная блокировка

10.ReentrantLock VS синхронизирован в серии синхронизации java

11.Анализ исходного кода исходного кода DEETRANTRANTRANTRANTREADREAD

12.Анализ исходного кода семафора серии Dead Java Synchronization

13.Анализ исходного кода CountDownLatch серии Dead Java Synchronization

14.Заключительная глава AQS в серии о синхронизации Java.

15.Анализ исходного кода StampedLock мертвой серии синхронизации Java

16.Анализ исходного кода CyclicBarrier мертвой серии синхронизации java

17.Анализ исходного кода Phaser для серии синхронизации Java с мертвым стуком

18.MakeLock Java Synchronization Series Mysql распределенный замок

Добро пожаловать, чтобы обратить внимание на мою общедоступную учетную запись «Брат Тонг читает исходный код», проверить больше статей из серии исходного кода и поплавать в океане исходного кода с братом Тонгом.

qrcode