Оригинальная ссылка:Куратор клиента ZooKeeper
ZooKeeper — это распределенная служба координации распределенных приложений с открытым исходным кодом, реализация Chubby от Google с открытым исходным кодом. Это менеджер кластера, который отслеживает состояние каждого узла в кластере и выполняет следующую разумную операцию в соответствии с отзывами, отправленными узлом. В конечном итоге пользователям предоставляется простой в использовании интерфейс и система с высокой производительностью и стабильными функциями.
Curator — это набор клиентских фреймворков Zookeeper с открытым исходным кодом от Netflix, который решает многие очень низкоуровневые детали разработки клиентов Zookeeper, включая переподключение, повторную регистрацию исключений Watcher и NodeExistsException и т. д. Куратор содержит несколько пакетов:
- curator-framework: некоторая инкапсуляция базового API Zookeeper.
- куратор-клиент: Предоставляет некоторые операции на стороне клиента, такие как стратегии повторных попыток и т. д.
- кураторские рецепты: инкапсулирует некоторые расширенные функции, такие как: мониторинг событий кэша, выборы, распределенная блокировка, распределенный счетчик, распределенный барьер и т. д.
Проблемы с версией куратора и зоопарка
В настоящее время Curator имеет две серийные версии, 2.x.x и 3.x.x, и поддерживает разные версии Zookeeper. Среди них Curator 2.x.x совместим с Zookeeper 3.4.x и 3.5.x. Curator 3.x.x совместим только с Zookeeper 3.5.x и предоставляет некоторые новые функции, такие как динамическая реконфигурация, удаление часов и т. д.
Curator 2.x.x - compatible with both ZooKeeper 3.4.x and ZooKeeper 3.5.x
Curator 3.x.x - compatible only with ZooKeeper 3.5.x and includes support for new
Если есть проблемы с совместимостью между версиями, очень вероятно, что работа узла не будет работать.В то время я наступил на эту яму и выдал следующее исключение:
KeeperErrorCode = Unimplemented for /***
Curator API
Здесь не будет сравниваться разница с нативным API, API куратора просматривается напрямую через интерфейс org.apache.curator.framework.CuratorFramework и используется в сочетании с соответствующим кейсом для дальнейшего использования.
Чтобы визуально видеть информацию узла Zookeeper, вы можете рассмотреть возможность получения интерфейса управления zk, распространенными являются zkui и zkweb.
Чжун Куй:GitHub.com/Ди М открыть/Это…
зквеб:GitHub.com/Только Том/Тяжелый...
Zkweb, который я использую, хотя интерфейс выглядит не таким обтекаемым, как zkui, он немного лучше, чем zkui, с точки зрения иерархического отображения и некоторых деталей.
Подготовка окружающей среды
Ранее написанноеZookeeper Установка и развертывание на LinuxОбратите внимание, что для других операционных систем, пожалуйста, погуглите руководство самостоятельно.
Проект кейса в этой статье был синхронизирован с github,портал.
PS: Конкретного исходного кода Curator я пока не видел, так что анализ исходного кода и принципов реализации не будет, эта статья в основном о некоторых записях реального использования для будущего использования. Если в тексте есть ошибки, укажите на них.
Инициализация и время инициализации клиента Куратора
В реальном проекте инициализация клиента Zookeeper выполняется во время запуска программы.
время инициализации
Чаще всего в проектах Spring или SpringBoot используется привязка к жизненному циклу запуска контейнера или жизненному циклу запуска приложения:
- Слушайте событие ContextRefreshedEvent и инициализируйте Zookeeper после завершения обновления контейнера.
- Прослушайте событие ApplicationReadyEvent/ApplicationStartedEvent и инициализируйте клиент Zookeeper.
В дополнение к вышеперечисленным методам существует также распространенный способ привязки к жизненному циклу бина.
- Реализуйте интерфейс InitializingBean и завершите инициализацию клиента Zookeeper в afterPropertiesSet.
О механизме SpringBoot в случае написания статьи можно сослаться на:Механизм событий в Springboot-Springboot.
Инициализация куратора
Здесь используется этот способ InitializingBean, и код выглядит следующим образом:
public class ZookeeperCuratorClient implements InitializingBean {
private CuratorFramework curatorClient;
@Value("${glmapper.zookeeper.address:localhost:2181}")
private String connectString;
@Value("${glmapper.zookeeper.baseSleepTimeMs:1000}")
private int baseSleepTimeMs;
@Value("${glmapper.zookeeper.maxRetries:3}")
private int maxRetries;
@Value("${glmapper.zookeeper.sessionTimeoutMs:6000}")
private int sessionTimeoutMs;
@Value("${glmapper.zookeeper.connectionTimeoutMs:6000}")
private int connectionTimeoutMs;
@Override
public void afterPropertiesSet() throws Exception {
// custom policy
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
// to build curatorClient
curatorClient = CuratorFrameworkFactory.builder().connectString(connectString)
.sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs)
.retryPolicy(retryPolicy).build();
curatorClient.start();
}
public CuratorFramework getCuratorClient() {
return curatorClient;
}
}
glmapper.zookeeper.xxx — это некоторые параметры zookeeper, которые необходимо настроить в файле конфигурации в этом примере.Параметры объясняются следующим образом:
- baseSleepTimeMs: начальное время ожидания между повторными попытками
- maxRetries: максимальное количество попыток
- Подключение: список серверов для подключения
- sessionTimeoutMs: время ожидания сеанса
- connectionTimeoutMs: время ожидания соединения
Кроме того, при инициализации клиента Curator необходимо указать политику повторных попыток. Интерфейс RetryPolicy — это интерфейс верхнего уровня политики повторных попыток подключения (используется, когда zookeeper теряет соединение) в Curator. Его система наследования классов показана на следующем рисунке. фигура:
- RetryOneTime: повторное подключение только один раз
- RetryNTime: укажите количество повторных подключений N
- RetryUtilElapsed: укажите максимальное время ожидания повторного подключения и интервал времени повторного подключения, а также периодическое повторное подключение до тех пор, пока не истечет время ожидания или соединение не будет успешным.
- ExponentialBackoffRetry: на основе варианта переподключения с отсрочкой, разница заключается в том, что интервал переподключения RetryUtilElapsed является динамическим.
- BoundedExponentialBackoffRetry: отличие от ExponentialBackoffRetry в том, что добавлено управление максимальным количеством повторных попыток.
Помимо вышеперечисленного, в некоторых сценариях необходимо изолировать разные сервисы.В этом случае это решается установкой пространства имен.Пространство имен фактически является корневым путем указанного zookeeper.После установки все последующие операции будут основываться на этом корневом каталоге.
Использование базового API куратора
Проверить, существует ли узел
Метод checkExists возвращает конструктор ExistsBuilder, который вернет объект Stat, как если бы был вызван org.apache.zookeeper.ZooKeeper.exists(). null означает, что он не существует, в то время как фактический объект Stat означает, что он существует.
public void checkNodeExist(String path) throws Exception {
Stat stat = curatorClient.checkExists().forPath(path);
if (stat != null){
throw new RuntimeException("path = "+path +" has bean exist.");
}
}
В практических приложениях рекомендуется выполнять checkExists на узлах, которые необходимо использовать при работе с узлами.
новый узел
-
Создание узлов нерекурсивно
curatorClient.create().forPath("/glmapper"); curatorClient.create().forPath("/glmapper/test");
Сначала создайте /glmapper, а затем создайте /test в /glmapper.Если вы используете /glmapper/test напрямую без предварительного создания /glmapper, будет выдано исключение:
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /glmapper/test
Если вам нужно указать данные в узле при создании узла, вы можете сделать это:
curatorClient.create().forPath("/glmapper","data".getBytes());
Укажите тип узла (временный узел EPHEMERAL)
curatorClient.create().withMode(CreateMode.EPHEMERAL).forPath("/glmapper","data".getBytes());
-
Создание узлов рекурсивно
Существует два метода рекурсивного создания узлов: createParentsIfNeeded и createParentContainersIfNeeded. Между двумя рекурсивными методами создания в новой версии zookeeper есть различия: createParentContainersIfNeeded() рекурсивно создает узлы в режиме контейнера, если в старой версии zookeeper этот метод эквивалентен createParentsIfNeeded().
В случае нерекурсивного режима, если вы создадите /glmapper/test напрямую, будет сообщено об ошибке, тогда возможно в рекурсивном режиме
curatorClient.create().creatingParentContainersIfNeeded().forPath("/glmapper/test");
В рекурсивном вызове, если CreateMode не указан, значение по умолчанию
PERSISTENT
, если указан как временный узел, конечный узел будет временным узлом, а родительский узел по-прежнему будетPERSISTENT
удалить узел
-
Удалить узлы не рекурсивно
curatorClient.delete().forPath("/glmapper/test");
Укажите конкретную версию
curatorClient.delete().withVersion(-1).forPath("/glmapper/test");
Используйте гарантированный метод для удаления, гарантированный гарантирует, что в случае действительного сеанса фон будет продолжать удалять узел, пока он не будет удален.
curatorClient.delete().guaranteed().withVersion(-1).forPath("/glmapper/test");
-
Рекурсивно удалить текущий узел и его дочерние элементы
curatorClient.delete().deletingChildrenIfNeeded().forPath("/glmapper/test");
Получить данные узла
Получить данные узла
byte[] data = curatorClient.getData().forPath("/glmapper/test");
Данные распаковываются в соответствии с настроенным провайдером сжатия
byte[] data = curatorClient.getData().decompressed().forPath("/glmapper/test");
Чтение данных и получение статистической информации
Stat stat = new Stat();
byte[] data = curatorClient.getData().storingStatIn(stat).forPath("/glmapper/test");
Обновить данные узла
установить указанное значение
curatorClient.setData().forPath("/glmapper/test","newData".getBytes());
Настроить данные и сжать их с помощью настроенного поставщика сжатия
curatorClient.setData().compressed().forPath("/glmapper/test","newData".getBytes());
Установить данные и указать версию
curatorClient.setData().withVersion(-1).forPath("/glmapper/test","newData".getBytes());
получить подсписок
List<String> childrenList = curatorClient.getChildren().forPath("/glmapper");
событие
Куратор также инкапсулирует мониторинг событий в типичных сценариях Zookeeper, Эта часть возможности находится в пакете curator-recipes.
тип события
При использовании разных методов будут разные события.
public enum CuratorEventType
{
//Corresponds to {@link CuratorFramework#create()}
CREATE,
//Corresponds to {@link CuratorFramework#delete()}
DELETE,
//Corresponds to {@link CuratorFramework#checkExists()}
EXISTS,
//Corresponds to {@link CuratorFramework#getData()}
GET_DATA,
//Corresponds to {@link CuratorFramework#setData()}
SET_DATA,
//Corresponds to {@link CuratorFramework#getChildren()}
CHILDREN,
//Corresponds to {@link CuratorFramework#sync(String, Object)}
SYNC,
//Corresponds to {@link CuratorFramework#getACL()}
GET_ACL,
//Corresponds to {@link CuratorFramework#setACL()}
SET_ACL,
//Corresponds to {@link Watchable#usingWatcher(Watcher)} or {@link Watchable#watched()}
WATCHED,
//Event sent when client is being closed
CLOSING
}
прослушиватель событий
Разовый метод мониторинга: Watcher
Использование Watcher для мониторинга узлов можно рассматривать в типичных бизнес-сценариях, но в целом это не рекомендуется.
byte[] data = curatorClient.getData().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听器 watchedEvent:" + watchedEvent);
}
}).forPath("/glmapper/test");
System.out.println("监听节点内容:" + new String(data));
// 第一次变更节点数据
curatorClient.setData().forPath("/glmapper/test","newData".getBytes());
// 第二次变更节点数据
curatorClient.setData().forPath("/glmapper/test","newChangedData".getBytes());
Приведенный выше код регистрирует событие прослушивателя Watcher для узла /glmapper/test и возвращает содержимое текущего узла. Два изменения данных были внесены позже, фактически, когда было внесено второе изменение, срок действия мониторинга истек, и событие изменения узла не могло быть получено повторно. Информация, выводимая консолью во время теста, выглядит следующим образом:
监听节点内容:data
watchedEvent:WatchedEvent state:SyncConnected type:NodeDataChanged path:/glmapper/test
Метод CuratorListener
Слушатель CuratorListener, этот слушатель в основном предназначен для фоновых уведомлений и уведомлений об ошибках. После использования этого прослушивателя вызов метода inBackground асинхронно получит прослушиватель, и событие прослушивателя не будет инициировано для создания или модификации узла.
CuratorListener listener = new CuratorListener(){
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("event : " + event);
}
};
// 绑定监听器
curatorClient.getCuratorListenable().addListener(listener);
// 异步获取节点数据
curatorClient.getData().inBackground().forPath("/glmapper/test");
// 更新节点数据
curatorClient.setData().forPath("/glmapper/test","newData".getBytes());
Информация, выводимая консолью во время теста, выглядит следующим образом:
event : CuratorEventImpl{type=GET_DATA, resultCode=0, path='/glmapper/test', name='null', children=null, context=null, stat=5867,5867,1555140974671,1555140974671,0,0,0,0,4,0,5867
, data=[100, 97, 116, 97], watchedEvent=null, aclList=null}
Здесь запускается только один обратный вызов прослушивателя, который называется getData .
Куратор представил механизм прослушивания событий кеша
Куратор представляет Cache для реализации мониторинга событий на сервере Zookeeper.Мониторинг событий кэша можно понимать как процесс сравнения между представлением локального кэша и удаленным представлением Zookeeper. В кэше предусмотрена функция повторной регистрации. Кэш делится на два типа регистрации: прослушиватель узла и прослушиватель дочернего узла.
-
NodeCache
Слушайте изменения в самом узле данных. Мониторинг узла должен взаимодействовать с функцией обратного вызова для обработки бизнес-процессов после получения события мониторинга. NodeCache завершает последующую обработку через NodeCacheListener.
String path = "/glmapper/test"; final NodeCache nodeCache = new NodeCache(curatorClient,path); //如果设置为true则在首次启动时就会缓存节点内容到Cache中。 nodeCache.start(true); nodeCache.start(); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("触发监听回调,当前节点数据为:" + new String(nodeCache.getCurrentData().getData())); } }); curatorClient.setData().forPath(path,"1".getBytes()); curatorClient.setData().forPath(path,"2".getBytes()); curatorClient.setData().forPath(path,"3".getBytes()); curatorClient.setData().forPath(path,"4".getBytes()); curatorClient.setData().forPath(path,"5".getBytes()); curatorClient.setData().forPath(path,"6".getBytes());
Примечание: Во время теста, когда nodeCache.start() и NodeCache несколько раз изменяли содержимое прослушивающего узла, возникал феномен отсутствия событий.За 5 раз выполнения варианта использования все события отслеживались только один раз, если nodeCache.start (true), NodeCache не потеряет содержимое при многократном изменении содержимого прослушивающего узла.
NodeCache не только отслеживает изменение содержимого узла, но также прослушивает существование указанного узла. Если исходный узел не существует, то Cache инициирует мониторинг при создании узла.Если узел удален, он не сможет инициировать событие мониторинга.
-
PathChildrenCache
PathChildrenCache не будет слушать два дочерних узла, дочерние узлы будут слушать.
String path = "/glmapper"; PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorClient,path,true); // 如果设置为true则在首次启动时就会缓存节点内容到Cache中。 nodeCache.start(true); pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception { System.out.println("-----------------------------"); System.out.println("event:" + event.getType()); if (event.getData()!=null){ System.out.println("path:" + event.getData().getPath()); } System.out.println("-----------------------------"); } }); zookeeperCuratorClient.createNode("/glmapper/test","data".getBytes(),CreateMode.PERSISTENT); Thread.sleep(1000); curatorClient.setData().forPath("/glmapper/test","1".getBytes()); Thread.sleep(1000); curatorClient.setData().forPath("/glmapper/test","2".getBytes()); Thread.sleep(1000); zookeeperCuratorClient.createNode("/glmapper/test/second","data".getBytes(),CreateMode.PERSISTENT); Thread.sleep(1000); curatorClient.setData().forPath("/glmapper/test/second","1".getBytes()); Thread.sleep(1000); curatorClient.setData().forPath("/glmapper/test/second","2".getBytes()); Thread.sleep(1000);
Примечание: В ходе тестирования было обнаружено, что если между двумя последовательными операциями нет определенного временного интервала, следующее событие не может быть отслежено. Таким образом, отслеживаются только дочерние узлы, поэтому операции под дочерним узлом второго уровня /second не могут отслеживаться. Информация, выводимая консолью во время теста, выглядит следующим образом:
----------------------------- event:CHILD_ADDED path:/glmapper/test ----------------------------- ----------------------------- event:INITIALIZED ----------------------------- ----------------------------- event:CHILD_UPDATED path:/glmapper/test ----------------------------- ----------------------------- event:CHILD_UPDATED path:/glmapper/test -----------------------------
-
TreeCache
TreeCache использует внутренний класс
TreeNode
для поддержания этой древовидной структуры. И сопоставьте эту древовидную структуру с узлами ZK. Таким образом, TreeCache может отслеживать события всех узлов текущего узла.String path = "/glmapper"; TreeCache treeCache = new TreeCache(curatorClient,path); treeCache.getListenable().addListener((client,event)-> { System.out.println("-----------------------------"); System.out.println("event:" + event.getType()); if (event.getData()!=null){ System.out.println("path:" + event.getData().getPath()); } System.out.println("-----------------------------"); }); treeCache.start(); zookeeperCuratorClient.createNode("/glmapper/test","data".getBytes(),CreateMode.PERSISTENT); Thread.sleep(1000); curatorClient.setData().forPath("/glmapper/test","1".getBytes()); Thread.sleep(1000); curatorClient.setData().forPath("/glmapper/test","2".getBytes()); Thread.sleep(1000); zookeeperCuratorClient.createNode("/glmapper/test/second","data".getBytes(),CreateMode.PERSISTENT); Thread.sleep(1000); curatorClient.setData().forPath("/glmapper/test/second","1".getBytes()); Thread.sleep(1000); curatorClient.setData().forPath("/glmapper/test/second","2".getBytes()); Thread.sleep(1000);
Информация, выводимая консолью во время теста, выглядит следующим образом:
----------------------------- event:NODE_ADDED path:/glmapper ----------------------------- ----------------------------- event:NODE_ADDED path:/glmapper/test ----------------------------- ----------------------------- event:NODE_UPDATED path:/glmapper/test ----------------------------- ----------------------------- event:NODE_UPDATED path:/glmapper/test ----------------------------- ----------------------------- event:NODE_ADDED path:/glmapper/test/second ----------------------------- ----------------------------- event:NODE_UPDATED path:/glmapper/test/second ----------------------------- ----------------------------- event:NODE_UPDATED path:/glmapper/test/second -----------------------------
транзакционная операция
Экземпляр CuratorFramework содержит метод интерфейса inTransaction(), который вызывается для запуска транзакции ZooKeeper. Вы можете комбинировать операции создания, установки данных, проверки и/или удаления, а затем вызывать commit() для фиксации в качестве атомарной операции.
// 开启事务
CuratorTransaction curatorTransaction = curatorClient.inTransaction();
Collection<CuratorTransactionResult> commit =
// 操作1
curatorTransaction.create().withMode(CreateMode.EPHEMERAL).forPath("/glmapper/transaction")
.and()
// 操作2
.delete().forPath("/glmapper/test")
.and()
// 操作3
.setData().forPath("/glmapper/transaction", "data".getBytes())
.and()
// 提交事务
.commit();
Iterator<CuratorTransactionResult> iterator = commit.iterator();
while (iterator.hasNext()){
CuratorTransactionResult next = iterator.next();
System.out.println(next.getForPath());
System.out.println(next.getResultPath());
System.out.println(next.getType());
}
Здесь отладчик смотрит на информацию о коллекции, панель выглядит следующим образом:
Асинхронная работа
Добавления, удаления, изменения и запросы, упомянутые выше, являются синхронными, но Curator также предоставляет асинхронный интерфейс и вводит интерфейс BackgroundCallback для обработки информации о результатах, возвращаемой сервером после вызова асинхронного интерфейса. Важным значением обратного вызова в интерфейсе BackgroundCallback является CuratorEvent, который содержит сведения о типе события, ответе и узле.
Его также очень просто использовать, просто вызовите функцию Background() следующим образом:
curatorClient.getData().inBackground().forPath("/glmapper/test");
Посмотрев на определение метода inBackground, вы увидите, что inBackground поддерживает пользовательский пул потоков для обработки бизнес-логики после возврата результата.
public T inBackground(BackgroundCallback callback, Executor executor);
Здесь код не будет опубликован.
резюме
В этой статье рассказывается об основном API Curator записи для основной и исходной части, не задействованной. Если у вас есть время на эту часть, изучайте ее медленно. Кроме того, как распределенная блокировка, распределенная последовательность приращения и, таким образом, достижение пребывания на теоретической стадии, нет практики, боясь прыгать, а затем использовать код.