Оригинал: Miss Sister Taste (идентификатор публичной учетной записи WeChat: xjjdog), добро пожаловать, пожалуйста, сохраните источник для перепечатки.
zookeeper не предназначен для обеспечения высокой доступности, но используетZAB
Протокол обеспечивает чрезвычайно высокую согласованность. Поэтому его часто выбирают в качестве центра реестра, центра конфигурации, распределенной блокировки и других сценариев.
Его производительность очень ограничена, а API не очень удобен в использовании. xjjdog склонен использоватьRaft
протоколEtcd
илиConsul
, они более легкие.
Curator — это набор клиентов zookeeper с открытым исходным кодом от компании netflix, который в настоящее время является проектом верхнего уровня Apache. По сравнению с собственным клиентом Zookeeper, Curator имеет более высокий уровень абстракции, что упрощает разработку клиента Zookeeper. Curator решает многие очень низкоуровневые детали работы по разработке клиента zookeeper, включая переподключение, повторную регистрацию наблюдателей и исключения NodeExistsException.
Куратор состоит из серии модулей.Для обычных разработчиков обычно используются куратор-фреймворк и куратор-рецепты, которые будут представлены по очереди ниже.
1. maven-зависимости
Последняя версия куратора 4.3.0 поддерживает zookeeper 3.4.x и 3.5, но нам нужно обратить внимание на зависимости, переданные куратором, которые должны соответствовать версии, используемой фактическим сервером. в настоящее время используют в качестве примера.
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
2.curator-framework
Ниже приведены некоторые распространенные операции, связанные с zk.
public static CuratorFramework getClient() {
return CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.connectionTimeoutMs(15 * 1000) //连接超时时间,默认15秒
.sessionTimeoutMs(60 * 1000) //会话超时时间,默认60秒
.namespace("arch") //设置命名空间
.build();
}
public static void create(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
client.create().creatingParentsIfNeeded().forPath(path, payload);
}
public static void createEphemeral(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
client.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
}
public static String createEphemeralSequential(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
return client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, payload);
}
public static void setData(final CuratorFramework client, final String path, final byte[] payload) throws Exception {
client.setData().forPath(path, payload);
}
public static void delete(final CuratorFramework client, final String path) throws Exception {
client.delete().deletingChildrenIfNeeded().forPath(path);
}
public static void guaranteedDelete(final CuratorFramework client, final String path) throws Exception {
client.delete().guaranteed().forPath(path);
}
public static String getData(final CuratorFramework client, final String path) throws Exception {
return new String(client.getData().forPath(path));
}
public static List<String> getChildren(final CuratorFramework client, final String path) throws Exception {
return client.getChildren().forPath(path);
}
3.curator-recipes
curator-recipes предоставляет некоторые ссылки для типичных сценариев использования zk. Далее в основном представлены компоненты, обычно используемые при разработке.
прослушиватель событий
Zookeeper изначально поддерживает мониторинг событий путем регистрации наблюдателей, но его использование не особенно удобно, поскольку от разработчиков требуется многократно регистрировать наблюдателей, что обременительно.
Curator представляет Cache для мониторинга транзакций сервера zookeeper. Cache — это оболочка для мониторинга событий в Curator, и его мониторинг событий можно аппроксимировать как процесс сравнения между представлением локального кэша и удаленным представлением Zookeeper. В то же время Curator может автоматически обрабатывать повторную регистрацию и мониторинг для разработчиков, что значительно упрощает утомительный процесс разработки нативного API.
1) Кэш узла
public static void nodeCache() throws Exception {
final String path = "/nodeCache";
final CuratorFramework client = getClient();
client.start();
delete(client, path);
create(client, path, "cache".getBytes());
final NodeCache nodeCache = new NodeCache(client, path);
nodeCache.start(true);
nodeCache.getListenable()
.addListener(() -> System.out.println("node data change, new data is " + new String(nodeCache.getCurrentData().getData())));
setData(client, path, "cache1".getBytes());
setData(client, path, "cache2".getBytes());
Thread.sleep(1000);
client.close();
}
NodeCache может отслеживать указанный узел.После регистрации слушателя изменение узла уведомит соответствующий слушатель.
2) Кэш пути
Кэш путей используется для мониторинга событий дочерних узлов ZNode, включая добавление, обновление и удаление.Кэш путей будет синхронизировать состояние дочерних узлов, а сгенерированные события будут передаваться в зарегистрированный PathChildrenCacheListener.
public static void pathChildrenCache() throws Exception {
final String path = "/pathChildrenCache";
final CuratorFramework client = getClient();
client.start();
final PathChildrenCache cache = new PathChildrenCache(client, path, true);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener((client1, event) -> {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED:" + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED:" + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED:" + event.getData().getPath());
break;
case CONNECTION_LOST:
System.out.println("CONNECTION_LOST:" + event.getData().getPath());
break;
case CONNECTION_RECONNECTED:
System.out.println("CONNECTION_RECONNECTED:" + event.getData().getPath());
break;
case CONNECTION_SUSPENDED:
System.out.println("CONNECTION_SUSPENDED:" + event.getData().getPath());
break;
case INITIALIZED:
System.out.println("INITIALIZED:" + event.getData().getPath());
break;
default:
break;
}
});
// client.create().withMode(CreateMode.PERSISTENT).forPath(path);
Thread.sleep(1000);
client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
Thread.sleep(1000);
client.delete().forPath(path + "/c1");
Thread.sleep(1000);
client.delete().forPath(path); //监听节点本身的变化不会通知
Thread.sleep(1000);
client.close();
}
3) Кэш дерева
«Комбинация» Path Cache и Node Cache отслеживает события создания, обновления и удаления по пути и кэширует данные всех дочерних узлов по пути.
public static void treeCache() throws Exception {
final String path = "/treeChildrenCache";
final CuratorFramework client = getClient();
client.start();
final TreeCache cache = new TreeCache(client, path);
cache.start();
cache.getListenable().addListener((client1, event) -> {
switch (event.getType()){
case NODE_ADDED:
System.out.println("NODE_ADDED:" + event.getData().getPath());
break;
case NODE_REMOVED:
System.out.println("NODE_REMOVED:" + event.getData().getPath());
break;
case NODE_UPDATED:
System.out.println("NODE_UPDATED:" + event.getData().getPath());
break;
case CONNECTION_LOST:
System.out.println("CONNECTION_LOST:" + event.getData().getPath());
break;
case CONNECTION_RECONNECTED:
System.out.println("CONNECTION_RECONNECTED:" + event.getData().getPath());
break;
case CONNECTION_SUSPENDED:
System.out.println("CONNECTION_SUSPENDED:" + event.getData().getPath());
break;
case INITIALIZED:
System.out.println("INITIALIZED:" + event.getData().getPath());
break;
default:
break;
}
});
client.create().withMode(CreateMode.PERSISTENT).forPath(path);
Thread.sleep(1000);
client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
Thread.sleep(1000);
setData(client, path, "test".getBytes());
Thread.sleep(1000);
client.delete().forPath(path + "/c1");
Thread.sleep(1000);
client.delete().forPath(path);
Thread.sleep(1000);
client.close();
}
выборы
куратор предоставляет два способа, а именно «фиксация лидера» и «выборы лидера».
1) Лидер защелки
Случайным образом выберите одного из кандидатов в качестве лидера.После выбора, если только не будет вызвана функция close() для освобождения лидера, другой пост-выбор не может стать лидером.
public class LeaderLatchTest {
private static final String PATH = "/demo/leader";
public static void main(String[] args) {
List<LeaderLatch> latchList = new ArrayList<>();
List<CuratorFramework> clients = new ArrayList<>();
try {
for (int i = 0; i < 10; i++) {
CuratorFramework client = getClient();
client.start();
clients.add(client);
final LeaderLatch leaderLatch = new LeaderLatch(client, PATH, "client#" + i);
leaderLatch.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
System.out.println(leaderLatch.getId() + ":I am leader. I am doing jobs!");
}
@Override
public void notLeader() {
System.out.println(leaderLatch.getId() + ":I am not leader. I will do nothing!");
}
});
latchList.add(leaderLatch);
leaderLatch.start();
}
Thread.sleep(1000 * 60);
} catch (Exception e) {
e.printStackTrace();
} finally {
for (CuratorFramework client : clients) {
CloseableUtils.closeQuietly(client);
}
for (LeaderLatch leaderLatch : latchList) {
CloseableUtils.closeQuietly(leaderLatch);
}
}
}
public static CuratorFramework getClient() {
return CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.connectionTimeoutMs(15 * 1000) //连接超时时间,默认15秒
.sessionTimeoutMs(60 * 1000) //会话超时时间,默认60秒
.namespace("arch") //设置命名空间
.build();
}
}
2) Выборы лидера
Лидерство можно контролировать с помощью LeaderSelectorListener, а лидерство можно освободить в нужное время, чтобы каждый узел мог получить лидерство. LeaderLatch всегда удерживает лидерство, если только не вызывается метод close, в противном случае он не освобождает лидерство.
public class LeaderSelectorTest {
private static final String PATH = "/demo/leader";
public static void main(String[] args) {
List<LeaderSelector> selectors = new ArrayList<>();
List<CuratorFramework> clients = new ArrayList<>();
try {
for (int i = 0; i < 10; i++) {
CuratorFramework client = getClient();
client.start();
clients.add(client);
final String name = "client#" + i;
LeaderSelector leaderSelector = new LeaderSelector(client, PATH, new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
System.out.println(name + ":I am leader.");
Thread.sleep(2000);
}
});
leaderSelector.autoRequeue();
leaderSelector.start();
selectors.add(leaderSelector);
}
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
} finally {
for (CuratorFramework client : clients) {
CloseableUtils.closeQuietly(client);
}
for (LeaderSelector selector : selectors) {
CloseableUtils.closeQuietly(selector);
}
}
}
public static CuratorFramework getClient() {
return CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.connectionTimeoutMs(15 * 1000) //连接超时时间,默认15秒
.sessionTimeoutMs(60 * 1000) //会话超时时间,默认60秒
.namespace("arch") //设置命名空间
.build();
}
}
Распределенная блокировка
1) Общая блокировка повторного входа
Общий означает, что блокировка видна глобально, и клиенты могут запросить блокировку. Reentrant похож на ReentrantLock JDK, что означает, что один и тот же клиент может получить блокировку несколько раз, не будучи заблокированным. Он реализуется классом InterProcessMutex. Его конструктор:
public InterProcessMutex(CuratorFramework client, String path)
Получите блокировку через получение и предоставьте механизм тайм-аута:
/**
* Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
* re-entrantly. Each call to acquire must be balanced by a call to release()
*/
public void acquire();
/**
* Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can
* call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()
* Parameters:
* time - time to wait
* unit - time unit
* Returns:
* true if the mutex was acquired, false if not
*/
public boolean acquire(long time, TimeUnit unit);
Снимите блокировку с помощью метода release(). Экземпляры InterProcessMutex можно использовать повторно. В вики-странице рецептов отзыва ZooKeeper определен механизм отзыва, который можно согласовать. Чтобы отменить мьютекс, вызовите следующий метод:
/**
* 将锁设为可撤销的. 当别的进程或线程想让你释放锁时Listener会被调用。
* Parameters:
* listener - the listener
*/
public void makeRevocable(RevocationListener<T> listener)
2) Блокировка без повторного входа Shared Lock
При использовании InterProcessSemaphoreMutex метод вызова аналогичен, разница в том, что блокировка не реентерабельна и не может реентерабельна в том же потоке
3) Общая реентерабельная блокировка чтения и записи
Подобно ReentrantReadWriteLock в JDK, блокировка чтения-записи управляет парой связанных блокировок. Один отвечает за операции чтения, а другой отвечает за операции записи. Операции чтения могут использоваться несколькими процессами одновременно, когда блокировка записи не используется, а чтение (блокировка) не разрешено, когда используется блокировка записи. Эта блокировка является повторно используемой. Поток, которому принадлежит блокировка записи, может повторно войти в блокировку чтения, но блокировка чтения не может войти в блокировку записи. Это также означает, что блокировку записи можно понизить до блокировки чтения, например запрос блокировки записи --> блокировка чтения --> снятие блокировки записи. Невозможно обновить блокировку чтения до блокировки записи. В основном реализуется двумя классами:
InterProcessReadWriteLock
InterProcessLock
4) Общий семафор семафора
Счетный семафор похож на семафор JDK. Набор лицензий, поддерживаемый Semaphore в JDK, который в Кубаторе называется арендой. Обратите внимание, что все экземпляры должны использовать одно и то же значение numberOfLeases. Вызов Acquire возвращает объект аренды. Клиенты должны окончательно закрыть эти объекты аренды, иначе эти договоры аренды будут потеряны. Однако, если сеанс клиента потерян по какой-либо причине, например, из-за сбоя, аренда, удерживаемая этими клиентами, будет автоматически закрыта, чтобы другие клиенты могли продолжать использовать эту аренду. Арендные платежи также можно вернуть следующими способами:
public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)
Обратите внимание, что вы можете запрашивать несколько аренд одновременно, и если Semaphore не имеет достаточного количества текущих лизингов, поток запроса будет заблокирован. Он также предоставляет перегруженные методы для тайм-аута:
public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)
Основные классы:
InterProcessSemaphoreV2
Lease
SharedCountReader
5) Несколько объектов блокировки Multi Shared Lock
Multi Shared Lock — это контейнер замков. Когда вызывается методAcquire, все блокировки будут получены, а если запрос не выполнен, все блокировки будут сняты. Также все блокировки снимаются при вызове освобождения (сбои игнорируются). По сути, это представитель групповой блокировки, и операция снятия запроса на него будет передана всем содержащимся в нем блокировкам. В основном участвуют два класса:
InterProcessMultiLock
InterProcessLock
Его конструктор принимает набор содержащихся блокировок или набор путей ZooKeeper.
public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)
забор барьер
1) Параметр барьера в конструкторе DistributedBarrier используется для определения барьера, если параметр барьера тот же (путь тот же), это тот же барьер. Обычно забор используется следующим образом:
1. Ведение клиента к установке забора
2. Другие клиенты будут вызывать waitOnBarrier(), чтобы дождаться удаления барьера, и поток обработки программы будет заблокирован.
3. Ведущий клиент снимает забор, а обработчики других клиентов продолжают работать одновременно.
Основные методы класса DistributedBarrier следующие:
setBarrier() - установить барьер
waitOnBarrier() - дождаться снятия барьера
removeBarrier() — удаляет барьер
2) Двойной барьер
Двойные барьеры позволяют клиентам синхронизироваться в начале и в конце вычислений. Когда к двойному ограждению добавляется достаточное количество процессов, процесс начинает вычисление, а когда вычисление завершено, он покидает ограждение. Класс двойного забора — DistributedDoubleBarrier.
Класс DistributedDoubleBarrier реализует функцию двойного барьера. Его конструктор выглядит следующим образом:
// client - the client
// barrierPath - path to use
// memberQty - the number of members in the barrier
public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty)
memberQty — это количество участников, когда вызывается метод ввода, члены блокируются до тех пор, пока все члены не вызовут ввод. Когда вызывается метод leave, он также блокирует вызывающий поток до тех пор, пока все члены не вызовут метод leave.
Примечание. Значение параметра memberQty является только пороговым, а не предельным значением. Когда количество ожидающих ограждений больше или равно этому значению, ограждение откроется!
Как и забор (DistributedBarrier), параметр барьераPath двойного забора также используется для определения того, является ли он одним и тем же забором.Двойной забор используется следующим образом:
1. Создайте DistributedDoubleBarrier на одном и том же пути от нескольких клиентов, затем вызовите метод enter() и подождите, пока число барьеров не достигнет memberQty, чтобы войти в барьер.
2. Когда количество ограничений достигает memberQty, несколько клиентов прекращают блокировку одновременно и продолжают работать до тех пор, пока не будет выполнен метод leave(), ожидая, пока число ограничений memberQty не заблокируется в методе leave() одновременно. время.
3. Количество заборов MemberQty блокируется в методе leave() одновременно, а методы leave() нескольких клиентов перестают блокироваться и продолжают работать.
Основные методы класса DistributedDoubleBarrier следующие:
enter(), enter(long maxWait, TimeUnit unit) - ждать входа в забор
leave(), leave(long maxWait, TimeUnit unit) - ожидание выхода из забора
Обработка исключений: DistributedDoubleBarrier будет отслеживать состояние соединения, а методы enter() и leave будут генерировать исключения при разрыве соединения.
Счетчики
С помощью ZooKeeper можно реализовать общий счетчик кластера. Пока используется один и тот же путь, можно получить последнее значение счетчика, что гарантируется согласованностью ZooKeeper. Куратор имеет два счетчика, один считается по int, а другой по long.
1) Общее количество
Этот класс использует тип int для подсчета. В основном задействованы три категории.
* SharedCount
* SharedCountReader
* SharedCountListener
SharedCount представляет счетчик, и к нему можно добавить SharedCountListener.При изменении счетчика этот Listener может прослушивать измененное событие, а SharedCountReader может считывать последнее значение, включая буквальное значение и значение VersionedValue с информацией о версии.
2) Распределенный атомиклонг
За исключением того, что диапазон счетчика больше, чем SharedCount, он сначала пытается установить счетчик с помощью оптимистической блокировки.Если это не удается (например, счетчик был обновлен другими клиентами в течение периода), он использует метод InterProcessMutex для обновить значение счетчика. Этот счетчик имеет ряд операций:
- get(): получить текущее значение
- increment(): добавить один
- decrement(): уменьшить на единицу
- add(): добавить конкретное значение
- subtract(): вычитание определенного значения
- trySet(): попробуйте установить значение счетчика
- forceSet(): принудительно установить значение счетчика
Вы должны проверить возвращаемый результат Successed(), который показывает, была ли операция успешной или нет. Если операция выполнена успешно, preValue() представляет значение до операции, а postValue() представляет значение после операции.
End
Куратор абстрагирует и упрощает многие сложные операции зоопарка, что является благом для пользователей zk. А чтобы быть полностью счастливым, нужно перестать им пользоваться.
Я не знаю, куда другие люди кладут zk, но после того, как я прикоснулся к протоколу paxos, мне было сложно проявить к нему сильный интерес. Обычно в техническом отборе он будет в конце моего списка альтернатив, и я даже не могу уловить непонятную логику в исходном коде.
Но инженерное строительство никогда не измеряется нашими предпочтениями. никогда не было.
Об авторе:Мисс сестра вкус(xjjdog), публичная учетная запись, которая не позволяет программистам идти в обход. Сосредоточьтесь на инфраструктуре и Linux. Десять лет архитектуры, десятки миллиардов ежедневного трафика, обсуждение с вами мира высокой параллелизма, дающие вам другой вкус. Мой личный WeChat xjjdog0, добро пожаловать в друзья для дальнейшего общения.