Введение в ЗК
что такое зк
Zookeeper — это высокопроизводительная служба координации распределенных приложений с открытым исходным кодом, которая предоставляет простые и примитивные функции.Распределенные приложения могут реализовывать на ее основе более продвинутые службы, такие как синхронизация (распределенные блокировки), управление конфигурацией и управление кластером. Его легко изменить, используя дерево каталогов файловой системы в качестве модели данных. Сервер написан на языке Java и предоставляет клиентов на языках Java и C.
модель данных zk
zk представляет собой иерархическую древовидную структуру
- Каждый узел в древовидной структуре называется Znode.
- Каждый Znode может иметь данные (тип byte[]) и дочерние узлы
- Путь Znode разделен косой чертой, например /Zoo/Duck.В zk нет относительного пути, то есть путь всех узлов должен быть Ctrip абсолютным путем.
- Когда данные узла в zk изменяются, номер версии увеличивается.
- Данные в Znode можно читать и записывать.
Сценарии применения zk
Данные публиковать/подписываться
Публикация/подписка на данные — это так называемый центр конфигурации: издатель публикует данные в один или несколько узлов столбца zk, а подписчик подписывается на данные и может быть уведомлен об изменении данных в режиме реального времени.
Приложение A публикует данные на узле zkServer, а приложения B и C сначала регистрируют наблюдатели на zkServer для прослушивания узла (эквивалент прослушивателя, основанный на реализации RPC).Об изменениях наблюдателя уведомляются, а затем получаются последние данные с zkсервера.балансировки нагрузки
Реализация балансировки нагрузки с помощью zk — это, по сути, использование функции управления конфигурацией zk для выполнения шагов балансировки нагрузки:
- Поставщик услуг регистрирует собственное доменное имя и сопоставление IP-портов в zk.
- Потребитель услуги получает соответствующий IP и порт от zk через доменное имя.IP и портов может быть несколько, но получается только один из них.
- Когда поставщик услуг выходит из строя, соответствующее доменное имя и IP-соответствие будут сокращены на одно сопоставление.
- Платформа обслуживания dubbo от Ali основана на zk для реализации маршрутизации и загрузки служб.
Распределенная координация/уведомление
Внедрение распределенных блокировок и распределенных транзакций с помощью zk-наблюдателя и механизма уведомлений.
Управление кластером
Получите количество машин в текущем кластере, рабочее состояние машин в кластере, сетевые и автономные операции узлов в кластере и унифицированную конфигурацию узлов кластера.
Базовые концепты
Кластерная роль
- Лидер: Предоставляет услуги чтения и записи для клиентов.
- Последователь: предоставляет клиенту услугу чтения.Запрос на запись от клиента к ведомому будет переадресован на роль ведущего, а ведомый будет участвовать в выборах ведущего.
- Наблюдатель: предоставляет услуги чтения для клиентов и не участвует в выборах лидеров, как правило, для улучшения возможности параллелизма запросов на чтение в кластере zk.
беседа
- Сессия — это длительное соединение, установленное между клиентом и сервером zk.
- zk пульсирует сеанс, чтобы определить живость клиентских соединений.
- Клиент zk получает уведомления о событиях наблюдения от сервера в сеансе.
- zk может установить тайм-аут для сеанса.
узел данных zk
1. Znode — это узел данных в древовидной структуре zk для хранения данных 2. Znode делится на два типа: постоянный узел и временный узел: Постоянный узел: после создания он всегда хранится в zk, если только не будет активно вызвана операция удаления. Временный узел: привязан к сеансу клиента, после сбоя клиента все временные узлы, созданные клиентом, будут удалены. 3. Свойство Sequential может быть установлено для постоянных узлов или временных узлов.Если это свойство установлено, к имени узла будет автоматически добавляться целое число.
версия в зк
В zk есть три типа версий 1. версия: представляет версию текущего Znode. 2.Cversion: представляет версию дочернего узла текущего Znode.При изменении дочернего узла значение номера версии будет увеличено. 3. Отклонение: представляет собой версию ACL (управление доступом) текущего Znode.Изменение полномочий управления доступом узла увеличит значение номера версии.
наблюдатель ЗК
Наблюдатель прослушивает узел Znode.Когда данные узла обновляются или статус дочернего узла изменяется, наблюдатель клиента будет уведомлен.
ACL (Контроль доступа) в zk
Подобно управлению разрешениями в Linux/Unix, существуют следующие разрешения на управление доступом: 1. CREATE: разрешение на создание дочерних узлов 2. УДАЛИТЬ: разрешение на удаление дочерних узлов 3. ЧТЕНИЕ: Разрешение на получение данных узла и списка дочерних узлов 4. WRITE: Разрешение на обновление данных узла 5. ADMIN: Разрешение на установку ACL узла
Основные рабочие команды zk
1. Команда помощи помощь 2. путь ls [смотреть] Путь указывает путь к узлу данных, а параметр наблюдения означает отслеживание изменений всех дочерних узлов по указанному пути. Функция команды ls состоит в том, чтобы вывести список всех дочерних узлов указанного узла, а ls может просматривать только все дочерние узлы первого уровня.
3. создать [-s] [-e] команда acl данных пути Функция этой команды заключается в создании узла zk, -s означает, что созданный узел имеет последовательные атрибуты, -e означает, что создается временный узел, а постоянный узел создается по умолчанию, путь - это полный путь к узлу. , а data — это созданный узел data, acl используется для управления разрешениями, По умолчанию управление разрешениями не выполняется.
4. команда get path [watch] Получите содержимое данных и информацию об атрибутах узла пути, опция наблюдения работает так же, как команда ls
5. команда set path data [version] Функция этой команды заключается в обновлении содержимого данных узла пути, данные — это обновленные данные, а версия — версия указанных данных для обновления.Если версия меньше, чем текущая версия данных, будет сообщено об ошибке. .
6. команда удаления пути [версии] Удалите узел, путь которого указан как путь, а версия указывает версию удаленных данных. Как правило, она не указывается, что означает, что удаляется последняя версия данных. Если версия является старой версией, будет сообщено об ошибке.
Контроль доступа для Znodes
Полное название ACL — Access Control List, список контроля доступа, ACL в zk состоит из трех частей, а именно Scheme:id:permission. в: 1. Схема — это стратегия проверки, используемая в процессе проверки. 2. id — это объект, которому предоставлены полномочия, например, ip или пользователь 3. Разрешение — это действующее разрешение с пятью значениями: crdwa, которые представляют соответственно создание/чтение/удаление/запись/администрирование Конкретные значения описаны выше. Используйте команду setAcl path acl для установки полномочий доступа узла, где path — это путь к узлу, а acl — устанавливаемые полномочия. Вы можете просмотреть информацию о разрешениях узла через путь getAcl. Следует отметить, что acl узла не имеет отношения наследования. Существует пять типов политик проверки авторизации, а именно схемы: world/auth/digest/IP/super, которые описаны ниже:
стратегия мировых испытаний
Формат ACL: мир: любой: разрешение Если выбрана стратегия обнаружения "всемирная", бит фиксированного идентификатора – любой. Если разрешение – crdwa, это означает, что любой пользователь имеет право создавать дочерние узлы, читать данные узлов, удалять дочерние узлы, обновлять данные дочерних узлов и задавать узлы ACL. разрешения. ACL по умолчанию для создания нового узла: world:anyone:crdwa
стратегия проверки авторизации
Формат ACl: auth:id:разрешение Например, auth:username:password:crdwa, политика проверки подлинности указывает, что разрешения ACL установлены для всех пользователей, прошедших проверку подлинности. Пользователей можно добавить с помощью команды addauth дайджест:. Если вы создаете несколько групп пользователей и паролей через addauth, когда вы используете setAcl для изменения разрешений, разрешения всех пользователей и паролей будут соответствующим образом изменены. Вновь созданная группа пользователя и пароля через addauth должна снова вызвать setAcl, чтобы быть добавленной в группу разрешений.
Подводя итог, setAcl в рамках политики проверки подлинности установит разрешения для доступа к узлу для всех пользователей, принадлежащих текущему сеансу. Вновь добавленным пользователям текущего сеанса необходимо сбросить разрешения ACL узла перед добавлением операции нового пользователя. разрешения на узел.
стратегия проверки дайджеста
Формат ACL: дайджест:id:разрешение Дайджест похож на авторизацию, за исключением того, что идентификатор в формате дайджеста должен быть зашифрован с помощью sha1, а zk предоставил нам соответствующие классы шифрования. Вот код для шифрования идентификатора:
public class DigestTest {
public static void main(String[] args) {
try {
System.out.println(DigestAuthenticationProvider.generateDigest("acluser1:111111"));
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
Пример:
=========================客户端1上的操作======================
[zk: localhost:2181(CONNECTED) 86] create /acltest acltestdata // 添加测试节点
Node already exists: /acltest
// 添加三个用户
[zk: localhost:2181(CONNECTED) 87] addauth digest acluser1:111111
[zk: localhost:2181(CONNECTED) 88] addauth digest acluser2:222222
[zk: localhost:2181(CONNECTED) 89] addauth digest acluser3:333333
// 设置digest类型的acl权限
[zk: localhost:2181(CONNECTED) 90] setAcl /acltest digest:acluser1:hHUVzmra9P/TbXlP/4jRhG9jZm8=:crdwa
cZxid = 0x7c
ctime = Fri Sep 14 18:42:17 PDT 2018
mZxid = 0x7c
mtime = Fri Sep 14 18:42:17 PDT 2018
pZxid = 0x7c
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 11
numChildren = 0
// 发现只有acluser1用户具有对/acltest节点的crdwa权限
[zk: localhost:2181(CONNECTED) 91] getAcl /acltest
'digest,'acluser1:hHUVzmra9P/TbXlP/4jRhG9jZm8=
: cdrwa
======================客户端2上的操作=======================
// 只添加acluser2用户,发现没有权限读取/acltest节点
[zk: localhost:2181(CONNECTED) 14] addauth digest acluser2:222222
[zk: localhost:2181(CONNECTED) 15] get /acltest
Authentication is not valid : /acltest
// 添加acluser1后,能够读取/acltest节点
[zk: localhost:2181(CONNECTED) 16] addauth digest acluser1:111111
[zk: localhost:2181(CONNECTED) 17] get /acltest
acltestdata
cZxid = 0x7c
ctime = Fri Sep 14 18:42:17 PDT 2018
mZxid = 0x7c
mtime = Fri Sep 14 18:42:17 PDT 2018
pZxid = 0x7c
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 11
numChildren = 0
Резюме: при вызове setAcl, в отличие от auth, будут установлены разрешения всех пользователей в текущем сеансе на доступ к узлу, но будут установлены только права доступа указанного одиночного пользователя к узлу. Идентификатор в настройке setAcl должен быть зашифрован с помощью sha1.
Политика проверки интеллектуальной собственности
Формат ACL: IP: ID: разрешение ID - это IP-адрес, укажите IP-адрес, который может получить доступ Пример:
[zk: localhost:2181(CONNECTED) 96] create -e /acltest acltestdata
Created /acltest
// 设置只有192.168.1.1这台机器可以访问
[zk: localhost:2181(CONNECTED) 97] setAcl /acltest ip:192.168.1.1:crdwa
cZxid = 0x85
ctime = Fri Sep 14 18:59:23 PDT 2018
mZxid = 0x85
mtime = Fri Sep 14 18:59:23 PDT 2018
pZxid = 0x85
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x100063d34d5000b
dataLength = 11
numChildren = 0
// 本机地址为127.0.0.1,无法访问
[zk: localhost:2181(CONNECTED) 98] get /acltest
Authentication is not valid : /acltest
супер тестовая стратегия
В основном он предназначен для обслуживающего и ремонтного персонала для обслуживания узлов.Пользователь, исполняемый в acl, имеет полномочия на управление любым узлом.При запуске в сценарии запуска необходимо настроить следующие параметры:
-Dzookeeper.DigestAuthenticationProvider.superDigest=admin:015uTByzA4zSglcmseJsxTo7n3c=
Использование и принцип наблюдения
Проблемы, решаемые наблюдателем
Кластеры серверов приложений могут иметь две проблемы: 1. Поскольку в кластере много машин, при изменении общей конфигурации, как можно сделать так, чтобы конфигурация всех серверов автоматически вступала в силу одновременно? 2. Когда узел в кластере не работает, как сообщить об этом другим узлам в кластере? Чтобы решить эти две проблемы, zk вводит механизм наблюдателя для реализации функции публикации/подписки, которая позволяет нескольким подписчикам одновременно отслеживать определенный объект темы.При изменении статуса самого объекта темы все подписчики будут уведомлен.
Основной принцип наблюдателя
Для реализации наблюдателя zk требуется три части: zk-сервер, zk-клиент и клиентский watchManager. Как показано на рисунке, когда клиент регистрирует наблюдатель с помощью zk, объект наблюдателя клиента сохраняется в клиентском watchManager. После того, как zk-сервер инициирует событие наблюдения, он отправит уведомление клиенту, и клиентский поток достанет соответствующий наблюдатель из watchManager для выполнения.
Как клиент реализует действие уведомления о событии
Клиенту нужно только определить класс, реализующий интерфейс org.apache.zookeeper.Watcher, и реализовать следующие методы интерфейса:
abstract public void process(WatchedEvent event);
Вы можете выполнить соответствующее действие после получения уведомления. Параметр org.apache.zookeeper.WatchedEvent — это событие, отправляемое сервером zk, и состоит из трех элементов:
final private KeeperState keeperState; // 通知状态
final private EventType eventType; //事件类型
private String path ;//那个节点发生的事件
keeperState — это объект перечисления, который представляет состояние связи между клиентом и сервером zk, определяемое следующим образом:
/**
* Enumeration of states the ZooKeeper may be at the event
*/
public enum KeeperState {
/** Unused, this state is never generated by the server */
@Deprecated
Unknown (-1),
/** The client is in the disconnected state - it is not connected
* to any server in the ensemble. */
Disconnected (0),
/** Unused, this state is never generated by the server */
@Deprecated
NoSyncConnected (1),
/** The client is in the connected state - it is connected
* to a server in the ensemble (one of the servers specified
* in the host connection parameter during ZooKeeper client
* creation).
* /
SyncConnected (3),
/**
* Auth failed state
*/
AuthFailed (4),
/**
* The client is connected to a read-only server, that is the
* server which is not currently connected to the majority.
* The only operations allowed after receiving this state is
* read operations.
* This state is generated for read-only clients only since
* read/write clients aren't allowed to connect to r/o servers.
*/
ConnectedReadOnly (5),
/**
* SaslAuthenticated: used to notify clients that they are SASL-authenticated,
* so that they can perform Zookeeper actions with their SASL-authorized permissions.
*/
SaslAuthenticated(6),
/** The serving cluster has expired this session. The ZooKeeper
* client connection (the session) is no longer valid. You must
* create a new client connection (instantiate a new ZooKeeper
* instance) if you with to access the ensemble.
*/
Expired (-112);
private final int intValue; // Integer representation of value
// for sending over wire
KeeperState(int intValue) {
this.intValue = intValue;
}
public int getIntValue() {
return intValue;
}
public static KeeperState fromInt(int intValue) {
switch(intValue) {
case -1: return KeeperState.Unknown;
case 0: return KeeperState.Disconnected;
case 1: return KeeperState.NoSyncConnected;
case 3: return KeeperState.SyncConnected;
case 4: return KeeperState.AuthFailed;
case 5: return KeeperState.ConnectedReadOnly;
case 6: return KeeperState.SaslAuthenticated;
case -112: return KeeperState.Expired;
default:
throw new RuntimeException("Invalid integer value for conversion to KeeperState");
}
}
}
eventType также является типом перечисления, представляющим тип событий, происходящих в узле, таких как создание новых дочерних узлов, изменение данных узла и т. д. Определяется следующим образом:
/**
* Enumeration of types of events that may occur on the ZooKeeper
*/
public enum EventType {
None (-1),
NodeCreated (1),
NodeDeleted (2),
NodeDataChanged (3),
NodeChildrenChanged (4),
DataWatchRemoved (5),
ChildWatchRemoved (6);
private final int intValue; // Integer representation of value
// for sending over wire
EventType(int intValue) {
this.intValue = intValue;
}
public int getIntValue() {
return intValue;
}
public static EventType fromInt(int intValue) {
switch(intValue) {
case -1: return EventType.None;
case 1: return EventType.NodeCreated;
case 2: return EventType.NodeDeleted;
case 3: return EventType.NodeDataChanged;
case 4: return EventType.NodeChildrenChanged;
case 5: return EventType.DataWatchRemoved;
case 6: return EventType.ChildWatchRemoved;
default:
throw new RuntimeException("Invalid integer value for conversion to EventType");
}
}
}
Соответствие между keeperState и eventType следующее:
Для события NodeDataChanged: оно срабатывает независимо от того, изменяются ли данные узла или версия данных (даже если обновленные данные совпадают с новыми данными, версия данных изменится). Для события NodeChildrenChanged: добавление и удаление дочерних узлов вызовет этот тип события. Следует отметить, что WatchedEvent является только уведомлением, связанным с событием, и не соответствует исходному содержимому данных узла данных и новому содержимому данных после изменения, поэтому, если вам нужно знать данные до изменения или новые данные после изменения, Бизнесу необходимо сохранить данные перед обновлением и вызвать интерфейс для получения новых данных.
Как зарегистрировать наблюдателя
API регистрации наблюдателя
Вы можете зарегистрировать наблюдателя при создании экземпляра клиента zk (зарегистрируйте наблюдателя в конструкторе)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,ZKClientConfig conf)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider,ZKClientConfig clientConfig)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, ZKClientConfig conf)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, HostProvider aHostProvider)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
Наблюдатель, переданный в конструктор zk, будет использоваться как наблюдатель по умолчанию в течение всего сеанса zk. Наблюдатель всегда будет сохраняться как член defaultWatcher клиента ZKWatchManager. Если есть другие настройки, наблюдатель будет перезаписан. Помимо регистрации наблюдателя через конструктор класса ZK, вы также можете зарегистрировать наблюдателя через какой-либо другой API в классе zk, но наблюдатель, зарегистрированный этими API, не является наблюдателем по умолчанию.
public List<String> getChildren(final String path, Watcher watcher)
// boolean watch表示是否使用上下文中默认的watcher,即创建zk实例时设置的watcher
public List<String> getChildren(String path, boolean watch)
// boolean watch表示是否使用上下文中默认的watcher,即创建zk实例时设置的watcher
public byte[] getData(String path, boolean watch, Stat stat)
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
// boolean watch表示是否使用上下文中默认的watcher,即创建zk实例时设置的watcher
public Stat exists(String path, boolean watch)
public Stat exists(final String path, Watcher watcher)
пример кода регистрации наблюдателя
В этом примере встроенный клиент zk используется для демонстрации использования наблюдателя.О встроенном клиенте zk следует отметить одну вещь: после того, как наблюдатель установлен, как только он сработает один раз, он будет недействительно. Если вам нужно постоянно контролировать, вам нужно зарегистрироваться снова. Определите наблюдателя по умолчанию:
/**
* 测试默认watcher
*/
public class DefaultWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
System.out.println("==========DefaultWatcher start==============");
System.out.println("DefaultWatcher state: " + event.getState().name());
System.out.println("DefaultWatcher type: " + event.getType().name());
System.out.println("DefaultWatcher path: " + event.getPath());
System.out.println("==========DefaultWatcher end==============");
}
}
Определите наблюдателя, который прослушивает изменения в дочерних узлах:
/**
* 用于监听子节点变化的watcher
*/
public class ChildrenWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
System.out.println("==========ChildrenWatcher start==============");
System.out.println("ChildrenWatcher state: " + event.getState().name());
System.out.println("ChildrenWatcher type: " + event.getType().name());
System.out.println("ChildrenWatcher path: " + event.getPath());
System.out.println("==========ChildrenWatcher end==============");
}
}
Определите наблюдателя, который прослушивает изменения узлов:
public class DataWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
System.out.println("==========DataWatcher start==============");
System.out.println("DataWatcher state: " + event.getState().name());
System.out.println("DataWatcher type: " + event.getType().name());
System.out.println("DataWatcher path: " + event.getPath());
System.out.println("==========DataWatcher end==============");
}
}
тестовый код наблюдателя:
public class WatcherTest {
/**
* 链接zk服务端的地址
*/
private static final String CONNECT_STRING = "192.168.0.113:2181";
public static void main(String[] args) {
// 除了默认watcher外其他watcher一旦触发就会失效,需要充新注册,本示例中因为
// 还未想到比较好的重新注册watcher方式(考虑到如果在Watcher中持有一个zk客户端的
// 实例可能存在循环引用的问题),因此暂不实现watcher失效后重新注册watcher的问题,
// 后续可以查阅curator重新注册watcher的实现方法。
// 默认watcher
DefaultWatcher defaultWatcher = new DefaultWatcher();
// 监听子节点变化的watcher
ChildrenWatcher childrenWatcher = new ChildrenWatcher();
// 监听节点数据变化的watcher
DataWatcher dataWatcher = new DataWatcher();
try {
// 创建zk客户端,并注册默认watcher
ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STRING, 100000, defaultWatcher);
// 让默认watcher监听 /GetChildren 节点的子节点变化
// zooKeeper.getChildren("/GetChildren", true);
// 让childrenWatcher监听 /GetChildren 节点的子节点变化(默认watcher不再监听该节点子节点变化)
zooKeeper.getChildren("/GetChildren", childrenWatcher);
// 让dataWatcher监听 /GetChildren 节点本省的变化(默认watcher不再监听该节点变化)
zooKeeper.getData("/GetChildren", dataWatcher, null);
TimeUnit.SECONDS.sleep(1000000);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
Процесс тестирования
1. Сначала создайте узел /GetChildren в клиенте командной строки.
[zk: localhost:2181(CONNECTED) 133] create /GetChildren GetChildrenData
Created /GetChildren
Запуск тестового кода WatcherTest выводит следующее:
==========DefaultWatcher start==============
DefaultWatcher state: SyncConnected
DefaultWatcher type: None
DefaultWatcher path: null
==========DefaultWatcher end==============
Можно видеть, что уведомление о событии успешной связи запускается, когда клиент подключается к серверу zk в первый раз, и событие принимается наблюдателем по умолчанию, что приводит к выполнению кода, связанного с наблюдателем по умолчанию. 2. Затем создайте дочерний узел в клиенте командной строки.
[zk: localhost:2181(CONNECTED) 134] create /GetChildren/ChildNode ChildNodeData
Created /GetChildren/ChildNode
ChildrenWatcher уведомляется об изменении дочерних элементов /GetChildren, поэтому выводится следующее:
==========ChildrenWatcher start==============
ChildrenWatcher state: SyncConnected
ChildrenWatcher type: NodeChildrenChanged
ChildrenWatcher path: /GetChildren
==========ChildrenWatcher end==============
3. Наконец, измените данные узла /GetChildren на стороне клиента.
[zk: localhost:2181(CONNECTED) 135] set /GetChildren GetChildrenDataV2
cZxid = 0xab
ctime = Sat Sep 15 03:52:48 PDT 2018
mZxid = 0xb0
mtime = Sat Sep 15 04:06:05 PDT 2018
pZxid = 0xaf
cversion = 1
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 17
numChildren = 1
DataWatcher получает уведомление и выводит следующее:
==========DataWatcher start==============
DataWatcher state: SyncConnected
DataWatcher type: NodeDataChanged
DataWatcher path: /GetChildren
==========DataWatcher end==============
Затем мы можем изменить данные узла /GetChildren на стороне клиента, но никаких выходных данных не будет, что указывает на то, что срок действия DataWatcher истек и его необходимо повторно зарегистрировать, прежде чем его можно будет использовать.
понимание распределенных систем
Характеристики кластеров распределенной системы
1, все узлы в кластере, чтобы поддерживать консистенцию данных 2, все узлы могут обеспечить одинаковые функции обслуживания (не обязательно одновременно) 3, кластеры высокой доступности необходимо защитить систему, узел идет вниз, не повлияет на сервис.
Как обеспечить согласованность данных в кластерной среде
Существует три способа обеспечения согласованности данных в кластерной среде: репликация данных, WNR и централизованное хранилище.
1. Репликация данных: сначала запишите на один узел, а затем реплицируйте на другие узлы, zk объяснил, что это достигается. Или несколько узлов записывают одновременно, но это подходит только для сценариев приложений, где данные, записанные несколькими узлами, не являются одними и теми же данными. В сценарии ведущий-ведомый Синхронная репликация (подчиненное устройство возвращает клиенту успешную запись после того, как все репликации от ведущего устройства завершены) может обеспечить строгую согласованность, но это повлияет на доступность; асинхронная репликация (данные записываются на ведущее устройство и возвращают успешную запись, нет необходимости ждать, пока подчиненная репликация завершена, После этого мастер отправляет данные на ведомый через push или ведомый извлекает данные от ведущего через ull), что может обеспечить доступность, но снижает согласованность.
2. WNR: N представляет собой общее количество реплик, W представляет собой минимальное количество успешных реплик записи, которое должно быть гарантировано для каждой операции записи, а R представляет минимальное количество реплик, прочитанных за одну операцию чтения. можно гарантировать, что каждая по крайней мере одна реплика прочитанных данных имеет самое последнее обновление (Например, номер версии или отметка времени могут использоваться для определения, какая копия данных является последней.) Трудно гарантировать порядок нескольких операций записи, что может привести к несогласованности операций записи в нескольких копиях. часы, чтобы обеспечить окончательную согласованность.
3. Централизованное хранилище: с помощью высоконадежного централизованного хранилища, такого как хранилище NAS, распределенный кэш (Redis) и т. д.
шардирование распределенных систем
Для бизнес-систем, это разделить бизнес, различные подмодули представляют собой отдельный кластер, общая бизнес-система представляет собой большие распределенные системы. Для систем данных данные разделены.
zk-кластер
1. Кластер zk является одноранговым кластером, и все узлы (машины) имеют одинаковые данные. 2. Узлы кластера ощущают существование друг друга по пульсу. 3. Все операции записи выполняются на главном узле, а другие узлы могут только читать.Хотя они также могут получать запросы на запись, они будут внутренне передавать операции записи на главный узел. 4. Главный узел избирается с помощью механизма выборов, что обеспечивает высокую доступность главного узла, так что главный узел не фиксируется, и главный узел может быть переизбран в случае отказа главного узла. 5. Требуется не менее трех узлов, причем количество узлов должно быть нечетным. 6. Когда более половины данных записано успешно, возвращается сообщение об успешной записи, что является окончательной стратегией согласованности. Архитектура кластера zk показана на следующем рисунке.
Запрос на запись, отправленный клиентом ведомому, перенаправляется ведущему, а ведущий выполняет операцию записи, а затем синхронизируется с ведомым.
Теория распределенной системы ценности CAP
Определение CAP
C — согласованность, A — доступность, P — устойчивость к разделам. Определение CAP: Распределенная система не может одновременно удовлетворять трем требованиям согласованности, доступности и устойчивости к разделам, а только двум из них. 1. Согласованность: в распределенной среде согласованность в основном относится к тому, согласуются ли данные между несколькими копиями, то есть результат операции записи определенного узла виден последующим операциям чтения других узлов. обновленный и доступный одновременно, может немедленно воспринять его обновление, Это называется строгой согласованностью. Если часть или все обновление не воспринимается после того, как оно разрешено, это называется слабой согласованностью. Если обновление может быть воспринято через определенный период времени, это называется возможной согласованностью.
2. Доступность: Относится к услуге, предоставляемой системой, должна быть доступна в согласованном состоянии, пользователь всегда может запросить возврат результатов в ограниченное время, ограниченное время подчеркивает приемлемое время, любой отказ узла не должен ограничен Возврат результатов в течение разумного времени.
3. Устойчивость к разделам: когда в кластере есть сетевой раздел, кластер может продолжать обеспечивать определенную степень доступности и согласованности, если только вся сеть не будет недоступна, то есть когда некоторые узлы не работают или не могут взаимодействовать с другими узлами, разделы по-прежнему могут поддерживать функции распределенных систем.
Выполнение только двух пунктов не означает, что другой пункт полностью отсутствует, но требования не такие строгие. Устойчивость к разбиению является обязательным свойством распределенных систем, потому что сеть ненадежна и может быть обменена только между C и A.
Откажитесь от теоремы CAP
1. Отказаться от допуска разделения (P): то есть, если кластер имеет сетевой раздел, весь кластер не может предоставлять услуги доступности и согласованности, в этом случае либо узлы кластера не имеют состояния, либо все данные размещаются на одном узле , который потеряет расширяемый, Это уже нельзя назвать распределенной системой, поэтому приложение Shanji получило хороший CA, потому что отказалось от P.
2. Отказаться от доступности (A): Отказ от доступности не означает, что доступности вообще нет. Это означает, что период ожидания ответа может быть больше. Например, отчет может работать около 10 минут, или даже допустить тайм-аут в некоторых случаи.
3, чтобы дать консистенцию (C): согласованность отказывается от оказания сильной консистенции данных, при сохранении окончательной согласованности данных, данные в конечном итоге точно так же, но окно рассматриваемого времени, которое в соответствии с различным бизнесом нуждается в определении.
Базовая теория распределенных систем
BASE — это аббревиатура трех фраз: «Базовая доступность», «Мягкое состояние» и «Конечно согласованное», что является результатом компромисса между согласованностью и доступностью в CAP. Основная идея BASE заключается в том, что даже если невозможно достичь строгой согласованности, каждое приложение может достичь окончательной согласованности соответствующим образом в соответствии с его собственными бизнес-характеристиками и в то же время получить системную доступность.
Базовая доступность BASE отражается в двух аспектах: 1. Потеря времени отклика: например, некоторые запросы будут отвечать в течение 1 секунды, а некоторые запросы могут отвечать в течение 5 секунд. 2. Функциональная потеря: например, для системы электронной коммерции определенные области могут быть недоступны для покупки определенных продуктов, или некоторые потребители могут быть перенаправлены на страницу с более низким рейтингом во время большой акции.
Слабое состояние BASE: Также известное как мягкое состояние, оно означает, что данные в системе могут находиться в промежуточном состоянии, и считается, что это состояние не повлияет на общую доступность системы, то есть позволяет системе иметь определенную задержку между копии данных разных узлов.
Конечная консистенция BASE: После синхронизации реплик данных в системе в течение определенного периода времени группа может достичь согласованного состояния.
ZK поставляется с собственным клиентским API.
Создать zk-сессию
Конструктор класса org.apache.zookeeper.ZooKeeper используется для создания сеанса между клиентом zk и сервером. Этот класс предоставляет следующие конструкторы
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
Описание параметра конструктора:
1. connectString: относится к списку серверов zk, разделенному запятой в английском методе ввода host:port, например, 192.168.1.1:2181, 192.168.1.2:2181, за которым также может следовать корневой каталог, чтобы указать работа этого клиента
находятся в этом корневом каталоге.
2. sessionTimeout: тайм-аут сеанса в миллисекундах. Если в течение этого времени не будет получено обнаружение сердцебиения, сеанс будет недействительным.
3. наблюдатель: зарегистрированный наблюдатель, ноль означает, что он не установлен.
4. canBeReadOnly: используется для определения того, поддерживает ли текущий сеанс режим «Только для чтения»: это означает, что когда машина в кластере zk не может связаться с более чем половиной сетевых портов машины в кластере, эта машина не будет получать любое чтение от клиента Запросы на запись, но иногда, мы хотим, чтобы навык предоставлял запросы на чтение, поэтому установите этот параметр в значение true, то есть клиент также может читать данные с машинных узлов, которые не подключены более чем к половине узлы в кластере.
5. sessionId и sessionPasswd: соответственно представляют идентификатор сеанса и секретный ключ сеанса.Вместе эти два параметра могут однозначно определить сеанс, и клиент может реализовать мультиплексирование сеанса клиента с помощью этих двух параметров.
Создать узел zk
Класс org.apache.aookeeper.ZooKeeper предоставляет следующие API для создания узлов zk:
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)
Первый метод создает узел синхронно, а второй метод создает узел асинхронно. Следует отметить, что независимо от того, синхронное или асинхронное, рекурсивное создание узла не поддерживается.Когда узел уже существует, будет выброшено исключение NodeExistsException. Создать описание параметра метода: 1. путь: путь созданного узла, например: /zk-book/fock 2. data[]: данные в узле, представляющие собой массив байтов. 3. acl: политика ACL 4, createMode: тип узла, тип перечисления, есть четыре варианта: постоянный (PERSISTENT), постоянная последовательность (PERSISTENT_SEQUENTIAL), временная (EPHEMENRAL) и временная последовательность (EPHEMERAL_SEQUENTIAL). 5. cb: асинхронная функция обратного вызова, которая должна реализовать интерфейс StringCallback.При создании серверной части клиентская сторона автоматически вызовет processResult этого объекта. 6. ctx: используется для передачи объекта, который может использоваться при выполнении метода обратного вызова, обычно используется для передачи контекстной информации о бизнесе.
удалить узел zk
// 以同步的方式删除节点
public void delete(final String path, int version)
throws InterruptedException, KeeperException
// 以异步的方式删除节点,如果写测试代码,客户端主线程不能退出,否则可能请求没有发到服物器或者异步回调不成功
public void delete(final String path, int version, VoidCallback cb, Object ctx)
Описание параметра:
1. путь: путь удаленного узла
2. версия: версия данных узла, если указанная версия не является последней версией, будет сообщено об ошибке
3, cb: асинхронная функция обратного вызова
4. ctx: переданная контекстная информация, то есть информация перед операцией, которая передается в асинхронную функцию обратного вызова после удаления.
Получить zk дочерних узлов
public List<String> getChildren(final String path, Watcher watcher) throws KeeperException, InterruptedException
public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException
public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)
public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
public List<String> getChildren(final String path, Watcher watcher, Stat stat)
public List<String> getChildren(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException
public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx)
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)
Описание параметра:
1. путь: путь узла данных, например /zk-book/foo, получить список дочерних узлов по измененному пути
2. Наблюдатель: набор наблюдателей для узла будет уведомлен, если количество дочерних узлов узла, соответствующего пути, изменится.
3. watch: использовать ли наблюдатель по умолчанию
4. stat: указывает информацию о состоянии узла данных.
5, cb: асинхронная функция обратного вызова
6. ctx: используется для передачи объекта, который можно использовать при выполнении метода обратного вызова, обычно используется для передачи контекстной информации о бизнесе.
Получить данные узла zk
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException
public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
public void getData(String path, boolean watch, DataCallback cb, Object ctx)
Изменить данные узла zk
public Stat setData(final String path, byte data[], int version) throws KeeperException, InterruptedException
public void setData(final String path, byte data[], int version, StatCallback cb, Object ctx)
Проверьте, существует ли узел zk
public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException
public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)
public void exists(String path, boolean watch, StatCallback cb, Object ctx)
пример использования zk API
package com.ctrip.flight.test.zookeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import java.util.ArrayList;
import java.util.List;
public class ZkClientTest {
private static final String PARENT_PATH = "/zkClientTest";
private static final String CHILD_PATH = "/zkClientTest/childNodeTest";
private static final String IDENTITY = "zhangsan:123456";
public static void main(String[] args) {
try {
DefaultWatcher defaultWatcher = new DefaultWatcher();
ChildrenWatcher childrenWatcher = new ChildrenWatcher();
ParentWatcher parentWatcher = new ParentWatcher();
// 创建会话
ZooKeeper client = new ZooKeeper("192.168.0.102:2181,192.168.0.102:2182,192.168.0.102:2183", 30000, defaultWatcher);
client.addAuthInfo("digest", IDENTITY.getBytes());
Stat stat = client.exists(PARENT_PATH, false);
if (null != stat) {
client.delete(PARENT_PATH, -1);
}
// 创建节点,临时节点不能有子节点,所以父节点是持久节点
client.create(PARENT_PATH, "zkClientTestData_v1".getBytes(), getAcl(), CreateMode.PERSISTENT);
// 创建子节点
client.create(CHILD_PATH, "childNodeData_v1".getBytes(), getAcl(), CreateMode.EPHEMERAL);
// 获取子节点信息
Stat childStat = new Stat();
List<String> childs = client.getChildren(PARENT_PATH, childrenWatcher, childStat);
System.out.println(PARENT_PATH + "'s childs:" + childs);
System.out.println(PARENT_PATH + "'s stat:" + childStat);
Thread.sleep(1000);
// 获取父节点数据
Stat parentStat = new Stat();
byte[] parentData = client.getData(PARENT_PATH, parentWatcher, parentStat);
System.out.println(PARENT_PATH + "'s data: " + new String(parentData));
System.out.println(PARENT_PATH + "'s stat: " + parentStat);
Thread.sleep(1000);
// 设置子节点数据
childStat = client.setData(CHILD_PATH, "childNodeData_v2".getBytes(), -1);
System.out.println(CHILD_PATH + "'s stat:" + childStat);
byte[] childData = client.getData(CHILD_PATH, false, childStat);
System.out.println(CHILD_PATH + "'s data:" + new String(childData));
Thread.sleep(1000);
// 删除子节点
client.delete(CHILD_PATH, -1);
// 判断子节点是否存在
childStat = client.exists(CHILD_PATH, false);
System.out.println(CHILD_PATH + " is exist: " + (childStat != null));
client.delete(PARENT_PATH, -1);
client.close();
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* ACL格式为:schema:id:permission
* @return
*/
private static List<ACL> getAcl() throws Exception {
List<ACL> acls = new ArrayList<>();
// 指定schema
String scheme = "auth";
// 指定id
// String identity = "zhangsan:123456";
Id id = new Id(scheme, DigestAuthenticationProvider.generateDigest(IDENTITY));
// Perms.ALL的权限为crdwa
ACL acl = new ACL(ZooDefs.Perms.ALL, id);
acls.add(acl);
return acls;
}
}
zk куратор клиента с открытым исходным кодом
Недостаточно нативного API zk
1. Создание соединения является асинхронным, и разработчикам нужно самостоятельно писать код, чтобы добиться ожидания. 2. Соединение не имеет механизма автоматического переподключения по таймауту 3. Сам по себе zk не предоставляет механизма сериализации, который должен быть указан разработчиками для реализации сериализации и десериализации данных. 4. Регистрация наблюдателя вступит в силу только один раз, и требуется повторная регистрация. 5. Способ использования самого Watcher не соответствует терминологии самой java, легче понять, если используется способ использования watcher. 6. Рекурсивное создание узлов дерева не поддерживается.
zk сторонний клиент с открытым исходным кодом
Сторонние клиенты zk с открытым исходным кодом в основном включают zkClient и Curator. Среди них zkClient решает проблемы переподключения тайм-аута сессии и повторной регистрации Watcher, а также предоставляет более лаконичный API, но сообщество zkClient неактивно, Документация не идеальна. Куратор — один из топовых проектов Apache Foundation, решает проблемы переподключения тайм-аута сессии, повторной регистрации наблюдателя, исключения NodeExistsException и т. д. Куратор имеет более полный, Документация, поэтому здесь мы только изучаем использование Curator.
Куратор обеспечивает потоковую операцию, аналогичную потоковой передаче в jdk8.
Создать zk-сессию
Класс org.apache.curator.framework.CuratorFrameworkFactory в Curator предоставляет следующие два метода для создания сеансов zk:
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
1. connectString: пары IP:порт, разделенные запятыми. 2. sessionTimeoutMs: время ожидания сеанса в миллисекундах, по умолчанию 60000 мс, что указывает на то, как долго после установления соединения без получения сигнала обнаружения сердцебиения, превышение этого времени является тайм-аутом сеанса. 3. connectTImeoutMs: тайм-аут создания соединения в миллисекундах, по умолчанию 15000 мс, что означает, сколько времени требуется клиенту, чтобы установить соединение с сервером, и тайм-аут, если соединение отсутствует. 4. retryPolicy: политика повторных попыток, тип retryPolicy определяется следующим образом:
/**
* Abstracts the policy to use when retrying connections
*/
public interface RetryPolicy
{
/**
* Called when an operation has failed for some reason. This method should return
* true to make another attempt.
*
*
* @param retryCount the number of times retried so far (0 the first time),第几次重试
* @param elapsedTimeMs the elapsed time in ms since the operation was attempted,到当前重试时刻总的重试时间
* @param sleeper use this to sleep - DO NOT call Thread.sleep,重试策略
* @return true/false
*/
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);
}
allowRetry возвращает true, чтобы продолжить повторную попытку, и возвращает false, чтобы не повторять попытку. Вы можете настроить стратегию, внедрив этот интерфейс, и куратор предоставил нам несколько стратегий повторных попыток. 1. ExponentialBackoffRetry: стратегия повторных попыток увеличивается экспоненциально с увеличением количества повторных попыток, а время ожидания увеличивается экспоненциально.
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
Время ожидания повторной попытки retryCount рассчитывается как: baseSleepTimeMs * Math.max(1, random.nextInt(1
2. RetryNTimes: стратегия повторных попыток указывает количество повторных попыток, и каждый спящий режим имеет фиксированное время.Метод построения выглядит следующим образом:
public RetryNTimes(int n, int sleepMsBetweenRetries)
3. RetryOneTime: эта стратегия повтора повторяется только один раз. 4. RetryUntilElapsed: эта политика повторных попыток не ограничивает количество повторных попыток, но ограничивает общее время повторных попыток.
public RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
Помимо newClient, CuratorFrameworkFactory также предоставляет шаблон построителя для создания объектов CuratorFramework.
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 5);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.0.102:2181,192.168.0.102:2182,192.168.0.102:2183")
.sessionTimeoutMs(30000).connectionTimeoutMs(15000)
.retryPolicy(retryPolicy)
.namespace("curatorTest")
.build();
Создать узел zk
Независимо от того, какая операция выполняется в кураторе, вы должны сначала получить класс-оболочку (объект Builder), который строит операцию.Чтобы создать узел zk, вы должны сначала получить org.apache.curator.framework.api.CreateBuilder (на самом деле класс реализации CreateBuilderImpl CreateBuilderImpl), а затем использовать этот объект для создания узлов. Общие операции в CreateBuilderImpl следующие:
// 递归创建(持久)父目录
public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentsIfNeeded()
// 设置创建节点的属性
public ACLBackgroundPathAndBytesable<String> withMode(CreateMode mode)
// 设置节点的acl属性
public ACLBackgroundPathAndBytesable<String> withACL(List<ACL> aclList, boolean applyToParents)
// 指定创建节点的路径和节点上的数据
public String forPath(final String givenPath, byte[] data) throws Exception
Пример создания узла показан ниже:
String test1Data = client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/curatorTest/test1", "test1".getBytes());
удалить узел zk
Точно так же вызовите delete() из CuratorFramework, чтобы получить DeleteBuilder (фактически DeleteBuilderImpl) для построения операции удаления. DeleteBuilderImpl предоставляет следующие методы:
// 指定要删除数据的版本号
public BackgroundPathable<Void> withVersion(int version)
// 确保数据被删除,本质上就是重试,当删除失败时重新发起删除操作
public ChildrenDeletable guaranteed()
// 指定删除的节点
public Void forPath(String path) throws Exception
// 递归删除子节点
public BackgroundVersionable deletingChildrenIfNeeded()
Чтение данных узла zk
Таким же образом сначала вызовите getData() CuratorFramework, чтобы получить GetDataBuilder (фактически GetDataBuilderImpl), который строит операцию сбора данных.GetDataBuilderImpl предоставляет следующие методы:
// 将节点状态信息保存到stat
public WatchPathable<byte[]> storingStatIn(Stat stat)
// 指定节点路径
public byte[] forPath(String path) throws Exception
В следующем примере показано получение данных узла:
Stat test1Stat = new Stat();
byte[] test1DataBytes = client.getData().storingStatIn(test1Stat).forPath("/curatorTest/test1");
System.out.println("test1 data: " + new String(test1DataBytes));
Обновить данные узла zk
Точно так же сначала вызовите setData() CuratorFramework, чтобы получить setDataBuilder (фактически setDataBuilderImpl) для построения операции обновления данных.setDataBuilderImpl предоставляет следующие методы:
// 指定版本号
public BackgroundPathAndBytesable<Stat> withVersion(int version)
// 指定节点路径和要更新的数据
public Stat forPath(String path, byte[] data) throws Exception
чтение zk дочерних узлов
Таким же образом сначала вызовите getChildren() CuratorFramework, чтобы получить GetChildrenBuilder (фактически GetChildrenBuilderImpl) для построения операции получения данных дочернего узла, GetChildrenBuilderImpl Предусмотрены следующие методы:
// 把服务器端获取到的状态数据存储到stat对象中
public WatchPathable<List<String>> storingStatIn(Stat stat)
// 指定获取子节点数据的节点路径
public List<String> forPath(String path) throws Exception
// 设置watcher,类似于zookeeper本身的api,也只能使用一次
public BackgroundPathable<List<String>> usingWatcher(Watcher watcher)
public BackgroundPathable<List<String>> usingWatcher(CuratorWatcher watcher)
Образец кода:
Stat childStat = new Stat();
List<String> childs = client.getChildren().storingStatIn(childStat).forPath("/curatorTest");
Асинхронная работа куратора
Куратор предоставляет асинхронные версии всех операций, добавляя один из следующих методов в цепочку методов, которая строит операцию.
public ErrorListenerPathable<List<String>> inBackground()
public ErrorListenerPathable<List<String>> inBackground(Object context)
public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback)
public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Object context)
public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Executor executor)
public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Object context, Executor executor)
В следующем примере кода используется асинхронная операция удаления:
client.delete()
.guaranteed()
.withVersion(-1)
.inBackground(((client1, event) -> {
System.out.println(event.getPath() + ", data=" + event.getData());
System.out.println("event type=" + event.getType());
System.out.println("event code=" + event.getResultCode());
}))
.forPath("/curatorTest/test1");
NodeCache куратора
NodeCache будет кэшировать копию узла (самого узла) определенного пути локально.Когда узел соответствующего пути в zk будет обновлен, создан или удален, NodeCache получит ответ и подтянет последние данные в локальный кеш . NodeCache прослушивает только изменения самого пути и не прослушивает изменения в дочерних узлах. Мы можем зарегистрировать прослушиватель в NodeCache, чтобы получать уведомления об изменениях. NodeCache предоставляет следующие конструкторы:
public NodeCache(CuratorFramework client, String path)
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
1. клиент: куратор клиент 2. путь: путь к узлу, который необходимо кэшировать. 3. dataIsCompressed: сжимать ли данные под узлом NodeCache предоставляет контейнер прослушивателя следующих типов.Поскольку прослушиватель добавлен в контейнер, при изменении узла прослушиватели в контейнере будут уведомлены:
private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
Пример кода NodeCache для кэширования данных и добавления прослушивателя выглядит следующим образом:
NodeCache nodeCache = new NodeCache(client, "/curatorTest/test1");
// 是否立即拉取/curatorTest/test1节点下的数据缓存到本地
nodeCache.start(true);
// 添加listener
nodeCache.getListenable().addListener(() -> {
ChildData childData = nodeCache.getCurrentData();
if (null != childData) {
System.out.println("path=" + childData.getPath() + ", data=" + childData.getData() + ";");
}
});
Путь куратораДетиКэш
PathChildrenCache будет кэшировать все дочерние узлы под указанным узлом пути к локальному, но не будет кэшировать информацию о самом узле. Слушатели в PathChildCache будут уведомлены, когда будут выполняться такие операции, как подузлы в указанном узле.
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)
PathChildrenCache может проходить в трех режимах запуска через метод запуска, которые определены в org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode. 1. НОРМАЛЬНЫЙ: асинхронно инициализировать кеш 2. BUILD_INITIAL_CACHE: после синхронной инициализации кеша и создания кеша соответствующие данные извлекаются с сервера. 3. POST_INITIALIZED_EVENT: кеш инициализируется асинхронно, и событие PathChildrenCacheEvent.Type#INITIALIZED запускается после завершения инициализации, и прослушиватель в кеше получит уведомление о событии.
Пример кода PathChildrenCache выглядит следующим образом:
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/curatorTest", true);
// startMode为BUILD_INITIAL_CACHE,cache是初始化完成会发送INITIALIZED事件
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
System.out.println(pathChildrenCache.getCurrentData().size());
pathChildrenCache.getListenable().addListener(((client1, event) -> {
ChildData data = event.getData();
switch (event.getType()) {
case INITIALIZED:
System.out.println("子节点cache初始化完成(StartMode为POST_INITIALIZED_EVENT的情况)");
System.out.println("INITIALIZED: " + pathChildrenCache.getCurrentData().size());
break;
case CHILD_ADDED:
System.out.println("添加子节点,path=" + data.getPath() + ", data=" + new String(data.getData()));
break;
case CHILD_UPDATED:
System.out.println("更新子节点,path=" + data.getPath() + ", data=" + new String(data.getData()));
break;
case CHILD_REMOVED:
System.out.println("删除子节点,path=" + data.getPath());
break;
default:
System.out.println(event.getType());
}
}));
Выбор ZK и согласованность данных
ZAB-протокол
Протокол ZAB (Zookeeper Atomic Broadcast), протокол трансляции атомарных сообщений zk, примерно таков: все запросы транзакций должны координироваться и обрабатываться глобально уникальным сервером, такой сервер называется сервером-лидером, Остальные серверы называются серверами-последователями.Сервер-лидер отвечает за преобразование запроса клиента в предложение транзакции (proposal) и распространение предложения на все серверы-последователи в кластере. После этого сервер-лидер должен дождаться отзыва от всех серверов-последователей.Как только более половины серверов-последователей дадут правильную обратную связь, лидер снова разошлет сообщение о фиксации всем серверам-последователям. Попросите его представить предыдущее Предложение.
Выборы лидера в кластере zk основаны на этом протоколе, и процесс записи данных также должен опираться на этот протокол.Ядро ZAB заключается в определении метода обработки запросов транзакций, которые будут изменять состояние данных сервера zk .
Роли узлов в кластере ZK
Узлы в кластере zk имеют три роли:
1. Лидер: Единственный планировщик и обработчик запросов транзакций, обеспечивающий порядок обработки транзакций в кластере, а также планировщик каждого сервера в составе кластера.
2. Последователь: обрабатывает нетранзакционные запросы от клиентов, перенаправляет транзакционные запросы на ведущий сервер, участвует в голосовании транзакционных запросов за предложение и участвует в голосовании на выборах лидера.
3. Наблюдатель: обрабатывает нетранзакционные запросы от клиентов, перенаправляет запросы транзакций на ведущий сервер и не участвует ни в каком голосовании.Эта роль обычно существует для повышения производительности чтения.
Состояние нод в ZK кластере
Узлы в кластере ZK имеют следующие состояния:
1. ИЩУ: Ищем статус лидера.Когда сервер в этом статусе, это значит, что лидера нет, и нужно войти в процесс выборов;
2. FOLLOWING: Статус подписчика, указывающий, что текущая роль сервера — подписчик.
3. НАБЛЮДАТЕЛЬ: Статус наблюдателя, указывающий, что текущий сервер является наблюдателем.
4. ВЕДУЩИЙ: Статус лидера, указывающий, что текущая роль сервера — Лидер.
Связь между узлами в кластере ZK
Узлы в кластере ZK взаимодействуют на основе протокола TCP.Чтобы избежать многократного создания TCP-соединения между двумя узлами, ZK устанавливает соединение в соответствии со значением myid, то есть узел с маленьким myid инициирует соединение с узел с большим myid, например, myid равен 1 узлу до Узел с myid 2 инициирует соединение. Между узлами настраиваются два коммуникационных порта, например, в элементе конфигурации server.1=localhost:2888:3888 первый порт 2888 — это порт для связи и синхронизации данных, а второй порт 3888 — это порт для голосования.
Алгоритм выбора лидера в ЗК
Начиная с версии 3.4.0, zk поддерживает только алгоритм выборов org.apache.zookeeper.server.quorum.FastLeaderElection, основанный на протоколе TCP.
Триггерное время выборов лидера в Zk
Выборы лидера запускаются в следующих двух ситуациях:
1. Запуск кластера: когда кластер только запущен, нода находится в состоянии LOOKING, лидера нет, ему нужно войти в процесс выборов.
2. Восстановление после сбоя: например, лидер не работает или пульсация между более чем половиной узлов и лидером прерывается по сетевым причинам.
Факторы, влияющие на то, чтобы узлы стали лидерами
ZK использует три фактора, чтобы определить, может ли узел стать лидером:
1. Свежесть данных: Только последний узел данных имеет возможность стать лидером.В zk размер id транзакции (zxid) указывает на новизну данных.Чем больше данных, тем новее данные.
2. myid: при запуске кластера файл myid будет настроен в каталоге данных.Число в нем представляет собой номер текущего узла zk.Когда данные узла zk совпадают, большее число в myid будет избран лидером, когда кластер
При наличии лидера вновь добавленный узел не повлияет на исходный кластер.
3. Количество голосов: Только если вы наберете наибольшее количество голосов в кластере, вы сможете стать Лидером, большинство из которых (n/2+1), где n — количество узлов в кластере.
Процесс избрания лидера
Для простоты описания возьмем в качестве примера zk-кластер с тремя узлами, мииды, соответствующие трем узлам, равны 1, 2 и 3. Проанализируем выборы лидера при начальном запуске и выборы нового лидера после лидер не работает во время рабочего процесса.
Выборы лидера при первоначальном запуске
Три узла ZK не имеют данных при первом запуске.Процесс выбора лидера выглядит следующим образом:
1. Первый шаг: запустить ноду, у которой myid равен 1. В это время zxid равен 0, и нода Лидер не может быть выбрана.
2. Шаг 2: Запустите узел, у которого myid равен 2, его zxid также равен 0, но его myid равен 2, что больше, поэтому второй узел становится ведущим узлом.
3. Шаг 3: Запустите узел, чей myid равен 3. Поскольку ведущий узел уже существует, узел 2 остается ведущим после того, как 3 присоединяется к кластеру.
Выбор нового лидера после того, как лидер вышел из строя во время работы
Предположим, что server2 является главным узлом, а сервер server2 не работает, оставляя серверы server1 и server3 для выбора лидера. Процесс выборов выглядит следующим образом:
1. Изменить статус: после выхода из строя лидера статус остальных узлов становится ИЩЕТ.
2. Создание информации о голосовании: каждый сервер выдает голосование за свое собственное голосование, предполагая, что сгенерированная информация о голосовании имеет форму (myid, zxid), информация о голосовании сервера 1 — (1,123) и информация о голосовании отправляется на сервер3,
Информация о голосовании сервера 3 (3, 122), а мысли о голосовании отправляются на сервер 1.
3. Обработка голосования: server3 получает информацию о голосовании (1, 123) от server1 и обнаруживает, что zxid123 голосования больше, чем его собственный 122, затем server3 изменяет свою информацию о голосовании на (1, 123) и затем отправляет ее. на сервер1.
4. Обработка голосования: сервер 1 получает информацию о голосовании (3, 122) сервера 3 и обнаруживает, что zxid122 сервера 3 меньше, чем его собственный 123, поэтому он не меняет свой голос.
5. Статистическая информация о голосовании: сервер 3 подсчитывает полученные голоса (включая собственные голоса), (1 123) — два голоса, сервер 1 подсчитывает полученные голоса (включая собственные голоса), (1 123) — два голоса.
6. Измените состояние сервера: лидер, выбранный сервером 3, имеет номер 1, а сам он равен 3, поэтому он переходит в состояние СЛЕДУЮЩИЙ, то есть в роль Последователя. Лидер, выбранный server1, равен 1, и сам он равен 1, поэтому он переходит в состояние LEADING.
То есть роль Лидера.
Синхронизация данных ЗК
Когда лидер завершает выборы, ведомому необходимо синхронизировать данные с новым лидером.Лидеру необходимо выполнить следующую работу:
1. Лидер сообщает другим Последователям последние данные, то есть zxid.Лидер создаст пакет NEWLEADER, включая текущий наибольший zxid, и отправит его всем Последователям или Наблюдателям.
2. Ведущий создает для каждого Последователя LearnerHandler округа для обработки запроса синхронизации данных каждого Последователя, в то же время основной поток начинает блокироваться, и только более половины Последователей завершают синхронизацию.
Процесс синхронизации завершен, и лидер может стать настоящим лидером.
3. Лидер выполняет операцию синхронизации в соответствии с алгоритмом синхронизации.
На стороне ведомого будут выполнены следующие работы:
1. После завершения выборов попытаться установить синхронное соединение с Лидером, при отсутствии соединения в течение некоторого времени будет сообщено об ошибке тайм-аута и будет возвращено состояние выборов.
2. Отправьте лидеру пакет FOLLOWERINFO и принесите свой самый большой zxid
3. Операция синхронизации выполняется в соответствии с алгоритмом синхронизации.
Конкретный используемый алгоритм синхронизации зависит от текущего самого большого zxid последователя.Лидер будет поддерживать два zxid, минимальный идентификатор транзакции: minCommittedLog и максимальный идентификатор транзакции: maxCommittedLog.
minCommittedLog — это первый файл журнала, который не сохраняется моментальным снимком (файл журнала транзакций будет создаваться повторно после сохранения каждого моментального снимка), а maxCommittedLog — это самый большой журнал в журнале транзакций.
В zk реализованы следующие алгоритмы синхронизации данных:
1. Прямая дифференциальная синхронизация (DIFF-синхронизация)
2. Только синхронизация отката (TRUNC), то есть удаление избыточных журналов транзакций.Например, после выхода из строя исходного узла-лидера, а затем повторного присоединения, могут быть данные, которые он записал и отправил, но другие узлы не успели отправить .
3. Сначала выполнить откат (TRUNC), а затем дифференцировать (DIFF) синхронизацию.
4. Полная синхронизация (SNAP).
Процесс трансляции ZK
Когда zk выборы кластера, и может начать предоставлять услуги после окончания синхронизации данных, получает запросы на чтение и запись, когда Лидер, получающий клиент, запрашивает новую транзакцию, транзакция будет транслировать запрос в кластер Последователь, широковещательный процесс следующее:
1. Лидер сначала генерирует соответствующее предложение модификации транзакции в соответствии с запросом транзакции клиента и отправляет предложение модификации данных всем подписчикам в соответствии с порядком zxid (получено несколько запросов транзакции клиента).
2. Когда Последователь получает предложения модификации данных лидера, он будет обрабатывать эти предложения в соответствии с порядком полученных ограничений продаж, то есть, если получены три предложения модификации данных 1, 2 и 3, если обработано третье, он представляет 1, 2 должны быть успешно обработаны.
3. После того, как Лидер получает правильный отзыв (подтверждение) от Последователя более чем для половины определенного предложения по модификации данных, он инициирует отправку предложения по модификации транзакции, то есть повторно инициирует предложение по отправке транзакции.
4. После того, как Подписчик получает предложение об отправке транзакции, он записывает отправку транзакции и обновляет данные в базе данных в памяти.
публика
Подпишитесь на официальный аккаунт, чтобы публиковать больше интересных статей