Инструмент zkCli подходит для отладки, не рекомендуется использовать инструмент zkCli для сборки системы.
В реальной разработке Java API ZooKeeper обычно не используется напрямую, но используется библиотека инкапсуляции более высокого уровня Curator, но изучение Java API по-прежнему дает много преимуществ.
В этой статье представлены функции создания сеансов и точек мониторинга с помощью Java API ZooKeeper, а также демонстрируется режим ведущий-подчиненный.
добавить зависимости
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>
установить сеанс
Запустите сервер ZooKeeper и установите сеанс через Java API.
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
ConnectString содержит имя хоста и номер порта, sessionTimeout — время ожидания сеанса, а объект-наблюдатель используется для получения событий сеанса.
Watcher — это интерфейс, его нужно переписать для реализации Watchervoid process(WatchedEvent event)
метод.
При возникновении сбоев в сети клиенты ZooKeeper автоматически переподключаются, если соединение потеряно.
Получение прав на управление
Ниже приведен простой алгоритм выбора лидера группы, реализованный с помощью Java API ZooKeeper, чтобы гарантировать, что только один процесс главного узла активен одновременно.
package com.ulyssesss.zookeeper;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Random;
public class Master implements Watcher {
private ZooKeeper zk;
private String serviceId = Integer.toString(new Random().nextInt());
private boolean isLeader = false;
private void startZk() throws IOException {
zk = new ZooKeeper("localhost:2181", 5000, this);
}
private void stopZk() throws InterruptedException {
zk.close();
}
public void process(WatchedEvent watchedEvent) {
System.out.println("event: " + watchedEvent);
}
public static void main(String[] args) throws Exception {
Master master = new Master();
master.startZk();
master.runForMaster();
System.out.println("serviceId: " + master.serviceId);
if (master.isLeader) {
System.out.println("master");
Thread.sleep(10000);
} else {
System.out.println("not master");
}
master.stopZk();
}
private boolean checkMaster() throws InterruptedException {
while (true) {
try {
Stat stat = new Stat();
byte data[] = zk.getData("/master", false, stat);
isLeader = new String(data).equals(serviceId);
return true;
} catch (KeeperException.NoNodeException e) {
return false;
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
private void runForMaster() throws InterruptedException {
while (true) {
try {
zk.create("/master", serviceId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
isLeader = true;
break;
} catch (KeeperException.NodeExistsException e) {
isLeader = false;
break;
} catch (KeeperException e) {
e.printStackTrace();
}
if (checkMaster()) {
break;
}
}
}
}
Основная функция выполняется для создания демо-экземпляра, экземпляр назначит случайное целое число в качестве идентификатора и попытается создать мастер мастер-узла после установления соединения с ZooKeeper.
Если главный главный узел успешно создан, экземпляр является лидером кластера; если узел уже существует, другие экземпляры являются лидерами; при возникновении исключения, такого как отключение, информация об ответе теряется, и невозможно определить, является ли текущий процесс является главным узлом. ) способ перепроверить главный статус.
Выполните основную функцию несколько раз, при этом мастер будет напечатан при первом выполнении. В течение 10 секунд до отключения мастера он будет печатать не мастер, если он будет выполнен снова. Когда мастер выполняется в первый раз время отключается, снова будет выполняться основная функция функция, мастер печати.
Создайте нужный каталог асинхронно
Всем синхронным операциям в ZooKeeper соответствуют асинхронные операции, асинхронные вызовы не будут блокировать приложение и упростят реализацию приложения.
Модель master-slave требует использования трех каталогов: /tasks, /assign и /workers, которые могут быть созданы некоторыми конфигурациями системы. В следующем примере кода требуемый каталог создается асинхронно.
private void bootstrap() {
createParent("/workers", new byte[0]);
createParent("/assign", new byte[0]);
createParent("/tasks", new byte[0]);
}
private void createParent(String path, byte[] data) {
zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createParentCallback, data);
}
AsyncCallback.StringCallback createParentCallback = new AsyncCallback.StringCallback() {
public void processResult(int rc, String path, Object ctx, String name) {
switch (KeeperException.Code.get(rc)) {
case OK:
System.out.println("parent " + path + " created");
break;
case NODEEXISTS:
System.out.println("parent " + path + " already registered");
break;
case CONNECTIONLOSS:
createParent(path, (byte[]) ctx);
break;
default:
System.out.println("create " + path + " error");
}
}
};
зарегистрировать подчиненный узел
В предыдущем разделе уже был главный узел, поэтому для того, чтобы главный узел мог управлять, пришло время настроить подчиненные узлы, создав временные узлы в /workers.
package com.ulyssesss.zookeeper;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.Random;
public class Worker implements Watcher {
private ZooKeeper zk;
private String serviceId = Integer.toString(new Random().nextInt());
private void startZk() throws IOException {
zk = new ZooKeeper("localhost:2181", 5000, this);
}
@Override
public void process(WatchedEvent event) {
System.out.println("event: " + event);
}
private void register() {
zk.create("/workers/worker-" + serviceId, "Idle".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE
, CreateMode.EPHEMERAL, createWorkerCallback, null);
}
private AsyncCallback.StringCallback createWorkerCallback = new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
switch (KeeperException.Code.get(rc)) {
case OK:
System.out.println("registered successfully: " + serviceId);
break;
case NODEEXISTS:
System.out.println("already registered: " + serviceId);
break;
case CONNECTIONLOSS:
register();
break;
default:
System.out.println("error");
}
}
};
private String status;
public void setStatus(String status) {
this.status = status;
updateStatus(status);
}
synchronized private void updateStatus(String status) {
if (status.equals(this.status)) {
zk.setData("/workers/worker-" + serviceId, status.getBytes(), -1, statusUpdateCallback, status);
}
}
AsyncCallback.StatCallback statusUpdateCallback = new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
switch (KeeperException.Code.get(rc)) {
case CONNECTIONLOSS:
updateStatus((String) ctx);
break;
default:
}
}
};
public static void main(String[] args) throws Exception {
Worker worker = new Worker();
worker.startZk();
worker.register();
Thread.sleep(30000);
}
}
Основная функция создает рабочий экземпляр, открывает сеанс, выполняет логику регистрации и снова выполняет логику регистрации, если соединение потеряно при создании узла.Узел, созданный при регистрации, является временным узлом.
При запуске некоторых задач с узла необходимо обновить состояние узла с помощью метода setStatus().
очередь задач
Клиентский компонент в системе используется для добавления задач для выполнения задач с узлов. Ниже приведен код клиента:
package com.ulyssesss.zookeeper;
import org.apache.zookeeper.*;
import java.io.IOException;
public class Client implements Watcher {
private ZooKeeper zk;
private void startZk() throws IOException {
zk = new ZooKeeper("localhost:2181", 5000, this);
}
@Override
public void process(WatchedEvent event) {
System.out.println("event: " + event);
}
private String queueCommand(String command) {
while (true) {
try {
String name = zk.create("/tasks/task-", command.getBytes()
, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
return name;
} catch (Exception e) {
System.out.println("error");
}
}
}
public static void main(String[] args) throws Exception {
Client client = new Client();
client.startZk();
String name = client.queueCommand("command-1");
System.out.println("created " + name);
}
}
Клиент использует упорядоченную задачу, помеченную узлом, TASK-back будет следовать инкрементному целому числу, если вы выполните CREATE, а затем повторите операцию CREATE, примените стратегию [хотя бы один раз]. Чтобы использовать политики [для выполнения один раз], вы можете добавить уникальный идентификатор задачи к имени узла.
Управление клиентами
Клиент управления AdminClient используется для отображения рабочего состояния системы.Код выглядит следующим образом:
package com.ulyssesss.zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Date;
public class AdminClient implements Watcher {
private ZooKeeper zk;
private void startZk() throws IOException {
zk = new ZooKeeper("localhost:2181", 5000, this);
}
@Override
public void process(WatchedEvent event) {
System.out.println("event: " + event);
}
private void listState() throws KeeperException, InterruptedException {
try {
Stat stat = new Stat();
byte[] masterData = zk.getData("/master", false, stat);
Date startDate = new Date(stat.getCtime());
System.out.println("master: " + new String(masterData) + " since " + startDate);
} catch (KeeperException.NoNodeException e) {
System.out.println("no master");
}
System.out.println("workers: \n");
for (String worker : zk.getChildren("/workers", false)) {
byte[] data = zk.getData("/workers/" + worker, false, null);
String state = new String(data);
System.out.println("worker: " + state);
}
// ...
}
public static void main(String[] args) throws Exception {
AdminClient adminClient = new AdminClient();
adminClient.startZk();
adminClient.listState();
}
}
Приведенный выше код просто перечислит информацию о каждом узле.
Программирование через Java API очень похоже на команду zkCli, разница в том, что zkCli часто используется для отладки и обычно используется в относительно стабильной среде. Программы, написанные с помощью Java API, должны учитывать исключения, особенно исключения ConnectionLossException, и должны проверять состояние и правильно восстанавливаться.