Понимание механизма Zookeeper's Watch

ZooKeeper
Понимание механизма Zookeeper's Watch

В этой статье будут проанализированы основные принципы механизма Zookeeper Watch на уровне исходного кода и объяснено максимально подробно.Если есть какая-либо ошибка или неясное место, пожалуйста, поправьте меня, спасибо!

ZK в этой статье относится только к Zookeeper.

Мы знаем, что временные узлы Zookeeper можно использовать для реализации распределенных блокировок. Несколько клиентов создают узел каждый. Если создание прошло успешно, блокировка успешно получена. Клиенты, которым не удалось создать, будут прослушивать этот временный узел, а клиент, который получает блокировка снимет блокировку.После блокировки(удаления временной ноды) или отключения от сервера ZK(ZK удалит временную ноду) другие клиенты получат уведомление от Watch.Братья, она сняла блокировку.Приходи и хватай замок.

Выше приведена самая примитивная распределенная блокировка (не используйте ее в производственной среде, т.к.шокирующий стадный эффект), мы видим, что здесь используется механизм Watch для реализации уведомления клиенту.

Эффект шокирующего стада: когда многие процессы ожидают события, эти процессы пробуждаются после того, как событие происходит, но только один процесс может получить право на выполнение ЦП, а другие процессы должны быть заблокированы, что приводит к серьезным затратам на переключение системного контекста. (отВикипедия)

В некоторых распределенных сценариях нам обычно нужна только определенная машина для участия в распределенной работе (фактически это создание временной ноды к серверу ZK. Этот процесс аналогичен описанному выше упрощенному варианту распределенной блокировки. машина успешно создана. После того, как временный узел выбран в качестве рабочей машины, если другие узлы не могут быть созданы, узел отслеживается). создайте временные узлы, и только одна машина может быть создана.Если это удастся, другие машины будут снова слушать этот узел, и машина, которая успешно создала узел, будет предоставлять внешние услуги вместо машины, которая ранее была отключена.

Это пример использования ZK для решения одной проблемы в распределенном сценарии.

Из предыдущего описания нетрудно увидеть, что механизм Watch в ZK является относительно базовой и широко используемой функцией, поэтому давайте проанализируем, как механизм Watch реализован внутри ZK.

Как реализован механизм Watch?

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.1</version>
</dependency>

Во-первых, вам нужно ввести зависимости pom (собственный клиентский API ZK)

Как пользоваться часовым механизмом

ZooKeeperкласс для установления соединения

В этом разделе описывается процесс установления соединения между классом ZooKeeper и сервером ZK, который будет включать инициализацию объекта Watch Конкретные основные принципы будут объяснены в следующих главах.

Клиенты ZooKeeper могут подписаться, когда сервер работает через механизм наблюдения.Данные или состояние узла изменяютсяполучить соответствующее уведомление

 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
      this(connectString, sessionTimeout, watcher, false);
 }
  • connectString : адрес сервера ZK
  • sessionTimeout : время ожидания соединения
  • наблюдатель: отслеживать события

этоWatcherБудет использоваться в качестве контекста для всего сеанса ZooKeeper, хранящегося на стороне клиента.ZKWatchManagerизdefaultWatcherсередина.

protected final ZKWatchManager watchManager;
private final ZKClientConfig clientConfig; 
protected final HostProvider hostProvider;

protected final ClientCnxn cnxn;

public ZooKeeper(
        String connectString,
        int sessionTimeout,
        Watcher watcher,
        boolean canBeReadOnly,
        HostProvider aHostProvider,
        ZKClientConfig clientConfig) throws IOException {
        
        ...

        if (clientConfig == null) {
            // 1. 提供一下默认配置
            clientConfig = new ZKClientConfig();
        }
        this.clientConfig = clientConfig;
    	
    	// 2. 根据ZKClientConfig创建一个ZKWatchManager对象
        watchManager = defaultWatchManager();
        watchManager.defaultWatcher = watcher;
    	
        // 3. 创建ConnectStringParser对象
        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
        hostProvider = aHostProvider;
		
        //  4. 建立连接
        cnxn = createConnection(
            connectStringParser.getChrootPath(),
            hostProvider,
            sessionTimeout,
            this,
            watchManager,
            getClientCnxnSocket(),
            canBeReadOnly);
    	
        cnxn.start();
    }
ПостроитьZKClientConfig
public ZKClientConfig() {
    super();
    initFromJavaSystemProperties();
}
private final Map<String, String> properties = new HashMap<String, String>();

public static final String ZOOKEEPER_REQUEST_TIMEOUT = "zookeeper.request.timeout";
public static final String ZOOKEEPER_SERVER_PRINCIPAL = "zookeeper.server.principal";

// ZKConfig
protected void handleBackwardCompatibility() {
        properties.put(JUTE_MAXBUFFER, System.getProperty(JUTE_MAXBUFFER));
        properties.put(KINIT_COMMAND, System.getProperty(KINIT_COMMAND));
        properties.put(JGSS_NATIVE, System.getProperty(JGSS_NATIVE));

        try (ClientX509Util clientX509Util = new ClientX509Util()) {
            putSSLProperties(clientX509Util);
            properties.put(clientX509Util.getSslAuthProviderProperty(), System.getProperty(clientX509Util.getSslAuthProviderProperty()));
        }

        try (X509Util x509Util = new QuorumX509Util()) {
            putSSLProperties(x509Util);
        }
    }
/**
     * Initialize all the ZooKeeper client properties which are configurable as
     * java system property
     */
private void initFromJavaSystemProperties() {
      setProperty(ZOOKEEPER_REQUEST_TIMEOUT, System.getProperty(ZOOKEEPER_REQUEST_TIMEOUT));
      setProperty(ZOOKEEPER_SERVER_PRINCIPAL,System.getProperty(ZOOKEEPER_SERVER_PRINCIPAL));
}
public void setProperty(String key, String value) {
       if (null == key) {
            throw new IllegalArgumentException("property key is null.");
       }
       String oldValue = properties.put(key, value);
       if (null != oldValue && !oldValue.equals(value)) {
            LOG.debug("key {}'s value {} is replaced with new value {}", key, oldValue, value);
       }
 }

сборка по умолчаниюZKClientConfig

  • ZKConfig#handleBackwardCompatibility

    • private final Map<String, String> properties = new HashMap<String, String>();

      public static final String JUTE_MAXBUFFER = "jute.maxbuffer";
      public static final String KINIT_COMMAND = "zookeeper.kinit";
      public static final String JGSS_NATIVE = "sun.security.jgss.native";
      
      ...
      properties.put(JUTE_MAXBUFFER, System.getProperty(JUTE_MAXBUFFER));
      properties.put(KINIT_COMMAND, System.getProperty(KINIT_COMMAND));
      properties.put(JGSS_NATIVE, System.getProperty(JGSS_NATIVE));
      ...
      

      установить свойства

  • ZKClientConfig#initFromJavaSystemProperties

    public static final String ZOOKEEPER_REQUEST_TIMEOUT = "zookeeper.request.timeout";
    public static final String ZOOKEEPER_SERVER_PRINCIPAL = "zookeeper.server.principal";
    setProperty(ZOOKEEPER_REQUEST_TIMEOUT, System.getProperty(ZOOKEEPER_REQUEST_TIMEOUT));
    setProperty(ZOOKEEPER_SERVER_PRINCIPAL, System.getProperty(ZOOKEEPER_SERVER_PRINCIPAL));
    
СоздайтеZKWatchManager
 watchManager = defaultWatchManager();
 // 设置defaultWatcher
 watchManager.defaultWatcher = watcher;
static class ZKWatchManager implements ClientWatchManager {
       private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> persistentWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<String, Set<Watcher>>();
    
        private boolean disableAutoWatchReset;

        ZKWatchManager(boolean disableAutoWatchReset) {
            this.disableAutoWatchReset = disableAutoWatchReset;
        }

        protected volatile Watcher defaultWatcher;
 ...   
}
ConnectStringParser

путем разделения клиентаconnectStringдля разрешения хоста и порта и поддерживает литералы IPv6

private static final int DEFAULT_PORT = 2181;

private final String chrootPath;

private final ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();


public ConnectStringParser(String connectString) {
        // parse out chroot, if any
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String chrootPath = connectString.substring(off);
            // ignore "/" chroot spec, same as null
            if (chrootPath.length() == 1) {
                this.chrootPath = null;
            } else {
                PathUtils.validatePath(chrootPath);
                this.chrootPath = chrootPath;
            }
            connectString = connectString.substring(0, off);
        } else {
            this.chrootPath = null;
        }

        List<String> hostsList = split(connectString, ",");
        for (String host : hostsList) {
            int port = DEFAULT_PORT;
            try {
                String[] hostAndPort = ConfigUtils.getHostAndPort(host);
                host = hostAndPort[0];
                if (hostAndPort.length == 2) {
                    port = Integer.parseInt(hostAndPort[1]);
                }
            } catch (ConfigException e) {
                e.printStackTrace();
            }

            serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
        }
    }
ClientCnxn

Этот класс управляет вводом-выводом сокетов для клиентов. ClientCnxn поддерживает список доступных серверов, к которым он может подключиться, и «прозрачно» меняет серверы, к которым он подключается, по мере необходимости. Грубо говоря, этот класс управляет соединением между клиентом и сервером ZK.

 public ClientCnxn(
        String chrootPath,
        HostProvider hostProvider,
        int sessionTimeout,
        ZooKeeper zooKeeper,
        ClientWatchManager watcher,
        ClientCnxnSocket clientCnxnSocket,
        long sessionId,
        byte[] sessionPasswd,
        boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;

        connectTimeout = sessionTimeout / hostProvider.size();
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;

        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();
        this.clientConfig = zooKeeper.getClientConfig();
        initRequestTimeout();
    }

Инициализируйте некоторые свойства. sendThread/eventThread важнее

Установить два потока как потоки демона

 SendThread(ClientCnxnSocket clientCnxnSocket) {
          super(makeThreadName("-SendThread()"));
          state = States.CONNECTING;
          this.clientCnxnSocket = clientCnxnSocket;
          setDaemon(true);
  }
  EventThread() {
          super(makeThreadName("-EventThread"));
          setDaemon(true);
 }
final SendThread sendThread;

final EventThread eventThread; 

public void start() {
    sendThread.start();
    eventThread.start();
 }

Запустите поток отправки и поток событий.

Мы виделиZooKeeperУстановление соединения между классом и сервером ZK фактически сначала подготавливает параметры, а затем запускает два потока, поэтому установление соединенияасинхронныйПуть.

путь API

В дополнение к этому, клиенты ZooKeeper также могутgetData,existsа такжеgetChildrenТри интерфейса для регистрации Watchers на сервере ZooKeeper, упрощающие добавление событий Watch в различных ситуациях:

  /**
     * The asynchronous version of getData.
     *
     * @see #getData(String, boolean, Stat)
     */
   public void getData(String path, boolean watch, DataCallback cb, Object ctx) {
       getData(path, watch ? watchManager.defaultWatcher : null, cb, ctx);
   }

    public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
        return exists(path, watch ? watchManager.defaultWatcher : null);
    }

    public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException {
        return getChildren(path, watch ? watchManager.defaultWatcher : null);
    } 

статус и события

interface Event {

        /**
         * Enumeration of states the ZooKeeper may be at the event
         */
        @InterfaceAudience.Public
        enum KeeperState {
          
            @Deprecated
            Unknown(-1),

            Disconnected(0),

            @Deprecated
            NoSyncConnected(1),

            SyncConnected(3),

            AuthFailed(4),

            ConnectedReadOnly(5),

            SaslAuthenticated(6),

            Expired(-112),

            Closed(7);
        }    
  ...
}
  • Disconnected
    • EventType.None
  • SyncConnected
    • EventType.NodeCreated
    • EventType.NodeDeleted
    • EventType.NodeDataChanged
    • EventType.NodeChildrenChanged
  • AuthFailed
    • EventType.None
  • Expired
    • EventType.None

В разных состояниях сеанса клиента соответствующие типы событий поддерживаются узлом сервера. Например, когда клиент подключается к серверу, он можетСоздание узла данных, удаление, изменение данных и обновление дочерних узловПодождите, пока операция будет проверена.

Предыдущая статья представляет собой описание механизма наблюдения ZK на прикладном уровне (API), а затем давайте рассмотрим основные принципы механизма наблюдения.

Основной принцип часового механизма

Механизм наблюдения очень похож на «шаблон наблюдателя» в шаблоне проектирования: в этом шаблоне целевой объект управляет всеми объектами-наблюдателями, которые от него зависят, и активно отправляет уведомления при изменении своего собственного состояния. Обычно это достигается путем вызова методов, предоставляемых каждым наблюдателем. Этот шаблон часто используется в системах обработки событий в реальном времени.

Мы можем думать о механизме наблюдения как о «шаблоне наблюдателя» в распределенных сценариях.

Обычно, когда мы реализуем шаблон наблюдателя, ядром или ключевым кодом является создание списка для хранения наблюдателей. В ZooKeeper реализованы два списка наблюдателей хранилища на стороне клиента и на стороне сервера соответственно, а именно:ZKWatchManagerа такжеWatchManager.

Процесс внедрения регистрации Client Watch

Возьмем на примере метод getData, чтобы понять принцип регистрации клиента Watch.

ZooKeeper#getData(String path, Watcher watcher, Stat stat)

При отправке запроса с Watch-событием клиент сначала помечает сессию как событие-запрос с Watch-мониторингом, а затем передаетDataWatchRegistrationкласс для сохраненияСоответствие между событиями наблюдателя и узлами

public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            // 创建DataWatchRegistration对象
            wcb = new DataWatchRegistration(watcher, clientPath);
        }

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
    
        GetDataResponse response = new GetDataResponse();
        // 发送请求
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
    	
        return response.getData();
    }

Когда клиент отправляет запрос на сервер, он инкапсулирует запрос какPacketобъект и добавить его в очередь, ожидающую отправкиoutgoingQueue

public ReplyHeader submitRequest(
        RequestHeader h,
        Record request,
        Record response,
        WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration) throws InterruptedException {
    
        ReplyHeader r = new ReplyHeader();
    	
        
        Packet packet = queuePacket(
            h,
            r,
            request,
            response,
            null,
            null,
            null,
            null,
            watchRegistration,
            watchDeregistration);
    
        synchronized (packet) {
            if (requestTimeout > 0) {
                // Wait for request completion with timeout
                waitForPacketFinish(r, packet);
            } else {
                // Wait for request completion infinitely
                while (!packet.finished) {
                    packet.wait();
                }
            }
        }
        if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
            sendThread.cleanAndNotifyState();
        }
        return r;
    }
ClientCnxn#queuePacket(...)
public Packet queuePacket(...){
    Packet packet = null;
    packet = new Packet(h, r, request, response, watchRegistration);
    ...
    packet.watchDeregistration = watchDeregistration;
    ...     
    outgoingQueue.add(packet);
    ...
    sendThread.getClientCnxnSocket().packetAdded();
}

Наконец, клиент ZooKeeper отправит запрос (Java NIO/Netty) на сервер после отправки запроса. Вызовите метод readResponse в классе потока SendThread, отвечающий за обработку ответа сервера.Получить обратный вызов от сервера, и выполнить в концеfinishPacket()способ зарегистрировать часы наZKWatchManagerсередина.

Нижний уровень клиента ZooKeeper использует NIO и Netty для Java для отправки запросов (детали отправки запросов не объясняются, читатели могут обратиться к коду)

SendThread#readResponse
// org.apache.zookeeper.ClientCnxn.SendThread#readResponse
 void readResponse(ByteBuffer incomingBuffer) throws IOException {
     ...
     finishPacket(packet);
 }
ClientCnxn#finishPacket
 protected void finishPacket(Packet p) {
        int err = p.replyHeader.getErr();
        if (p.watchRegistration != null) {
            p.watchRegistration.register(err);
        }
     ...
 }        
ZooKeeper.WatchRegistration#register
public void register(int rc) {
    
    if (shouldAddWatch(rc)) {
        // 获取Watcher Map
        Map<String, Set<Watcher>> watches = getWatches(rc);
        synchronized (watches) {
            Set<Watcher> watchers = watches.get(clientPath);
            if (watchers == null) {
                watchers = new HashSet<Watcher>();
                watches.put(clientPath, watchers);
            }
            // 注册Watch
            watchers.add(watcher);
        }
    }
}

DataWatchRegistration#getWatches

class DataWatchRegistration extends WatchRegistration {

        public DataWatchRegistration(Watcher watcher, String clientPath) {
            super(watcher, clientPath);
        }

        @Override
        protected Map<String, Set<Watcher>> getWatches(int rc) {
            return watchManager.dataWatches;
        }

    }

watchManagerОтZookeeperДобрый

protected final ZKWatchManager watchManager;

Давайте посмотримZKWatchManagerЭтот класс содержит свойства карты:

 static class ZKWatchManager implements ClientWatchManager {
		
        // 获取的Map
        private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> persistentWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<String, Set<Watcher>>();
        private boolean disableAutoWatchReset;
  ...
 }      

Эти Карты содержат различные категории информации Watch: dataWatches, existsWatches, childWatches и так далее.

Суммировать
  1. Когда клиент отправляет запрос, он сначала инкапсулирует отправленную информацию вPacketобъект

  2. При отправке будет использоваться метод Java NIO/Netty.Если Watch зарегистрирован, информация Watch будет отправлена ​​на сервер.

  3. После завершения отправки клиент получает ее через метод обратного вызоваZKWatchManagerВ соответствующей карте наблюдения зарегистрируйте соответствующее наблюдение на карте.

    Обратите внимание, что процесс getWatch не заблокирован, зарегистрируйтесьWatchпройти, когдаsynchronizedключевое слово, чтобы гарантировать атомарность. Потому что только операция записи может вызвать проблемы безопасности потоков.

Процесс реализации регистрации наблюдения на стороне сервера

  • Когда сервер ZooKeeper получает запрос клиента, он сначала анализирует запрос, чтобы определить, содержит ли запрос событие наблюдения, и если он содержит событие наблюдения, он сохраняет событие наблюдения вWatchManagerсередина

    Внизу ZooKeeper сквознойFinalRequestProcessor#processRequestметод достижения

    for (Op readOp : multiReadRecord) {
                        try {
                            Record rec;
                            switch (readOp.getType()) {
                            case OpCode.getChildren:
                                rec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
                                subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren());
                                break;
                            case OpCode.getData:
                                // 处理请求
                                rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
                                GetDataResponse gdr = (GetDataResponse) rec;
                                subResult = new GetDataResult(gdr.getData(), gdr.getStat());
                                break;
                            default:
                                throw new IOException("Invalid type of readOp");
                            }
                        } catch (KeeperException e) {
                            subResult = new ErrorResult(e.code().intValue());
                        }
                        ((MultiResponse) rsp).add(subResult);
                    }
                    break;
             ...
    
FinalRequestProcessor#handleGetDataRequest
  private Record handleGetDataRequest(Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
      
        GetDataRequest getDataRequest = (GetDataRequest) request;
        // 获取path
        String path = getDataRequest.getPath();
        DataNode n = zks.getZKDatabase().getNode(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
        Stat stat = new Stat();
        
        byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
        return new GetDataResponse(b, stat);
    }

getDataRequest.getWatch()Когда он возвращает true, это означает, что запрос должен быть зарегистрирован для мониторинга Watch и переданzks.getZKDatabase().getDataРегистрация событий Watch на сервереWatchManagerсередина.

мы можем поставитьServerCnxnДумайте об этом как о соединении клиента с сервером, я понимаю это как событие Watch клиента.

public abstract class ServerCnxn implements Stats, Watcher {

ZKDatabase#getData
 public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
        return dataTree.getData(path, stat, watcher);
  }
// DataTree#getData
private IWatchManager dataWatches;

public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path);
        byte[] data = null;
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (n) {
            n.copyStat(stat);
            if (watcher != null) {
                // 注册Watch
                dataWatches.addWatch(path, watcher);
            }
            data = n.data;
        }
        updateReadStat(path, data == null ? 0 : data.length);
        return data;
    }

Время запуска события наблюдения за сервером

выполнение на стороне сервераsetDataПосле того, как метод изменит данные узла, он вызоветWatchManager.triggerWatchтриггер методасобытие изменения данных.

public Stat setData(String path, byte data[], ...){
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        ...
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }
WatchManager#triggerWatch
  • Во-первых, он инкапсулирует объект WatchedEvent с тремя атрибутами: состоянием сеанса, типом события и узлом данных.

  • Затем запросите зарегистрированное узлом событие Watch.Если оно пустое, это означает, что узел не зарегистрировал событие Watch. Если есть событие Watch, оно добавляется в определенную коллекцию Watchers и удаляется в управлении WatchManager.

  • Наконец, уведомление отправляется клиенту путем вызова метода процесса.

public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
        
      	// 封装WatchedEvent对象 type:事件类型  
        // KeeperState.SyncConnected: 会话状态 path:数据节点
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
        Set<Watcher> watchers = new HashSet<>();
        PathParentIterator pathParentIterator = getPathParentIterator(path);
        synchronized (this) {
            for (String localPath : pathParentIterator.asIterable()) {
                // 获取Watchers Set 
                Set<Watcher> thisWatchers = watchTable.get(localPath);
                if (thisWatchers == null || thisWatchers.isEmpty()) {
                    continue;
                }
                // 遍历
                Iterator<Watcher> iterator = thisWatchers.iterator();
                while (iterator.hasNext()) {
                    Watcher watcher = iterator.next();
                    WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
                    if (watcherMode.isRecursive()) {
                        if (type != EventType.NodeChildrenChanged) {
                            watchers.add(watcher);
                        }
                    } else if (!pathParentIterator.atParentPath()) {
                        watchers.add(watcher);
                        if (!watcherMode.isPersistent()) {
                            iterator.remove();
                            Set<String> paths = watch2Paths.get(watcher);
                            if (paths != null) {
                                paths.remove(localPath);
                            }
                        }
                    }
                }
                if (thisWatchers.isEmpty()) {
                    watchTable.remove(localPath);
                }
            }
        }
        if (watchers.isEmpty()) {
            ...
            return null;
        }
		// 遍历添加的Watcher 
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            // 向客户端发送通知
            w.process(e);
        }

        ...
        return new WatcherOrBitSet(watchers);
    }

Процесс обратного звонка клиента

Использование клиентаSendThread.readResponse()метод для единообразной обработки ответов на стороне сервера.

  • Сначала сервер десериализации отправляет информацию заголовка запроса responseHdr.deserialize(bbia, "header") и определяет, что значение соответствующего атрибутивного поля xid равно -1, указывая, что ответ на запрос относится к типу уведомления. При обработке типа уведомления сначала преобразуйте полученный поток байтовдесериализоватьПеревести вWatcherEventобъект.
  • Затем оцените, настроен ли клиент с атрибутом chrootPath, если онtrueУказывает, что клиент настроил атрибут chrootPath. Полученный путь узла должен быть обработан chrootPath.
  • последний звонокeventThread.queueEvent()Метод передает полученное событие вEventThreadнить для обработки.
if (replyHdr.getXid() == -1) {
    ...
    WatcherEvent event = new WatcherEvent();
    // 反序列化服务器发送请求头信
    event.deserialize(bbia, "response");
    ...
    if (chrootPath != null) {
        String serverPath = event.getPath();
        if(serverPath.compareTo(chrootPath)==0)
            event.setPath("/");
            ...
            event.setPath(serverPath.substring(chrootPath.length()));
            ...
    }
    WatchedEvent we = new WatchedEvent(event);
    ...
    eventThread.queueEvent( we );
}

нам просто нужно обратить вниманиеeventThread.queueEvent()метод.

EventThread.queueEvent
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
    
            if (event.getType() == EventType.None && sessionState == event.getState()) {
                return;
            }
            sessionState = event.getState();
    		
            // 获取注册过的Watcher事件
            final Set<Watcher> watchers;
            if (materializedWatchers == null) {
                // materialize the watchers based on the event
                watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
            } else {
                watchers = new HashSet<Watcher>();
                watchers.addAll(materializedWatchers);
            }
    
    
            WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
            // queue the pair (watch set & event) for later processing
    		// 将Watcher事件添加到waitingEvents中
            waitingEvents.add(pair);
}
ZooKeeper.ZKWatchManager#materialize
public Set<Watcher> materialize(...)
{
	Set<Watcher> result = new HashSet<Watcher>();
	...
    // type: 事件类型
	switch (type) {
    ...
	case NodeDataChanged:
	case NodeCreated:
	    synchronized (dataWatches) {
	        addTo(dataWatches.remove(clientPath), result);
	    }
	    synchronized (existWatches) {
	        addTo(existWatches.remove(clientPath), result);
	    }
	    break;
    ....
	}
	return result;
}

Шаг 1 В зависимости от типа события уведомления, отZKWatchManagerЗапрос зарегистрированного клиента Смотреть информацию в формате . После того, как клиент найдет соответствующую информацию Watch, она будет удалена из управления ZKWatchManager.

Механизм Watcher клиента является одноразовым и будет удален после его срабатывания.

class EventThread extends ZooKeeperThread {

        private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();
    ...

waitingEventsдаEventThreadБлокирующая очередь вZKWatchManagerЗапрос зарегистрированного клиента Информация о просмотре будет добавлена ​​вwaitingEventsсередина.

а такжеEventThreadКласс метода будет запускать цикл oksevents, взятых в очереди события, ожидая сторонника обработки.

Это можно понимать как непрерывное получение обработки события Watcher для обработки в цикле.

public void run() {
	try {
	  isRunning = true;
       
      // 无限循环
	  while (true) {
         // 取出事件
	     Object event = waitingEvents.take();
	     if (event == eventOfDeath) {
	        wasKilled = true;
	     } else {
            // 处理事件 
	        processEvent(event);
	     }
	     if (wasKilled)
	        synchronized (waitingEvents) {
	           if (waitingEvents.isEmpty()) {
	              isRunning = false;
	              break;
	           }
	        }
	  }
     ...
}

**processEvent(event) **

private void processEvent(Object event) {
  ...
  if (event instanceof WatcherSetEventPair) {
      
      WatcherSetEventPair pair = (WatcherSetEventPair) event;
      for (Watcher watcher : pair.watchers) {
          try {
              // 回调事件监听的处理
              watcher.process(pair.event);
          } catch (Throwable t) {
              LOG.error("Error while calling watcher ", t);
          }
      }
  }
}

Суммировать

ZooKeeper реализует механизм Watch, создавая список информации с наблюдателями через клиент и сервер соответственно.

  • Когда клиент вызывает такие интерфейсы, как getData и exists, он сначала помещает соответствующее событие Watch в локальныйZKWatchManagerв управлении.
  • После получения запроса от клиента сервер оценивает, содержит ли онWatchсобытие и поместите соответствующее событие вWatchManagerв управлении.

существуеттриггер событияКогда сервер запрашивает соответствующее уведомление о событии наблюдения у клиента через информацию о пути узла, после получения уведомления клиент сначала запрашивает локальный ZKWatchManager для получения соответствующей операции обратного вызова обработки информации наблюдения.

Этот дизайн не только реализует режим наблюдателя в распределенной среде, но также сохраняет дополнительную информацию, необходимую клиенту и серверу для обработки событий наблюдения на обоих концах, уменьшая содержание связи друг с другом и значительно повышая производительность службы. .

Watch机制原理

Применение часового механизма

Центр конфигурации

Мы можем хранить такую ​​информацию, как элементы конфигурации базы данных, в узлах данных ZooKeeper. Клиент кластера серверов добавляет к узлу мониторинг событий Watch.Когда служба в кластере запускается, она считывает данные узла, чтобы получить информацию о конфигурации данных. И когда данные узла меняются,Сервер ZooKeeper будет отправлять события Watch каждому клиенту (push), после того как клиент в кластере получит уведомление,Перечитать информацию о конфигурации базы данных узла (pull).

ZooKeeper реализует механизм двухтактной комбинации.

Регистрационный центр

ZooKeeper часто используется в качестве центра регистрации (ЦП) в проекте Dubbo, по принципу аналогичен центру конфигурации, что резко контрастирует с дизайном ЦД Eureka.