предисловие
ПредыдущийВысокий параллелизм, начиная с 0 (1) --- основная концепция zookeeperВ конце мы оставили распределенную блокировку, которая гарантирует, что мы можем запланировать многоузловое приложение и решить проблему согласованности данных в распределенной среде.
Например, у нас сейчас есть такой кластер.В кластере есть служба кеша.Каждая программа в кластере будет использовать этот кеш.Если срок действия одного из кешей в это время истечет, в большой параллельной среде многие все службы приходят к получить доступ к кешу, получить данные в кеше и обнаружить, что кеш устарел, затем перейти к базе данных, чтобы получить его, а затем обновить его в службе кеша. Но на самом деле нам нужен только запрос в базу данных для обновления кеша, а дальше при таком раскладе что делать?
Мы ссылаемся на этот метод блокировки в многопоточном сценарии, в текущем параллельном сценарии нам также необходимо реализовать блокировку.
Используйте Zookeeper для разработки
1. Особенности замков и родного зоозащитника
① Каковы характеристики обычных замков?
排他(互斥)性:只有一个线程能获取到
文件系统(同一个文件不支持多个人去修改)
数据库:主键唯一约束 for update
缓存:redis setnx命令
zookeeper:类似文件系统
阻塞性:其他未抢到的线程阻塞,直到锁被释放再进行抢这个行为
可重入性:线程获取锁后,后续是否可重复获得该锁
② Почему zookeeper можно использовать для реализации замков
同一个父目录下面不能有相同的子节点,这就是zookeeper的排他性
通过JDK的栅栏来实现阻塞性
可重入性我们可以通过计数器来实现
③ Какие проблемы с родным зоопарком
1.接口难以使用
2.连接zookeeper超时不支持自动重连
3.watch注册一次会失效,需要反复注册
4.不支持递归创建节点(递归创建的话,比方说我要创建一个文件,假如我在idea创建,那我可以连带着包一起创建,但是在window我就做不到,这种整一个路径一并创建下来的就可以视为递归创建)
5.需要手动设置序列化的问题
④ Создайте основной класс клиента: Zookeeper
org.apache.zookeeper
org.apache.zookeeper.data
connect---连接到zookeeper集合
create---创建znode
exist---检查znode是否存在及其信息
getData---从特定的znode获取数据
setData---从特定的znode设置数据
getChildren---获取特定znode中的所有子节点
delete===删除特定znode及其所有子项
close---关闭连接
2. Используйте сторонний клиент zkClient для упрощения работы
① Реализовать интерфейс сериализации ZkSerializer.
MyZkSerializer.java
public class MyZkSerializer implements ZkSerializer {
//正常来说我们还需要进行一个非空判断,这里为了省事没做,不过严格来说是需要做的
//就是简单的转换
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
String d = (String) data;
try {
return d.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
}
② Простое использование zkclient
ZkClientDemo.java
public class ZkClientDemo {
public static void main(String[] args) {
// 创建一个zk客户端
ZkClient client = new ZkClient("localhost:2181");
//实现序列化接口
client.setZkSerializer(new MyZkSerializer());
//创建一个节点zk,在zk节点下再创建一个子节点app6,赋值123
//在之前也已经提到了,zookeeper中的节点既是文件夹也是文件
//源码中CreateMode是一个枚举,CreateMode.PERSISTENT---当客户端断开连接时,znode不会自动删除
client.create("/zk/app6", "123", CreateMode.PERSISTENT);
client.subscribeChildChanges("/zk/app6", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println(parentPath+"子节点发生变化:"+currentChilds);
}
});
//这里开始是创建一个watch,但是为什么这个方法会命名为subscribeDataChanges()呢,原因是:
//原本watch的设置然后获取是仅一次性的,现在我们使用subscribe这个英文,代表订阅,代表这个watch一直存在
//使用这个方法我们可以轻易实现持续监听的效果,比原生zookeeper方便
client.subscribeDataChanges("/zk/app6", new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println(dataPath+"节点被删除");
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println(dataPath+"发生变化:"+data);
}
});
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
результат операции
Вызовите ls /zk --- вы обнаружите, что приложение 6 было создано,
Значение 123, которое мы установили, можно получить через get /zk/app6---
Это означает, что наша программа не имеет проблем и может быть успешно выполнена.
Этот тест прослушивает события
При создании /zk/app6/tellYourDream --- консоль выводит изменения дочернего узла /zk/app6: [tellYourDream]
удалить /zk/app6/tellYourDream --- консоль выводит изменения дочернего узла /zk/app6: [], в настоящее время узла нет, поэтому он пуст
установить /zk/app6 123456 --- /zk/app6 изменено: 123456
удалить /zk/app6 --- Одновременно запускаются два события прослушивания, дочерний узел /zk/app6 изменяется: null и узел /zk/app6 удаляется
③ Дополнение CreateMode
1. Постоянный узел: узел существует вечно без его удаления. и может создавать дочерние узлы
/**
* The znode will not be automatically deleted upon client's disconnect.
* 持久无序
*/
PERSISTENT (0, false, false),
/**
* The znode will not be automatically deleted upon client's disconnect,
* and its name will be appended with a monotonically increasing number.
* 持久有序
*/
PERSISTENT_SEQUENTIAL (2, false, true),
2. Непостоянный узел, другими словами, временный узел. Временный узел создается при подключении клиента. Когда клиент приостанавливает работу, временный узел автоматически удаляется. не может создавать дочерние узлы
/**
* The znode will be deleted upon the client's disconnect.
* 临时无序
*/
EPHEMERAL (1, true, false),
/**
* The znode will be deleted upon the client's disconnect, and its name
* will be appended with a monotonically increasing number.
* 临时有序
*/
EPHEMERAL_SEQUENTIAL (3, true, true);
Есть и другие методы мониторинга, которые мы можем попробовать сами.
3. Zookeeper реализует распределенные блокировки
① Zookeeper реализует первый метод распределенной блокировки
Ранее мы упоминали, что имена узлов одного и того же дочернего узла в zookeeper не могут совпадать, и мы можем использовать это взаимное исключение для реализации инструмента распределенной блокировки.
Временный узел существует при его создании.При его исчезновении узел автоматически удаляется.При потере клиентом связи, нестабильности сети или сбоях блокировка, созданная временным узлом, будет устранена сама собой. Таким образом, можно полностью избежать проблемы взаимоблокировки. Поэтому мы используем эту функцию для достижения наших нужд.
Принцип собственно в том, что узел нельзя переименовать + смотреть механизм.
Например, если наша программа имеет несколько экземпляров службы, какой экземпляр службы создаст узел блокировки, тот, кто его создаст, получит блокировку, а остальные приложения, которые мы не создали, будут отслеживать узел блокировки, если узел блокировки Если он удаляется, в это время могут быть две ситуации: во-первых, клиент не может подключиться, а во-вторых, клиент снимает блокировку и удаляет узел блокировки.
ZkDistributeLock.java (обратите внимание, что методы, которые не нужно переопределять, были удалены)
public class ZkDistributeLock implements Lock {
//我们需要一个锁的目录
private String lockPath;
//我们需要一个客户端
private ZkClient client;
//刚刚我们的客户端和锁的目录,这两个参数怎么传进来?
//那就需要我们的构造函数来进行传值
public ZkDistributeLock(String lockPath) {
if(lockPath ==null || lockPath.trim().equals("")) {
throw new IllegalArgumentException("patch不能为空字符串");
}
this.lockPath = lockPath;
client = new ZkClient("localhost:2181");
client.setZkSerializer(new MyZkSerializer());
}
Методы, которые необходимо переписать для реализации интерфейса блокировки (включая попытку создания временного узла tryLock(), разблокировку разблокировки(), блокировку блокировки() и ожиданиеForLock() для получения функциональных методов блокировки и пробуждения)
// trylock方法我们是会尝试创建一个临时节点
@Override
public boolean tryLock() { // 不会阻塞
// 创建节点
try {
client.createEphemeral(lockPath);
} catch (ZkNodeExistsException e) {
return false;
}
return true;
}
@Override
public void unlock() {
client.delete(lockPath);
}
@Override
public void lock() {
// 如果获取不到锁,阻塞等待
if (!tryLock()) {
// 没获得锁,阻塞自己
waitForLock();
// 从等待中唤醒,再次尝试获得锁
lock();
}
}
private void waitForLock() {
final CountDownLatch cdl = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("----收到节点被删除了-------------");
//唤醒阻塞线程
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data)
throws Exception {
}
};
client.subscribeDataChanges(lockPath, listener);
// 阻塞自己
if (this.client.exists(lockPath)) {
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取消注册
client.unsubscribeDataChanges(lockPath, listener);
}
}
ZkDistributeLock Теперь подведем итоги процесса
获取锁,创建节点后
1.成功获取到的---执行业务---然后释放锁
|
|
|
2.获取失败,注册节点的watch---阻塞等待---取消watch---再回到获取锁,创建节点的判断
Этот дизайн будет иметь недостаток.Например, есть бесчисленное количество моих экземпляров.В это время каждый раз, когда наша блокировка создается, после того, как кто-то получает блокировку, другие люди будут уведомлены и заблокированы.В это время мы теряем много сетевых ресурсов, то есть шокирующий стадный эффект.
На данный момент мы должны оптимизировать
② Zookeeper реализует метод распределенной блокировки 2
Как znode, наш Lock также может создавать принадлежащие ему дочерние узлы.Мы используем lock для создания временных последовательных узлов.Мы находимся вВысокий параллелизм, начиная с 0 (1) --- основная концепция zookeeperКак упоминалось выше, zookeeper упорядочен, и временные последовательные узлы будут автоматически сортироваться от малых к большим.В это время мы назначаем экземпляры этим последовательным дочерним узлам, а затем получаем блокировку с наименьшим номером. Это очень похоже на нашу концепцию честных блокировок и также следует принципу FIFO.
Принцип: взять номер + минимальное количество взять замок + смотреть
Также на основе реализации интерфейса Lock
ZkDistributeImproveLock.java (обратите внимание, что методы, которые не нужно переопределять, были удалены)
public class ZkDistributeImproveLock implements Lock {
/*
* 利用临时顺序节点来实现分布式锁
* 获取锁:取排队号(创建自己的临时顺序节点),然后判断自己是否是最小号,如是,则获得锁;不是,则注册前一节点的watcher,阻塞等待
* 释放锁:删除自己创建的临时顺序节点
*/
//同样的锁目录
private String lockPath;
//同样的客户端
private ZkClient client;
private ThreadLocal<String> currentPath = new ThreadLocal<String>();
private ThreadLocal<String> beforePath = new ThreadLocal<String>();
// 锁重入计数器
private ThreadLocal<Integer> reenterCount = ThreadLocal.withInitial(()->0);
public ZkDistributeImproveLock(String lockPath) {
if(lockPath == null || lockPath.trim().equals("")) {
throw new IllegalArgumentException("patch不能为空字符串");
}
this.lockPath = lockPath;
client = new ZkClient("localhost:2181");
client.setZkSerializer(new MyZkSerializer());
if (!this.client.exists(lockPath)) {
try {
this.client.createPersistent(lockPath, true);
} catch (ZkNodeExistsException e) {
}
}
}
@Override
public boolean tryLock() {
System.out.println(Thread.currentThread().getName() + "-----尝试获取分布式锁");
if (this.currentPath.get() == null || !client.exists(this.currentPath.get())) {
//这里就是先去创建了一个临时顺序节点,在lockpath那里创建
//用银行取号来表示这个行为吧,相当于每个实例程序先去取号,然后排队等着叫号的场景
String node = this.client.createEphemeralSequential(lockPath + "/", "locked");
//记录第一个节点编号
currentPath.set(node);
reenterCount.set(0);
}
// 获得所有的号
List<String> children = this.client.getChildren(lockPath);
// 把这些号进行排序
Collections.sort(children);
// 判断当前节点是否是最小的,和第一个节点编号做对比
if (currentPath.get().equals(lockPath + "/" + children.get(0))) {
// 锁重入计数
reenterCount.set(reenterCount.get() + 1);
System.out.println(Thread.currentThread().getName() + "-----获得分布式锁");
return true;
} else {
// 取到前一个
// 得到字节的索引号
int curIndex = children.indexOf(currentPath.get().substring(lockPath.length() + 1));
String node = lockPath + "/" + children.get(curIndex - 1);
beforePath.set(node);
}
return false;
}
@Override
public void lock() {
if (!tryLock()) {
// 阻塞等待
waitForLock();
// 再次尝试加锁
lock();
}
}
private void waitForLock() {
final CountDownLatch cdl = new CountDownLatch(1);
// 注册watcher
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println(Thread.currentThread().getName() + "-----监听到节点被删除,分布式锁被释放");
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
client.subscribeDataChanges(this.beforePath.get(), listener);
// 怎么让自己阻塞
if (this.client.exists(this.beforePath.get())) {
try {
System.out.println(Thread.currentThread().getName() + "-----分布式锁没抢到,进入阻塞状态");
cdl.await();
System.out.println(Thread.currentThread().getName() + "-----释放分布式锁,被唤醒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 醒来后,取消watcher
client.unsubscribeDataChanges(this.beforePath.get(), listener);
}
@Override
public void unlock() {
System.out.println(Thread.currentThread().getName() + "-----释放分布式锁");
if(reenterCount.get() > 1) {
// 重入次数减1,释放锁
reenterCount.set(reenterCount.get() - 1);
return;
}
// 删除节点
if(this.currentPath.get() != null) {
this.client.delete(this.currentPath.get());
this.currentPath.set(null);
this.reenterCount.set(0);
}
}
ps: не беспокойтесь о заполнении памяти, JVM выполнит сборку мусора
4. Более простой сторонний клиент --- Куратор
Не буду здесь распространяться о кураторе.Если вам интересно, то можете поиграть сами.
адрес:curator.apache.org/curator - отвратительно...
Есть реализации для избрания лидеров, блокировки, добавления, удаления, модификации и проверки фреймворков и т.д.
finally
Кажется, что прошло много времени с момента последнего обновления, в том числе потому, что я был занят на прошлой неделе и не мог найти времени, и я буду продолжать обновлять его каждую неделю (стараюсь изо всех сил)
Далее: Высокий параллелизм с нуля (3) --- Выборы лидера кластера Zookeeper