предисловие
В последнее время я стал говном, и не могу выучить серии одну за другой.Большинство из них связаны с основополагающими принципами. В прошлые выходные я нашел время, чтобы перечитать часть Чжоу Чжимина об эффективном параллелизме JVM, и каждый раз, когда я читал ее, у меня было другое чувство. Впереди долгий путь, так что давайте взглянем на распределенные блокировки в случае шипа, с которым я играл некоторое время назад.
Введение в дело
Прежде чем пытаться понять распределенные блокировки, вы можете представить, в каких сценариях будут использоваться распределенные блокировки?
В автономной архитектуре приложения в случае seckill используется ReentrantLcok или синхронизировано для обеспечения взаимного исключения товаров seckill. Однако в распределенной системе будет несколько компьютеров, параллельно выполняющих одну и ту же функцию. Другими словами, в многопроцессном, если блокировка процесса, предоставляемая вышеупомянутым JDK, также используется для одновременного доступа к ресурсам базы данных, может возникнуть ситуация перепроданности. Поэтому нам нужно реализовать собственную распределенную блокировку.
Особенности, которыми должна обладать распределенная блокировка:
-
Высокодоступный, высокопроизводительный захват и снятие блокировки
-
В распределенной системной среде метод или переменная могут управляться только одним потоком за раз.
-
С механизмом сбоя блокировки, когда сеть прервана или блокировка не может быть снята, блокировку необходимо удалить, чтобы предотвратить взаимоблокировку.
-
Он имеет характеристику блокирующей блокировки, то есть, если блокировка не получена, он продолжает ожидать ее получения.
-
Он имеет функцию неблокирующей блокировки, то есть, если блокировка не получена, он напрямую возвращает отказ в получении блокировки.
-
Функция реентерабельности позволяет потоку получать одну и ту же блокировку несколько раз. Например, когда поток выполняет метод с блокировкой, а метод вызывает другой метод, требующий такой же блокировки, поток может напрямую выполнить вызывающий метод. без повторного получения блокировки
В предыдущем случае seckill мы представили несколько реализаций распределенных блокировок:
- Распределенная блокировка на основе базы данных
- Распределенная блокировка на основе Redis
- Распределенная блокировка на основе Zookeeper
Первые два особенно не рекомендуются для распределенных производственных сред: производительность блокировки базы данных слишком низкая при высоком параллелизме, а у Redis есть определенные проблемы с ограничением времени блокировки и согласованностью кэша. Здесь мы сосредоточимся на том, как Zookeeper реализует распределенные блокировки.
Принцип реализации
ZooKeeper — это распределенная служба координации распределенных приложений с открытым исходным кодом, которая имеет иерархическую структуру дерева каталогов файловой системы внутри, которая предусматривает, что в одном каталоге могут существовать только уникальные имена файлов.
модель данных
-
ПОСТОЯННЫЙ постоянный узел, после создания узла он не исчезнет из-за сбоя сеанса
-
Временные узлы EPHEMERAL, такие узлы будут автоматически удалены по истечении времени ожидания сеанса клиента.
-
EPHEMERAL_SEQUENTIAL Временные узлы с автоматической нумерацией
-
PERSISTENT_SEQUENTIAL последовательность автоматически нумерованных постоянных узлов, которая будет автоматически увеличиваться на 1 в соответствии с количеством существующих узлов.
наблюдатель
При создании узла вы можете зарегистрировать монитор для узла.При изменении состояния узла и срабатывании часов ZooKeeper отправит клиенту только одно уведомление, потому что часы могут быть запущены только один раз.
В соответствии с этими функциями zookeeper, давайте посмотрим, как использовать эти функции для реализации распределенных блокировок:
-
Создать блокировку каталога блокировки
-
Поток A получает блокировку и создает временный последовательный узел в каталоге блокировки.
-
Получите все дочерние узлы в каталоге блокировки, а затем получите одноуровневые узлы меньшего размера, чем вы.Если он не существует, это означает, что текущий порядковый номер потока является наименьшим, и блокировка получена.
-
Поток B создает временный узел и получает все узлы-братья, это не минимальный узел, установите монитор, чем маленький узел, чем вы (уделяя внимание только узлу, чем вы, чтобы предотвратить «эффект стада»)
-
Поток A завершает обработку и удаляет свой собственный узел, поток B прослушивает событие изменения, определяет, что это наименьший узел, и получает блокировку.
анализ кода
Хотя ZooKeeper инкапсулирует сложные и подверженные ошибкам службы ключей, он предоставляет пользователям простой в использовании интерфейс и систему с высокой производительностью и стабильными функциями. Тем не менее, обычному разработчику все еще сложно передать распределенную блокировку, поэтому в случае шипа мы напрямую используем куратор с открытым исходным кодом Apache для реализации распределенной блокировки Zookeeper.
Здесь мы используем следующие версии, последняя версия 4.0.1 на данный момент:
<!-- zookeeper 分布式锁、注意zookeeper版本 这里对应的是3.4.6-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.10.0</version>
</dependency>
скопировать код
Сначала рассмотрим несколько методов в интерфейсе InterProcessLock:
/**
* 获取锁、阻塞等待、可重入
*/
public void acquire() throws Exception;
/**
* 获取锁、阻塞等待、可重入、超时则获取失败
*/
public boolean acquire(long time, TimeUnit unit) throws Exception;
/**
* 释放锁
*/
public void release() throws Exception;
/**
* Returns true if the mutex is acquired by a thread in this JVM
*/
boolean isAcquiredInThisProcess();
скопировать код
Получите замок:
//获取锁
public void acquire() throws Exception
{
if ( !internalLock(-1, null) )
{
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
скопировать код
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
实现同一个线程可重入性,如果当前线程已经获得锁,
则增加锁数据中lockCount的数量(重入次数),直接返回成功
*/
//获取当前线程
Thread currentThread = Thread.currentThread();
//获取当前线程重入锁相关数据
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
//原子递增一个当前值,记录重入次数,后面锁释放会用到
lockData.lockCount.incrementAndGet();
return true;
}
//尝试连接zookeeper获取锁
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
//创建可重入锁数据,用于记录当前线程重入次数
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
//获取锁超时或者zk通信异常返回失败
return false;
}
скопировать код
Zookeeper получает реализацию блокировки:
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
//获取当前时间戳
final long startMillis = System.currentTimeMillis();
//如果unit不为空(非阻塞锁),把当前传入time转为毫秒
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
//子节点标识
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
//尝试次数
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
//自旋锁,循环获取锁
while ( !isDone )
{
isDone = true;
try
{
//在锁节点下创建临时且有序的子节点,例如:_c_008c1b07-d577-4e5f-8699-8f0f98a013b4-lock-000000001
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//如果当前子节点序号最小,获得锁则直接返回,否则阻塞等待前一个子节点删除通知(release释放锁)
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
//异常处理,如果找不到节点,这可能发生在session过期等时,因此,如果重试允许,只需重试一次即可
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
//如果获取锁则返回当前锁子节点路径
if ( hasTheLock )
{
return ourPath;
}
return null;
}
скопировать код
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
//自旋获取锁
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
//获取所有子节点集合
List<String> children = getSortedChildren();
//判断当前子节点是否为最小子节点
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
//如果是最小节点则获取锁
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
//获取前一个节点,用于监听
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
//这里使用getData()接口而不是checkExists()是因为,如果前一个子节点已经被删除了那么会抛出异常而且不会设置事件监听器,而checkExists虽然也可以获取到节点是否存在的信息但是同时设置了监听器,这个监听器其实永远不会触发,对于Zookeeper来说属于资源泄露
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
//如果设置了获取锁等待时间
if ( millisToWait <= 0 )
{
doDelete = true; // 超时则删除子节点
break;
}
//等待超时时间
wait(millisToWait);
}
else
{
wait();//一直等待
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
//如果前一个子节点已经被删除则deException,只需要自旋获取一次即可
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);//获取锁超时则删除节点
}
}
return haveTheLock;
}
скопировать код
Снимите блокировку:
public void release() throws Exception
{
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
//没有获取锁,你释放个球球,如果为空抛出异常
if ( lockData == null )
{
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
//获取重入数量
int newLockCount = lockData.lockCount.decrementAndGet();
//如果重入锁次数大于0,直接返回
if ( newLockCount > 0 )
{
return;
}
//如果重入锁次数小于0,抛出异常
if ( newLockCount < 0 )
{
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try
{
//释放锁
internals.releaseLock(lockData.lockPath);
}
finally
{
//移除当前线程锁数据
threadData.remove(currentThread);
}
}
скопировать код
Прецедент
Чтобы лучше понять его принцип и процесс получения блокировок при анализе кода, здесь мы реализуем простую демонстрацию:
/**
* 基于curator的zookeeper分布式锁
*/
public class CuratorUtil {
private static String address = "192.168.1.180:2181";
public static void main(String[] args) {
//1、重试策略:初试时间为1s 重试3次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//2、通过工厂创建连接
CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy);
//3、开启连接
client.start();
//4 分布式锁
final InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
//读写锁
//InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/readwriter");
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
fixedThreadPool.submit(new Runnable() {
@Override
public void run() {
boolean flag = false;
try {
//尝试获取锁,最多等待5秒
flag = mutex.acquire(5, TimeUnit.SECONDS);
Thread currentThread = Thread.currentThread();
if(flag){
System.out.println("线程"+currentThread.getId()+"获取锁成功");
}else{
System.out.println("线程"+currentThread.getId()+"获取锁失败");
}
//模拟业务逻辑,延时4秒
Thread.sleep(4000);
} catch (Exception e) {
e.printStackTrace();
} finally{
if(flag){
try {
mutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
});
}
}
}
скопировать код
Здесь мы запускаем 5 потоков, и максимальное время ожидания каждого потока для получения блокировки составляет 5 секунд.Для моделирования конкретного бизнес-сценария в методе установлено время ожидания 4 секунды. Начните выполнять основной метод и наблюдайте за узлами в /curator/lock через ZooInspector, как показано ниже:
Да, верно, продолжительность бизнес-обработки 4 секунды установлена для наблюдения за генерацией нескольких последовательных узлов. Конечно же, как описано в случае, каждый поток генерирует узел и по-прежнему упорядочен.
Взглянув на консоль, мы обнаружим, что только два потока успешно получают блокировку, а остальные три потока автоматически удалят узел, если им не удастся получить блокировку с течением времени. После выполнения потока мы обновляем узел /curator/lock и обнаруживаем, что пять только что созданных дочерних узлов больше не существуют.
резюме
Анализируя метод распределенной блокировки, реализованный сторонними инструментами с открытым исходным кодом, урожай все еще полон. Само обучение — это процесс от поверхностного к глубокому, от того, как вызывать API, до понимания реализации логики его кода, если вы хотите углубиться, вы можете покопаться в протоколе ZAB, основном алгоритме Zookeeper.
Наконец, чтобы облегчить обучение, мы суммируем несколько ключевых слов, встречающихся в процессе обучения: реентерабельная блокировка, спин-блокировка, упорядоченный узел, блокировка, неблокировка, мониторинг и надежда помочь всем.
Кейс Seckill: https://gitee.com/52itstyle/spring-boot-seckillСсылаться на
https://yq.aliyun.com/articles/60663
http://www.hollischuang.com/archives/1716
http://www.cnblogs.com/sunddenly/p/4033574.html
http://ifeve.com/zookeeper-lock/