В этой статье будут проанализированы основные принципы механизма Zookeeper Watch на уровне исходного кода и объяснено максимально подробно.Если есть какая-либо ошибка или неясное место, пожалуйста, поправьте меня, спасибо!
ZK в этой статье относится только к Zookeeper.
Мы знаем, что временные узлы Zookeeper можно использовать для реализации распределенных блокировок. Несколько клиентов создают узел каждый. Если создание прошло успешно, блокировка успешно получена. Клиенты, которым не удалось создать, будут прослушивать этот временный узел, а клиент, который получает блокировка снимет блокировку.После блокировки(удаления временной ноды) или отключения от сервера ZK(ZK удалит временную ноду) другие клиенты получат уведомление от Watch.Братья, она сняла блокировку.Приходи и хватай замок.
Выше приведена самая примитивная распределенная блокировка (не используйте ее в производственной среде, т.к.шокирующий стадный эффект), мы видим, что здесь используется механизм Watch для реализации уведомления клиенту.
Эффект шокирующего стада: когда многие процессы ожидают события, эти процессы пробуждаются после того, как событие происходит, но только один процесс может получить право на выполнение ЦП, а другие процессы должны быть заблокированы, что приводит к серьезным затратам на переключение системного контекста. (отВикипедия)
В некоторых распределенных сценариях нам обычно нужна только определенная машина для участия в распределенной работе (фактически это создание временной ноды к серверу ZK. Этот процесс аналогичен описанному выше упрощенному варианту распределенной блокировки. машина успешно создана. После того, как временный узел выбран в качестве рабочей машины, если другие узлы не могут быть созданы, узел отслеживается). создайте временные узлы, и только одна машина может быть создана.Если это удастся, другие машины будут снова слушать этот узел, и машина, которая успешно создала узел, будет предоставлять внешние услуги вместо машины, которая ранее была отключена.
Это пример использования ZK для решения одной проблемы в распределенном сценарии.
Из предыдущего описания нетрудно увидеть, что механизм Watch в ZK является относительно базовой и широко используемой функцией, поэтому давайте проанализируем, как механизм Watch реализован внутри ZK.
Как реализован механизм Watch?
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.1</version>
</dependency>
Во-первых, вам нужно ввести зависимости pom (собственный клиентский API ZK)
Как пользоваться часовым механизмом
ZooKeeper
класс для установления соединения
В этом разделе описывается процесс установления соединения между классом ZooKeeper и сервером ZK, который будет включать инициализацию объекта Watch Конкретные основные принципы будут объяснены в следующих главах.
Клиенты ZooKeeper могут подписаться, когда сервер работает через механизм наблюдения.Данные или состояние узла изменяютсяполучить соответствующее уведомление
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
this(connectString, sessionTimeout, watcher, false);
}
- connectString : адрес сервера ZK
- sessionTimeout : время ожидания соединения
- наблюдатель: отслеживать события
этоWatcher
Будет использоваться в качестве контекста для всего сеанса ZooKeeper, хранящегося на стороне клиента.ZKWatchManager
изdefaultWatcher
середина.
protected final ZKWatchManager watchManager;
private final ZKClientConfig clientConfig;
protected final HostProvider hostProvider;
protected final ClientCnxn cnxn;
public ZooKeeper(
String connectString,
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly,
HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
...
if (clientConfig == null) {
// 1. 提供一下默认配置
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
// 2. 根据ZKClientConfig创建一个ZKWatchManager对象
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;
// 3. 创建ConnectStringParser对象
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
hostProvider = aHostProvider;
// 4. 建立连接
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
this,
watchManager,
getClientCnxnSocket(),
canBeReadOnly);
cnxn.start();
}
ПостроитьZKClientConfig
public ZKClientConfig() {
super();
initFromJavaSystemProperties();
}
private final Map<String, String> properties = new HashMap<String, String>();
public static final String ZOOKEEPER_REQUEST_TIMEOUT = "zookeeper.request.timeout";
public static final String ZOOKEEPER_SERVER_PRINCIPAL = "zookeeper.server.principal";
// ZKConfig
protected void handleBackwardCompatibility() {
properties.put(JUTE_MAXBUFFER, System.getProperty(JUTE_MAXBUFFER));
properties.put(KINIT_COMMAND, System.getProperty(KINIT_COMMAND));
properties.put(JGSS_NATIVE, System.getProperty(JGSS_NATIVE));
try (ClientX509Util clientX509Util = new ClientX509Util()) {
putSSLProperties(clientX509Util);
properties.put(clientX509Util.getSslAuthProviderProperty(), System.getProperty(clientX509Util.getSslAuthProviderProperty()));
}
try (X509Util x509Util = new QuorumX509Util()) {
putSSLProperties(x509Util);
}
}
/**
* Initialize all the ZooKeeper client properties which are configurable as
* java system property
*/
private void initFromJavaSystemProperties() {
setProperty(ZOOKEEPER_REQUEST_TIMEOUT, System.getProperty(ZOOKEEPER_REQUEST_TIMEOUT));
setProperty(ZOOKEEPER_SERVER_PRINCIPAL,System.getProperty(ZOOKEEPER_SERVER_PRINCIPAL));
}
public void setProperty(String key, String value) {
if (null == key) {
throw new IllegalArgumentException("property key is null.");
}
String oldValue = properties.put(key, value);
if (null != oldValue && !oldValue.equals(value)) {
LOG.debug("key {}'s value {} is replaced with new value {}", key, oldValue, value);
}
}
сборка по умолчаниюZKClientConfig
-
ZKConfig#handleBackwardCompatibility
-
private final Map<String, String> properties = new HashMap<String, String>();
public static final String JUTE_MAXBUFFER = "jute.maxbuffer"; public static final String KINIT_COMMAND = "zookeeper.kinit"; public static final String JGSS_NATIVE = "sun.security.jgss.native"; ... properties.put(JUTE_MAXBUFFER, System.getProperty(JUTE_MAXBUFFER)); properties.put(KINIT_COMMAND, System.getProperty(KINIT_COMMAND)); properties.put(JGSS_NATIVE, System.getProperty(JGSS_NATIVE)); ...
установить свойства
-
-
ZKClientConfig#initFromJavaSystemProperties
public static final String ZOOKEEPER_REQUEST_TIMEOUT = "zookeeper.request.timeout"; public static final String ZOOKEEPER_SERVER_PRINCIPAL = "zookeeper.server.principal"; setProperty(ZOOKEEPER_REQUEST_TIMEOUT, System.getProperty(ZOOKEEPER_REQUEST_TIMEOUT)); setProperty(ZOOKEEPER_SERVER_PRINCIPAL, System.getProperty(ZOOKEEPER_SERVER_PRINCIPAL));
СоздайтеZKWatchManager
watchManager = defaultWatchManager();
// 设置defaultWatcher
watchManager.defaultWatcher = watcher;
static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> persistentWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<String, Set<Watcher>>();
private boolean disableAutoWatchReset;
ZKWatchManager(boolean disableAutoWatchReset) {
this.disableAutoWatchReset = disableAutoWatchReset;
}
protected volatile Watcher defaultWatcher;
...
}
ConnectStringParser
путем разделения клиента
connectString
для разрешения хоста и порта и поддерживает литералы IPv6
private static final int DEFAULT_PORT = 2181;
private final String chrootPath;
private final ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();
public ConnectStringParser(String connectString) {
// parse out chroot, if any
int off = connectString.indexOf('/');
if (off >= 0) {
String chrootPath = connectString.substring(off);
// ignore "/" chroot spec, same as null
if (chrootPath.length() == 1) {
this.chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
this.chrootPath = chrootPath;
}
connectString = connectString.substring(0, off);
} else {
this.chrootPath = null;
}
List<String> hostsList = split(connectString, ",");
for (String host : hostsList) {
int port = DEFAULT_PORT;
try {
String[] hostAndPort = ConfigUtils.getHostAndPort(host);
host = hostAndPort[0];
if (hostAndPort.length == 2) {
port = Integer.parseInt(hostAndPort[1]);
}
} catch (ConfigException e) {
e.printStackTrace();
}
serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
}
}
ClientCnxn
Этот класс управляет вводом-выводом сокетов для клиентов. ClientCnxn поддерживает список доступных серверов, к которым он может подключиться, и «прозрачно» меняет серверы, к которым он подключается, по мере необходимости. Грубо говоря, этот класс управляет соединением между клиентом и сервером ZK.
public ClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
ZooKeeper zooKeeper,
ClientWatchManager watcher,
ClientCnxnSocket clientCnxnSocket,
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
this.clientConfig = zooKeeper.getClientConfig();
initRequestTimeout();
}
Инициализируйте некоторые свойства. sendThread/eventThread важнее
Установить два потока как потоки демона
SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);
}
EventThread() {
super(makeThreadName("-EventThread"));
setDaemon(true);
}
final SendThread sendThread;
final EventThread eventThread;
public void start() {
sendThread.start();
eventThread.start();
}
Запустите поток отправки и поток событий.
Мы видели
ZooKeeper
Установление соединения между классом и сервером ZK фактически сначала подготавливает параметры, а затем запускает два потока, поэтому установление соединенияасинхронныйПуть.
путь API
В дополнение к этому, клиенты ZooKeeper также могутgetData
,exists
а такжеgetChildren
Три интерфейса для регистрации Watchers на сервере ZooKeeper, упрощающие добавление событий Watch в различных ситуациях:
/**
* The asynchronous version of getData.
*
* @see #getData(String, boolean, Stat)
*/
public void getData(String path, boolean watch, DataCallback cb, Object ctx) {
getData(path, watch ? watchManager.defaultWatcher : null, cb, ctx);
}
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
return exists(path, watch ? watchManager.defaultWatcher : null);
}
public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException {
return getChildren(path, watch ? watchManager.defaultWatcher : null);
}
статус и события
interface Event {
/**
* Enumeration of states the ZooKeeper may be at the event
*/
@InterfaceAudience.Public
enum KeeperState {
@Deprecated
Unknown(-1),
Disconnected(0),
@Deprecated
NoSyncConnected(1),
SyncConnected(3),
AuthFailed(4),
ConnectedReadOnly(5),
SaslAuthenticated(6),
Expired(-112),
Closed(7);
}
...
}
-
Disconnected
EventType.None
-
SyncConnected
EventType.NodeCreated
EventType.NodeDeleted
EventType.NodeDataChanged
EventType.NodeChildrenChanged
-
AuthFailed
EventType.None
-
Expired
EventType.None
В разных состояниях сеанса клиента соответствующие типы событий поддерживаются узлом сервера. Например, когда клиент подключается к серверу, он можетСоздание узла данных, удаление, изменение данных и обновление дочерних узловПодождите, пока операция будет проверена.
Предыдущая статья представляет собой описание механизма наблюдения ZK на прикладном уровне (API), а затем давайте рассмотрим основные принципы механизма наблюдения.
Основной принцип часового механизма
Механизм наблюдения очень похож на «шаблон наблюдателя» в шаблоне проектирования: в этом шаблоне целевой объект управляет всеми объектами-наблюдателями, которые от него зависят, и активно отправляет уведомления при изменении своего собственного состояния. Обычно это достигается путем вызова методов, предоставляемых каждым наблюдателем. Этот шаблон часто используется в системах обработки событий в реальном времени.
Мы можем думать о механизме наблюдения как о «шаблоне наблюдателя» в распределенных сценариях.
Обычно, когда мы реализуем шаблон наблюдателя, ядром или ключевым кодом является создание списка для хранения наблюдателей.
В ZooKeeper реализованы два списка наблюдателей хранилища на стороне клиента и на стороне сервера соответственно, а именно:ZKWatchManager
а такжеWatchManager
.
Процесс внедрения регистрации Client Watch
Возьмем на примере метод getData, чтобы понять принцип регистрации клиента Watch.
ZooKeeper#getData(String path, Watcher watcher, Stat stat)
При отправке запроса с Watch-событием клиент сначала помечает сессию как событие-запрос с Watch-мониторингом, а затем передает
DataWatchRegistration
класс для сохраненияСоответствие между событиями наблюдателя и узлами
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
// 创建DataWatchRegistration对象
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
// 发送请求
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
Когда клиент отправляет запрос на сервер, он инкапсулирует запрос какPacket
объект и добавить его в очередь, ожидающую отправкиoutgoingQueue
public ReplyHeader submitRequest(
RequestHeader h,
Record request,
Record response,
WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(
h,
r,
request,
response,
null,
null,
null,
null,
watchRegistration,
watchDeregistration);
synchronized (packet) {
if (requestTimeout > 0) {
// Wait for request completion with timeout
waitForPacketFinish(r, packet);
} else {
// Wait for request completion infinitely
while (!packet.finished) {
packet.wait();
}
}
}
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
sendThread.cleanAndNotifyState();
}
return r;
}
ClientCnxn#queuePacket(...)
public Packet queuePacket(...){
Packet packet = null;
packet = new Packet(h, r, request, response, watchRegistration);
...
packet.watchDeregistration = watchDeregistration;
...
outgoingQueue.add(packet);
...
sendThread.getClientCnxnSocket().packetAdded();
}
Наконец, клиент ZooKeeper отправит запрос (Java NIO/Netty) на сервер после отправки запроса. Вызовите метод readResponse в классе потока SendThread, отвечающий за обработку ответа сервера.Получить обратный вызов от сервера, и выполнить в концеfinishPacket()
способ зарегистрировать часы наZKWatchManager
середина.
Нижний уровень клиента ZooKeeper использует NIO и Netty для Java для отправки запросов (детали отправки запросов не объясняются, читатели могут обратиться к коду)
SendThread#readResponse
// org.apache.zookeeper.ClientCnxn.SendThread#readResponse
void readResponse(ByteBuffer incomingBuffer) throws IOException {
...
finishPacket(packet);
}
ClientCnxn#finishPacket
protected void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
p.watchRegistration.register(err);
}
...
}
ZooKeeper.WatchRegistration#register
public void register(int rc) {
if (shouldAddWatch(rc)) {
// 获取Watcher Map
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized (watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
// 注册Watch
watchers.add(watcher);
}
}
}
DataWatchRegistration#getWatches
class DataWatchRegistration extends WatchRegistration {
public DataWatchRegistration(Watcher watcher, String clientPath) {
super(watcher, clientPath);
}
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
return watchManager.dataWatches;
}
}
watchManagerОтZookeeper
Добрый
protected final ZKWatchManager watchManager;
Давайте посмотримZKWatchManager
Этот класс содержит свойства карты:
static class ZKWatchManager implements ClientWatchManager {
// 获取的Map
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> persistentWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<String, Set<Watcher>>();
private boolean disableAutoWatchReset;
...
}
Эти Карты содержат различные категории информации Watch: dataWatches, existsWatches, childWatches и так далее.
Суммировать
-
Когда клиент отправляет запрос, он сначала инкапсулирует отправленную информацию в
Packet
объект -
При отправке будет использоваться метод Java NIO/Netty.Если Watch зарегистрирован, информация Watch будет отправлена на сервер.
-
После завершения отправки клиент получает ее через метод обратного вызова
ZKWatchManager
В соответствующей карте наблюдения зарегистрируйте соответствующее наблюдение на карте.Обратите внимание, что процесс getWatch не заблокирован, зарегистрируйтесь
Watch
пройти, когдаsynchronized
ключевое слово, чтобы гарантировать атомарность. Потому что только операция записи может вызвать проблемы безопасности потоков.
Процесс реализации регистрации наблюдения на стороне сервера
-
Когда сервер ZooKeeper получает запрос клиента, он сначала анализирует запрос, чтобы определить, содержит ли запрос событие наблюдения, и если он содержит событие наблюдения, он сохраняет событие наблюдения в
WatchManager
серединаВнизу ZooKeeper сквозной
FinalRequestProcessor#processRequest
метод достиженияfor (Op readOp : multiReadRecord) { try { Record rec; switch (readOp.getType()) { case OpCode.getChildren: rec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo); subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren()); break; case OpCode.getData: // 处理请求 rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo); GetDataResponse gdr = (GetDataResponse) rec; subResult = new GetDataResult(gdr.getData(), gdr.getStat()); break; default: throw new IOException("Invalid type of readOp"); } } catch (KeeperException e) { subResult = new ErrorResult(e.code().intValue()); } ((MultiResponse) rsp).add(subResult); } break; ...
FinalRequestProcessor#handleGetDataRequest
private Record handleGetDataRequest(Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
GetDataRequest getDataRequest = (GetDataRequest) request;
// 获取path
String path = getDataRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
Stat stat = new Stat();
byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
return new GetDataResponse(b, stat);
}
getDataRequest.getWatch()
Когда он возвращает true, это означает, что запрос должен быть зарегистрирован для мониторинга Watch и переданzks.getZKDatabase().getData
Регистрация событий Watch на сервереWatchManager
середина.мы можем поставить
ServerCnxn
Думайте об этом как о соединении клиента с сервером, я понимаю это как событие Watch клиента.
public abstract class ServerCnxn implements Stats, Watcher {
ZKDatabase#getData
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
return dataTree.getData(path, stat, watcher);
}
// DataTree#getData
private IWatchManager dataWatches;
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
DataNode n = nodes.get(path);
byte[] data = null;
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
// 注册Watch
dataWatches.addWatch(path, watcher);
}
data = n.data;
}
updateReadStat(path, data == null ? 0 : data.length);
return data;
}
Время запуска события наблюдения за сервером
выполнение на стороне сервераsetData
После того, как метод изменит данные узла, он вызоветWatchManager.triggerWatch
триггер методасобытие изменения данных.
public Stat setData(String path, byte data[], ...){
Stat s = new Stat();
DataNode n = nodes.get(path);
...
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
WatchManager#triggerWatch
Во-первых, он инкапсулирует объект WatchedEvent с тремя атрибутами: состоянием сеанса, типом события и узлом данных.
Затем запросите зарегистрированное узлом событие Watch.Если оно пустое, это означает, что узел не зарегистрировал событие Watch. Если есть событие Watch, оно добавляется в определенную коллекцию Watchers и удаляется в управлении WatchManager.
Наконец, уведомление отправляется клиенту путем вызова метода процесса.
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
// 封装WatchedEvent对象 type:事件类型
// KeeperState.SyncConnected: 会话状态 path:数据节点
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
Set<Watcher> watchers = new HashSet<>();
PathParentIterator pathParentIterator = getPathParentIterator(path);
synchronized (this) {
for (String localPath : pathParentIterator.asIterable()) {
// 获取Watchers Set
Set<Watcher> thisWatchers = watchTable.get(localPath);
if (thisWatchers == null || thisWatchers.isEmpty()) {
continue;
}
// 遍历
Iterator<Watcher> iterator = thisWatchers.iterator();
while (iterator.hasNext()) {
Watcher watcher = iterator.next();
WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
if (watcherMode.isRecursive()) {
if (type != EventType.NodeChildrenChanged) {
watchers.add(watcher);
}
} else if (!pathParentIterator.atParentPath()) {
watchers.add(watcher);
if (!watcherMode.isPersistent()) {
iterator.remove();
Set<String> paths = watch2Paths.get(watcher);
if (paths != null) {
paths.remove(localPath);
}
}
}
}
if (thisWatchers.isEmpty()) {
watchTable.remove(localPath);
}
}
}
if (watchers.isEmpty()) {
...
return null;
}
// 遍历添加的Watcher
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
// 向客户端发送通知
w.process(e);
}
...
return new WatcherOrBitSet(watchers);
}
Процесс обратного звонка клиента
Использование клиентаSendThread.readResponse()
метод для единообразной обработки ответов на стороне сервера.
- Сначала сервер десериализации отправляет информацию заголовка запроса responseHdr.deserialize(bbia, "header") и определяет, что значение соответствующего атрибутивного поля xid равно -1, указывая, что ответ на запрос относится к типу уведомления. При обработке типа уведомления сначала преобразуйте полученный поток байтовдесериализоватьПеревести в
WatcherEvent
объект. - Затем оцените, настроен ли клиент с атрибутом chrootPath, если он
true
Указывает, что клиент настроил атрибут chrootPath. Полученный путь узла должен быть обработан chrootPath. - последний звонок
eventThread.queueEvent()
Метод передает полученное событие вEventThread
нить для обработки.
if (replyHdr.getXid() == -1) {
...
WatcherEvent event = new WatcherEvent();
// 反序列化服务器发送请求头信
event.deserialize(bbia, "response");
...
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
...
event.setPath(serverPath.substring(chrootPath.length()));
...
}
WatchedEvent we = new WatchedEvent(event);
...
eventThread.queueEvent( we );
}
нам просто нужно обратить внимание
eventThread.queueEvent()
метод.
EventThread.queueEvent
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
if (event.getType() == EventType.None && sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// 获取注册过的Watcher事件
final Set<Watcher> watchers;
if (materializedWatchers == null) {
// materialize the watchers based on the event
watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
} else {
watchers = new HashSet<Watcher>();
watchers.addAll(materializedWatchers);
}
WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
// queue the pair (watch set & event) for later processing
// 将Watcher事件添加到waitingEvents中
waitingEvents.add(pair);
}
ZooKeeper.ZKWatchManager#materialize
public Set<Watcher> materialize(...)
{
Set<Watcher> result = new HashSet<Watcher>();
...
// type: 事件类型
switch (type) {
...
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
break;
....
}
return result;
}
Шаг 1 В зависимости от типа события уведомления, отZKWatchManager
Запрос зарегистрированного клиента Смотреть информацию в формате . После того, как клиент найдет соответствующую информацию Watch, она будет удалена из управления ZKWatchManager.
Механизм Watcher клиента является одноразовым и будет удален после его срабатывания.
class EventThread extends ZooKeeperThread {
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();
...
waitingEvents
даEventThread
Блокирующая очередь вZKWatchManager
Запрос зарегистрированного клиента Информация о просмотре будет добавлена вwaitingEvents
середина.
а такжеEventThread
Класс метода будет запускать цикл oksevents, взятых в очереди события, ожидая сторонника обработки.
Это можно понимать как непрерывное получение обработки события Watcher для обработки в цикле.
public void run() {
try {
isRunning = true;
// 无限循环
while (true) {
// 取出事件
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
// 处理事件
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
...
}
**processEvent(event) **
private void processEvent(Object event) {
...
if (event instanceof WatcherSetEventPair) {
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
// 回调事件监听的处理
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
}
}
Суммировать
ZooKeeper реализует механизм Watch, создавая список информации с наблюдателями через клиент и сервер соответственно.
- Когда клиент вызывает такие интерфейсы, как getData и exists, он сначала помещает соответствующее событие Watch в локальный
ZKWatchManager
в управлении.- После получения запроса от клиента сервер оценивает, содержит ли он
Watch
событие и поместите соответствующее событие вWatchManager
в управлении.
существуеттриггер событияКогда сервер запрашивает соответствующее уведомление о событии наблюдения у клиента через информацию о пути узла, после получения уведомления клиент сначала запрашивает локальный ZKWatchManager для получения соответствующей операции обратного вызова обработки информации наблюдения.
Этот дизайн не только реализует режим наблюдателя в распределенной среде, но также сохраняет дополнительную информацию, необходимую клиенту и серверу для обработки событий наблюдения на обоих концах, уменьшая содержание связи друг с другом и значительно повышая производительность службы. .
Применение часового механизма
Центр конфигурации
Мы можем хранить такую информацию, как элементы конфигурации базы данных, в узлах данных ZooKeeper. Клиент кластера серверов добавляет к узлу мониторинг событий Watch.Когда служба в кластере запускается, она считывает данные узла, чтобы получить информацию о конфигурации данных. И когда данные узла меняются,Сервер ZooKeeper будет отправлять события Watch каждому клиенту (push), после того как клиент в кластере получит уведомление,Перечитать информацию о конфигурации базы данных узла (pull).
ZooKeeper реализует механизм двухтактной комбинации.
Регистрационный центр
ZooKeeper часто используется в качестве центра регистрации (ЦП) в проекте Dubbo, по принципу аналогичен центру конфигурации, что резко контрастирует с дизайном ЦД Eureka.