AQS подробное объяснение параллелизма Java

Java задняя часть

читать с вопросами

1. Что такое AQS, какова его роль и какова основная идея?

2. Каковы принципы эксклюзивных и общих блокировок в AQS? Является ли механизм блокировки, предоставляемый AQS, справедливым или несправедливым?

3. Какие есть реализации AQS на Java и как реализовать собственное управление блокировками на основе AQS

4. Какие еще возможности предоставляет AQS в дополнение к платформе блокировки?

Введение в AQS

AbstractQueuedSynchronizer(AQS)Предоставляет платформу, которую можно использовать для реализации механизмов синхронизации блокировок. Не будет преувеличением сказать, чтоAQSдаJUCКраеугольный камень структуры синхронизации.AQSчерезFIFOОчередь поддерживает состояние синхронизации потока. Классу реализации нужно только наследовать этот класс и переопределить указанный метод, чтобы реализовать набор механизмов синхронизации потока.

AQSПредоставляется в соответствии с уровнем взаимного исключения ресурсовэксклюзивный и общийДва режима доступа к ресурсам и их определенияConditionструктура обеспечиваетwait/signalДождитесь срабатывания механизма пробуждения. существуетJUCв, напримерReentrantLock,CountDownLatchи т.д. основаны наAQSвыполнить.

Платформа AQS

Принцип AQS

AQSПринцип не сложныйAQSподдерживалvolatile int stateпеременная иCLH(三个人名缩写)双向队列, узлы в очереди содержат ссылки на потоки, и каждый узел может передаватьgetState(),setState()иcompareAndSetState()правильноstateдля изменения и доступа. ·

Когда поток получает блокировку, он пытаетсяstateЕсли переменная модифицируется, блокировка устанавливается, если модификация прошла успешно; если модификация не удалась, она упаковывается как узел и монтируется в очередь, ожидая, пока поток, удерживающий блокировку, освободит блокировку и пробудит узел в очередь.

Метод шаблона AQS

AQSЛогика обслуживания очереди инкапсулирована внутри, а метод шаблона используется для предоставления следующих методов класса реализации:

tryAcquire(int);        // 尝试获取独占锁,可获取返回true,否则false
tryRelease(int);        // 尝试释放独占锁,可释放返回true,否则false
tryAcquireShared(int);  // 尝试以共享方式获取锁,失败返回负数,只能获取一次返回0,否则返回个数
tryReleaseShared(int);  // 尝试释放共享锁,可获取返回true,否则false
isHeldExclusively();    // 判断线程是否独占资源

Если класс реализации должен реализовать только функцию монопольной/разделяемой блокировки, он может реализовать толькоtryAcquire/tryReleaseилиtryAcquireShared/tryReleaseShared. Хотя понялtryAcquire/tryReleaseВы можете задать свою логику, но рекомендуется использоватьstateпара методовstateпеременные для работы для реализации синхронизированных классов.

Ниже приведен пример простой реализации блокировки синхронизации:

public class Mutex extends AbstractQueuedSynchronizer {
    
    @Override
    public boolean tryAcquire(int arg) {
        return compareAndSetState(0, 1);
    }
    
    @Override
    public boolean tryRelease(int arg) {
        return compareAndSetState(1, 0);
    }
    
    public static void main(String[] args) {
        final Mutex mutex = new Mutex();
        
        new Thread(() -> {
            System.out.println("thread1 acquire mutex");
            mutex.acquire(1);
            // 获取资源后sleep保持
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch(InterruptedException ignore) {
                
            }
            mutex.release(1);
            System.out.println("thread1 release mutex");
        }).start();
        
        new Thread(() -> {
            // 保证线程2在线程1启动后执行
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch(InterruptedException ignore) {
                
            }
            // 等待线程1 sleep结束释放资源
            mutex.acquire(1);
            System.out.println("thread2 acquire mutex");
            mutex.release(1);
        }).start()
    }
}

Пример кода легко передатьAQSРеализовать операцию взаимного исключения, поток 1 получаетmutexПосле этого нить 2-хacquireЗастрял в блокировке, пока поток 1 не освободится. вtryAcquire/acquire/tryRelease/releaseизargПараметры можно настраивать в соответствии с логикой реализации, особых требований нет.

@param arg the acquire argument. This value is conveyed to {@link #tryAcquire} but is otherwise uninterpreted and can represent anyting you like.

Основная структура AQS

Node

Как упоминалось выше, вAQSЕсли потоку не удастся получить ресурсы, он будет упакован как узел и смонтирован вCLHв очереди,AQSопределено вNodeКлассы используются для переноса потоков.

NodeВ основном он содержит 5 основных полей:

  • waitStatus: текущий статус узла, это поле имеет 5 значений:
    • CANCELLED = 1. Состояние потока ссылки на узел, когда время ожидания истекло или было прервано.
    • SIGNAL = -1. Текущее состояние узла, когда необходимо разбудить поток узла-преемника. Когда узел-преемник добавляется в очередь, он приостанавливается.(block), его предшествующий узел будет установлен вSIGNALСостояние, указывающее, что узел необходимо разбудить.
    • CONDITION = -2. Когда поток узла входитconditionСостояние очереди. (ВидетьConditionObject)
    • PROPAGATE = -3. снятие только общих блокировокreleaseSharedПри использовании для головного узла. (См. анализ общей блокировки)
    • 0. Состояние узла при его инициализации.
  • prev: предшествующий узел.
  • next: Узел-преемник.
  • thread: относится к потоку, головной узел не содержит потока.
  • nextWaiter:conditionОчередь условий. (ВидетьConditionObject)

Эксклюзивный анализ блокировки

acquire

public final void acquire(int arg) {
    // tryAcquire需实现类处理
    // 如获取资源成功,直接返回
    if (!tryAcquire(arg) && 
        // 如获取资源失败,将线程包装为Node添加到队列中阻塞等待
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 如阻塞线程被打断
        selfInterrupt();
}

acquireядро дляtryAcquire,addWaiterиacquireQueuedтри функции, гдеtryAcquireТребуется конкретная реализация класса. всякий раз, когда поток вызываетacquireпозвонит первымtryAcquire, он будет подключен к очереди только после сбоя, поэтомуacquireвыполнитьПо умолчанию это несправедливая блокировка.

addWaiterОберните поток как эксклюзивный узел и добавьте его в очередь вставкой хвоста.Если очередь пуста, будет добавлен пустой головной узел. Стоит отметить, чтоaddWaiterсерединаenqметод, черезCAS+自旋способ обработки конфликтов добавления хвостовых узлов.

acquireQueueПосле того, как узел потока добавлен в очередь, определяется, может ли он снова попытаться получить ресурс.Если он не может быть получен, его предшествующий узел помечается какSIGNALстатус (с указанием того, что он должен бытьunparkпроснуться), затем пройтиparkв состояние блокировки.

Ссылаясь на блок-схему,acquireQueuedОсновная логика метода заключается в следующем.for(;;)иshouldParkAfterFailedAcquire.tailНачальное состояние узла по умолчанию — 0. Когда новый узел монтируется в очередь, его предшественником является исходный узел.tailСостояние узла установлено наSIGNAL, указывая на то, что узел нужно разбудить, вернутьtrueбыл позжеparkзастрял в блоке.forцикл до тех пор, пока узел-предшественник не будетheadЗатем попытайтесь получить ресурс.

release

releaseПроцесс относительно прост. После успешного выпуска узел-преемник будет разбужен от головного узла. Если узел-преемник будет отменен, он обратится к хвосту, чтобы найти заблокированный узел, чтобы разбудить его. После пробуждения блокирующего узла он входит вacquireQueuedсерединаfor(;;)Цикл начинает новый виток конкуренции за ресурсы.

Анализ общей блокировки

acquireShared & releaseShared

public final void acquireShared(int arg) {
    // 负数表示获取共享锁失败,不同于tryAcquire的bool返回
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

acquireSharedиreleaseSharedОбщий процесс аналогичен монопольной блокировке,tryAcquireSharedПосле неудачиNode.SHAREDПодключение к хвосту блоков очереди до тех пор, пока голова очереди не разбудит его. существуетdoAcquireSharedВ отличие от эксклюзивных блокировок, поскольку общие блокировки могут быть получены несколькими потоками, после пробуждения первого блокирующего узла он пройдет.setHeadAndPropagateПрохождение пробуждает последующие блокирующие узлы.

// doAcquireShared核心代码
final Node node = addWaiter(Node.SHARED);
...
for (;;) {
    final Node p = node.predecessor();
    if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
            // r>=0 表示获取锁成功,调整头结点并传递唤醒
            setHeadAndPropagate(node, r);
        }
    }
    ...
}

setHeadAndPropagateиdoReleaseSharedСоставить основную логику пробуждения общей блокировки.

Логика этих двух методов относительно проста и не будет расширяться, в основном дляsetheadAndPropagateЛогика оценки пробуждения нескольких узлов выполняет анализ.

ВходитьsetHeadAndPropagate, первое, что нужно уяснить, это то, что входящие параметры функцииpropagateЭто должно быть неотрицательное число, и тогда его пробуждение в основном состоит из двух логических суждений:

  • еслиpropagate > 0, что указывает на то, что можно получить несколько общих блокировок, которые можно выполнить напрямую.doReleaseSharedРазбудите заблокированные узлы.

  • еслиpropagate = 0, указывающий, что можно разбудить только текущий узел, возможны два случая:

    • h == null || h.waitStatus < 0,как правилоh != null, теперь даноh.waitStatus < 0место действия.

    • (h = head) == null || h.waitStatus < 0Последовательность выполнения сценария следующая:

Эксклюзивная блокировка, общая блокировка, сводка

1. По умолчанию эксклюзивные блокировки и общие блокировки являются недобросовестными стратегиями получения и могут быть поставлены в очередь.

2. Только один поток может получить эксклюзивную блокировку, а остальные потоки в очереди заблокированы, общая блокировка может быть получена несколькими потоками.

3. Снятие эксклюзивной блокировки пробуждает только один заблокированный узел, а общая блокировка может разбудить несколько заблокированных узлов одновременно в зависимости от доступного количества.

ConditionObject

AQSсерединаNodeПомимо формирования очереди на блокировку, такжеConditionObjectприменяется в,ConditionObjectОсновное определение:

public class ConditionObject implements Condition, java.io.Serializable {
    ... 
    private transient Node firstWaiter;
    private transient Node lastWaiter;
    ...
}

ConditionObjectпройти черезNodeтакже представляет собойFIFOочередь, тоConditionObjectзаAQSКакую функцию он обеспечивает?

public interface Condition {
    ...
    void await() throws InterruptedException;
    void signal();
    void signalAll();
    ...
}

ПроверятьConditionОпределение интерфейса, вы можете видеть, что методы, которые он определяет, такие же, как иObjectКатегорияwait/notify/notifyAllФункционал такой же.

существуетСинхронизировано в деталяхУ автора естьObjectMonitorсделал краткое введение, в том числеObjectMonitorВключают_WaitSetи_EntryListДве очереди, одна для храненияwait调用иsychronized锁竞争приостанавливает поток, покаAQSпройти черезConditionObjectтакже обеспечиваетwait/notifyмеханизм блокировки очередей.

ConditionObjectМеханизм показан выше, в условной очередиNodeиспользоватьnextWaiterСформируйте односвязный список, когда поток, удерживающий блокировку, инициируетcondition.awaitПосле вызова он будет обернут какNodeподняться наОчередь блокировки условий условия; при соответствииcondition.signalПосле срабатывания узлы в очереди условной блокировки будут разбужены и подключены кблокировка очереди блокировкисередина.ConditionObjectЛогика очереди такая же, как и вышеacquire/releaseЕсть сходства и незначительные различия, поэтому я не буду их повторять.

Ссылаться на