ZooKeeper использует Java API

Java задняя часть алгоритм ZooKeeper

Инструмент zkCli подходит для отладки, не рекомендуется использовать инструмент zkCli для сборки системы.

В реальной разработке Java API ZooKeeper обычно не используется напрямую, но используется библиотека инкапсуляции более высокого уровня Curator, но изучение Java API по-прежнему дает много преимуществ.

В этой статье представлены функции создания сеансов и точек мониторинга с помощью Java API ZooKeeper, а также демонстрируется режим ведущий-подчиненный.

добавить зависимости

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.9</version>
</dependency>

установить сеанс

Запустите сервер ZooKeeper и установите сеанс через Java API.

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

ConnectString содержит имя хоста и номер порта, sessionTimeout — время ожидания сеанса, а объект-наблюдатель используется для получения событий сеанса.

Watcher — это интерфейс, его нужно переписать для реализации Watchervoid process(WatchedEvent event)метод.

При возникновении сбоев в сети клиенты ZooKeeper автоматически переподключаются, если соединение потеряно.

Получение прав на управление

Ниже приведен простой алгоритм выбора лидера группы, реализованный с помощью Java API ZooKeeper, чтобы гарантировать, что только один процесс главного узла активен одновременно.

package com.ulyssesss.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Random;

public class Master implements Watcher {
    
    private ZooKeeper zk;
    private String serviceId = Integer.toString(new Random().nextInt());
    private boolean isLeader = false;

    private void startZk() throws IOException {
        zk = new ZooKeeper("localhost:2181", 5000, this);
    }

    private void stopZk() throws InterruptedException {
        zk.close();
    }

    public void process(WatchedEvent watchedEvent) {
        System.out.println("event: " + watchedEvent);
    }

    public static void main(String[] args) throws Exception {
		Master master = new Master();
		master.startZk();

		master.runForMaster();

        System.out.println("serviceId: " + master.serviceId);

		if (master.isLeader) {
            System.out.println("master");
            Thread.sleep(10000);
        } else {
            System.out.println("not master");
        }

		master.stopZk();
    }

    private boolean checkMaster() throws InterruptedException {
        while (true) {
            try {
                Stat stat = new Stat();
                byte data[] = zk.getData("/master", false, stat);
                isLeader = new String(data).equals(serviceId);
                return true;
            } catch (KeeperException.NoNodeException e) {
                return false;
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    }

    private void runForMaster() throws InterruptedException {
        while (true) {
            try {
                zk.create("/master", serviceId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                isLeader = true;
                break;
            } catch (KeeperException.NodeExistsException e) {
                isLeader = false;
                break;
            } catch (KeeperException e) {
                e.printStackTrace();
            }
            if (checkMaster()) {
                break;
            }
        }
    }
}

Основная функция выполняется для создания демо-экземпляра, экземпляр назначит случайное целое число в качестве идентификатора и попытается создать мастер мастер-узла после установления соединения с ZooKeeper.

Если главный главный узел успешно создан, экземпляр является лидером кластера; если узел уже существует, другие экземпляры являются лидерами; при возникновении исключения, такого как отключение, информация об ответе теряется, и невозможно определить, является ли текущий процесс является главным узлом. ) способ перепроверить главный статус.

Выполните основную функцию несколько раз, при этом мастер будет напечатан при первом выполнении. В течение 10 секунд до отключения мастера он будет печатать не мастер, если он будет выполнен снова. Когда мастер выполняется в первый раз время отключается, снова будет выполняться основная функция функция, мастер печати.

Создайте нужный каталог асинхронно

Всем синхронным операциям в ZooKeeper соответствуют асинхронные операции, асинхронные вызовы не будут блокировать приложение и упростят реализацию приложения.

Модель master-slave требует использования трех каталогов: /tasks, /assign и /workers, которые могут быть созданы некоторыми конфигурациями системы. В следующем примере кода требуемый каталог создается асинхронно.

private void bootstrap() {
    createParent("/workers", new byte[0]);
    createParent("/assign", new byte[0]);
    createParent("/tasks", new byte[0]);
}

private void createParent(String path, byte[] data) {
    zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createParentCallback, data);
}

AsyncCallback.StringCallback createParentCallback = new AsyncCallback.StringCallback() {
    public void processResult(int rc, String path, Object ctx, String name) {
        switch (KeeperException.Code.get(rc)) {
            case OK:
                System.out.println("parent " + path + " created");
                break;
            case NODEEXISTS:
                System.out.println("parent " + path + " already registered");
                break;
            case CONNECTIONLOSS:
                createParent(path, (byte[]) ctx);
                break;
            default:
                System.out.println("create " + path + " error");
        }
    }
};

зарегистрировать подчиненный узел

В предыдущем разделе уже был главный узел, поэтому для того, чтобы главный узел мог управлять, пришло время настроить подчиненные узлы, создав временные узлы в /workers.

package com.ulyssesss.zookeeper;

import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.Random;

public class Worker implements Watcher {

    private ZooKeeper zk;
    private String serviceId = Integer.toString(new Random().nextInt());

    private void startZk() throws IOException {
        zk = new ZooKeeper("localhost:2181", 5000, this);
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("event: " + event);
    }

    private void register() {
        zk.create("/workers/worker-" + serviceId, "Idle".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE
                , CreateMode.EPHEMERAL, createWorkerCallback, null);
    }

    private AsyncCallback.StringCallback createWorkerCallback = new AsyncCallback.StringCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            switch (KeeperException.Code.get(rc)) {
                case OK:
                    System.out.println("registered successfully: " + serviceId);
                    break;
                case NODEEXISTS:
                    System.out.println("already registered: " + serviceId);
                    break;
                case CONNECTIONLOSS:
                    register();
                    break;
                default:
                    System.out.println("error");
            }
        }
    };

    private String status;

    public void setStatus(String status) {
        this.status = status;
        updateStatus(status);
    }

    synchronized private void updateStatus(String status) {
        if (status.equals(this.status)) {
            zk.setData("/workers/worker-" + serviceId, status.getBytes(), -1, statusUpdateCallback, status);
        }
    }

    AsyncCallback.StatCallback statusUpdateCallback = new AsyncCallback.StatCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            switch (KeeperException.Code.get(rc)) {
                case CONNECTIONLOSS:
                    updateStatus((String) ctx);
                    break;
                default:
            }
        }
    };
    
    public static void main(String[] args) throws Exception {
		Worker worker = new Worker();
		worker.startZk();
		worker.register();
		Thread.sleep(30000);
    }
}

Основная функция создает рабочий экземпляр, открывает сеанс, выполняет логику регистрации и снова выполняет логику регистрации, если соединение потеряно при создании узла.Узел, созданный при регистрации, является временным узлом.

При запуске некоторых задач с узла необходимо обновить состояние узла с помощью метода setStatus().

очередь задач

Клиентский компонент в системе используется для добавления задач для выполнения задач с узлов. Ниже приведен код клиента:

package com.ulyssesss.zookeeper;

import org.apache.zookeeper.*;
import java.io.IOException;

public class Client implements Watcher {

    private ZooKeeper zk;

    private void startZk() throws IOException {
        zk = new ZooKeeper("localhost:2181", 5000, this);
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("event: " + event);
    }

    private String queueCommand(String command) {
        while (true) {
            try {
                String name = zk.create("/tasks/task-", command.getBytes()
                        , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
                return name;
            } catch (Exception e) {
                System.out.println("error");
            }
        }
    }


    public static void main(String[] args) throws Exception {
		Client client = new Client();
		client.startZk();
		String name = client.queueCommand("command-1");
        System.out.println("created " + name);
    }
}

Клиент использует упорядоченную задачу, помеченную узлом, TASK-back будет следовать инкрементному целому числу, если вы выполните CREATE, а затем повторите операцию CREATE, примените стратегию [хотя бы один раз]. Чтобы использовать политики [для выполнения один раз], вы можете добавить уникальный идентификатор задачи к имени узла.

Управление клиентами

Клиент управления AdminClient используется для отображения рабочего состояния системы.Код выглядит следующим образом:

package com.ulyssesss.zookeeper;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Date;

public class AdminClient implements Watcher {

    private ZooKeeper zk;

    private void startZk() throws IOException {
        zk = new ZooKeeper("localhost:2181", 5000, this);
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("event: " + event);
    }

    private void listState() throws KeeperException, InterruptedException {
        try {
            Stat stat = new Stat();
            byte[] masterData = zk.getData("/master", false, stat);
            Date startDate = new Date(stat.getCtime());
            System.out.println("master: " + new String(masterData) + " since " + startDate);
        } catch (KeeperException.NoNodeException e) {
            System.out.println("no master");
        }

        System.out.println("workers: \n");
        for (String worker : zk.getChildren("/workers", false)) {
            byte[] data = zk.getData("/workers/" + worker, false, null);
            String state = new String(data);
            System.out.println("worker: " + state);
        }

        // ...
    }

    public static void main(String[] args) throws Exception {
		AdminClient adminClient = new AdminClient();
		adminClient.startZk();
		adminClient.listState();
    }
}

Приведенный выше код просто перечислит информацию о каждом узле.

Программирование через Java API очень похоже на команду zkCli, разница в том, что zkCli часто используется для отладки и обычно используется в относительно стабильной среде. Программы, написанные с помощью Java API, должны учитывать исключения, особенно исключения ConnectionLossException, и должны проверять состояние и правильно восстанавливаться.

исходный адрес