Серия параллельных программ на Java: потрясающий AQS (ниже)

Java

Метка: статья в публичном аккаунте "Мы все маленькие лягушки"

Condition

Внутренняя реализация ReentrantLock

закончить смотретьAQSБазовый механизм синхронизации в , давайте кратко проанализируем представленный ранееReentrantLockпринцип реализации. Давайте рассмотрим типичное использование этой явной блокировки:

Lock lock = new ReentrantLock();
lock.lock();
try {
    加锁后的代码
} finally {
    lock.unlock();     
}

ReentrantLockПервая — это явная блокировка, которая реализуетLockинтерфейс. Может быть, вы забылиLockКак выглядит интерфейс, рассмотрим его еще раз:

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

фактическиReentrantLockвнутренне определяетAQSподкласс, чтобы помочь ему реализовать функцию блокировки, потому чтоReentrantLockработает в独占模式вниз, так чтоlockметод на самом деле вызываетAQSобъектaquireметод получения состояния синхронизации,unlockметод на самом деле вызываетAQSобъектreleaseСпособ выхода из состояния синхронизации, они уже всем знакомы, поэтому повторяться не буду, давайте в общих чертахReentrantLockкод:

public class ReentrantLock implements Lock {

    private final Sync sync;    //AQS子类对象
    
    abstract static class Sync extends AbstractQueuedSynchronizer { 
        // ... 为节省篇幅,省略其他内容
    }
    
    // ... 为节省篇幅,省略其他内容
}

Итак, если мы просто напишем следующую строку кода:

Lock lock = new ReentrantLock();

Это означает созданиеReentrantLockобъект, аAQSобъект, вAQSобъект поддерживается同步队列изheadузел иtailузел, но в начальном состоянии, так как нет потоков, конкурирующих за блокировки, поэтому同步队列Он пустой, а рисунок такой:

image_1c3hf30h3bmodidrvogfe11oh2q.png-16.3kB

Предложение условия

Мы упомянули встроенную блокировку, когда говорили о межпоточном взаимодействии.wait/notifyмеханизм,等待线程Типичный код выглядит следующим образом:

synchronized (对象) {
    处理逻辑(可选)
    while(条件不满足) {
        对象.wait();
    }
    处理逻辑(可选)
}

Типичный код потока уведомлений выглядит следующим образом:

synchronized (对象) {
    完成条件
    对象.notifyAll();、
}

То есть, когда поток не может быть удовлетворен из-за определенного условия, он может вызвать объект блокировки, удерживая блокировку.waitметод, то поток снимет блокировку и войдет в очередь ожидания, связанную с объектом блокировки, для ожидания; если поток завершает условие ожидания, то вызывает метод блокировки, удерживая ту же блокировкуnotifyилиnotifyAllМетод пробуждает потоки, ожидающие в очереди ожидания, связанной с этим объектом блокировки.

Суть явной блокировки на самом деле черезAQSОбъект получает и освобождает состояние синхронизации, а реализация встроенной блокировки инкапсулируется в виртуальной машине Java.Мы не говорили, что реализация этих двух различна.. иwait/notifyмеханизм работает только со встроенными замками, в显式锁Нам нужно определить другой набор подобных механизмов, и нам нужно сделать это ясным, когда мы определяем этот механизм:Когда поток, который получает блокировку, не соответствует определенному условию, в какую очередь ожидания он должен войти и когда следует снять блокировку?Если поток выполняет условие ожидания, как его можно удалить из соответствующей очереди ожидания, удерживая такая же блокировка? Удалить ожидающий поток из очереди в.

Чтобы определить эту очередь ожидания, дяди, которые разработали java, находятся вAQSдобавлено имяConditionObjectЧлен внутреннего класса:

public abstract class AbstractQueuedSynchronizer {
    
    public class ConditionObject implements Condition, java.io.Serializable {
        private transient Node firstWaiter;
        private transient Node lastWaiter;

        // ... 为省略篇幅,省略其他方法
    }
}

Очевидно, этоConditionObjectподдерживать очередь,firstWaiterссылка на головной узел очереди,lastWaiterявляется ссылкой на хвостовой узел очереди. Но класс узлаNode? Да, вы правильно прочитали, это то, что мы проанализировали ранее同步队列используется вAQSстатический внутренний классNode, боюсь ты забыл, тогда ставь этоNodeОсновное содержимое класса узла записывается снова:

static final class Node {
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
    
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
}

Это:AQSКласс узла, используемый очередью синхронизации и настраиваемой очередью ожидания, одинаков..

потому чтоКогда поток в очереди ожидания пробуждается, ему необходимо повторно получить блокировку, то есть повторно получить состояние синхронизации, поэтому очередь ожидания должна знать, какую блокировку удерживает поток, когда он начинает ждать.. Дяди, которые разработали java, находятся вLockИнтерфейс предоставляет такой метод для получения очереди ожидания через блокировку:

Condition newCondition();

мы представили вышеConditionObjectэто сбылосьConditionинтерфейс, посмотриReentrantLockКак блокировка получает связанную с ней очередь ожидания:

public class ReentrantLock implements Lock {

    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
        // ... 为节省篇幅,省略其他方法
    }
    
    public Condition newCondition() {
        return sync.newCondition();
    }
    
    // ... 为节省篇幅,省略其他方法
}

Как видите, это просто вопрос созданияConditionObjectтолько объект~Поскольку ConditionObject является членом внутреннего класса AQS, созданный объект ConditionObject содержит ссылку на объект AQS, поэтому доступ к очереди синхронизации осуществляется через объект ConditionObject, то есть состояние синхронизации может быть повторно получено, то есть блокировка может быть повторно приобретен. Это все еще немного запутанно, чтобы описать словами.Давайте сначала создадим замок.ConditionОбъект:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

Поскольку в начальном состоянии потоки не конкурируют за блокировки, поэтому同步队列пуст, и ни один поток не входит в очередь ожидания, потому что определенное условие не выполняется, поэтому等待队列тоже пусто,ReentrantLockобъект,AQSПредставление объектов и очередей ожидания в памяти показано на рисунке:

image_1c3hji5a4uvn1sdj1ro33hm87m61.png-26.7kB

Конечно, этоnewConditionМетод можно вызывать повторно, так что несколько блокировок могут быть сгенерированы с помощью одной блокировки.等待队列:

Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();

Затем нам нужно подумать, как завернуть нить вNodeГде узлы помещаются в очередь ожидания и как они удаляются из очереди ожидания.ConditionObjectВнутренний класс-член реализуетConditionИнтерфейс, этот интерфейс предоставляет следующие методы:

public interface Condition {
    void await() throws InterruptedException;
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void awaitUninterruptibly();
    void signal();
    void signalAll();
}

Давайте посмотрим на конкретное значение этих методов:

имя метода описывать
void await() Текущий поток переходит в состояние ожидания до тех пор, пока он не будет уведомлен (вызов метода signal или signalAll) или не будет прерван.
boolean await(long time, TimeUnit unit) Текущий поток входит в состояние ожидания в течение заданного времени и возвращается, если он превышает указанное время или получает уведомление или прерывается в состоянии ожидания.
long awaitNanos(long nanosTimeout) То же, что и предыдущий метод, за исключением того, что используемая единица времени по умолчанию — наносекунды.
boolean awaitUntil(Date deadline) Текущий поток входит в состояние ожидания и возвращается, если достигнут крайний срок или если он уведомлен или прерван в состоянии ожидания.
void awaitUninterruptibly() Текущий поток переходит в состояние ожидания до тех пор, пока он не будет уведомлен в состоянии ожидания.Когда необходимо обратить внимание, этот метод не прерывается соответственно
void signal() Разбудите ожидающий поток.
void signalAll() Разбудить все ожидающие потоки.

можно увидеть,Conditionсерединаawaitметоды и встроенные объекты блокировкиwaitФункция метода такая же, она переводит текущий поток в состояние ожидания,signalметоды и встроенные объекты блокировкиnotifyФункция метода такая же, он разбудит потоки в очереди ожидания.

как вызов встроенного замкаwait/notifyметод, поток должен сначала получить блокировку, например, вызовConditionобъектawait/siganlПоток метода должен сначала получить поток, породившийConditionЯвная блокировка объекта. Его основное использование заключается в следующем:Генерируется методом newCondition явной блокировкиConditionобъект, поток может вызвать сгенерированную блокировку, удерживая явную блокировкуConditionметоды ожидания/сигнала объекта, общее использование выглядит следующим образом:

Lock lock = new ReentrantLock();

Condition condition = lock.newCondition();

//等待线程的典型模式
public void conditionAWait() throws InterruptedException {
    lock.lock();    //获取锁
    try {
        while (条件不满足) {
            condition.await();  //使线程处于等待状态
        }
        条件满足后执行的代码;
    } finally {
        lock.unlock();    //释放锁
    }
}

//通知线程的典型模式
public void conditionSignal() throws InterruptedException {
    lock.lock();    //获取锁
    try {
        完成条件;
        condition.signalAll();  //唤醒处于等待状态的线程
    } finally {
        lock.unlock();    //释放锁
    }
}

Предположим, что теперь есть блокировка и две очереди ожидания:

Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();

На чертеже видно, что:

image_1c3hjlt3n14rl1i9g1epg3371ce16e.png-39.7kB

Есть 3 темыmain,t1,t2звонить в то же времяReentrantLockобъектlockЕсли метод конкурирует за блокировку, только потокmainБлокировка получена, поэтому нить будетt1,t2упаковано вNodeВставка узла同步队列,такReentrantLockобъект,AQSобъект и同步队列Схема выглядит так:

image_1c3hjmnj819nl57f11l11p7f9196r.png-94.5kB

потому что в это времяmainПоток получает блокировку и находится в рабочем состоянии, но, поскольку определенное условие не выполняется, он выбирает выполнение приведенного ниже кода для входа.condition1очередь ожидания:

lock.lock();
try {
    contition1.await();
} finally {
    lock.unlock();
}

конкретныйawaitМы не будем анализировать код, он слишком длинный, боюсь, вы заснете, просто посмотрите вот этоawaitЧто делает метод:

  1. существуетcondition1Ожидание создания очередиNodeузел, этот узелthreadзначениеmainнить иwaitStatusза-2, статическая переменнаяNode.CONDITION, указывающий, что узел находится в очереди ожидания, поскольку этот узел является репрезентативным потокомmain, так это называетсяmain节点Проще говоря, вновь созданный узел выглядит так:

    image_1c3hs5hvfu3g186m1g8m9tjdg9cg.png-14.1kB

  2. вставьте этот узелcondition1Ожидание в очереди:

    image_1c3hs74l64m11n6eedc357acvct.png-118.9kB

  3. так какmainПоток также удерживает блокировку, поэтому необходимо уведомить поток, ожидающий получения блокировки, после снятия блокировки.t,так同步队列Узел 0 в удален, потокtполучить замок,节点1называетсяheadузел и поставитьthreadПоле имеет значение null:

    image_1c3hs8phe1r1smrl12q41231hfuda.png-103.4kB

Уже,mainОперация ожидания потока завершена, если блокировка получена сейчасt1Поток также выполняет следующий код:

lock.lock();
try {
    contition1.await();
} finally {
    lock.unlock();
}

Вышеупомянутый процесс все еще будет выполняться, иt1Нитки упакованы какNodeузел вставляется вcondition1Ожидание выхода в очередь, потому что оригинал в очереди ожидания节点1будет удален, мы помещаем эту новую вставку в очередь ожидания от имени потокаt1узел называется新节点1Бар:

image_1c3hshhsik77531ribb6kv57e4.png-112.2kB

Особое внимание здесь:Очередь синхронизации представляет собой двусвязный список, prev представляет собой предыдущий узел, next представляет следующий узел, а очередь ожидания представляет собой односвязный список, использующий nextWaiter для представления следующего узла, здесь они различаются..

Поток, получивший блокировку, теперьt2, все вышли смешать, первые двое зашли, только остальныеt2Как плохо, но не в этот разcondition1Конец очереди, замените его наcondition2Ставьте в очередь:

lock.lock();
try {
    contition2.await();
} finally {
    lock.unlock();
}

Эффект:

image_1c3hsjumr5jb3c5cqhdk57tieh.png-127.6kB

Все обнаружили, что хотя ни один поток не получает блокировку, и ни один поток не ожидает блокировки, но同步队列В нем еще есть узел, да,Очередь синхронизации пуста только тогда, когда ни один поток не заблокирован из-за блокировки в начале Пока есть поток, заблокированный из-за невозможности получения блокировки, очередь не пуста..

Уже,main,t1иt2Эти три потока вошли в состояние ожидания, кто их выведет, когда они все будут заняты? ? ? Ну~ Хорошо, давайте заставим другой поток получить ту же блокировку, например, потокt3Перейти кcondition2Пробуждение потока условной очереди, вы можете вызвать этоsignalметод:

lock.lock();
try {
    contition2.signal();
} finally {
    lock.unlock();
}

Так какcondition2Потоки, ожидающие в очереди, толькоt2,такt2будет разбужен, и этот процесс выполняется в два этапа:

  1. будетcondition2Репрезентативный поток, ожидающий очередиt2из新节点2, удален из очереди ожидания.

  2. будет удален节点2Поместите его в очередь синхронизации и подождите получения блокировки, одновременно меняя узелwaitStautsза0.

Иллюстрация этого процесса выглядит следующим образом:

image_1c3hsv52u8i64rgsdo2t1lngeu.png-119.3kB

если нитьt3Продолжай звонитьsignalAllПучокcondition1Ожидание пробуждения потока в очереди аналогично, ноcondition1Два узла на обоих перемещаются в очередь синхронизации одновременно:

lock.lock();
try {
    contition1.signalAll();
} finally {
    lock.unlock();
}

Эффект показан на рисунке:

image_1c3hthb14i21a58i5168p1a1bfb.png-98.9kB

Таким образом, все потоки начинаются с等待Состояние восстановилось, и блокировка может быть повторно задействована для следующей операции.

ВышеупомянутоеConditionПринцип и использование механизма, фактически это встроенный замок.wait/notifyДругая реализация механизма в явной блокировке, ноИсходный встроенный объект блокировки может соответствовать только одной очереди ожидания.Теперь явная блокировка может генерировать несколько очередей ожидания.Мы можем размещать потоки в разных очередях ожидания в соответствии с разными условиями ожидания потоков..ConditionНазначение механизма может относиться кwait/notifyмеханизм, далее мы используем встроенный замок иwait/notifyСинхронизированная очередь, написанная механизмомBlockedQueueиспользовать显式锁 + Conditionспособ написать:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionBlockedQueue<E> {

    private Lock lock = new ReentrantLock();

    private Condition notEmptyCondition = lock.newCondition();

    private Condition notFullCondition = lock.newCondition();

    private Queue<E> queue = new LinkedList<>();

    private int limit;

    public ConditionBlockedQueue(int limit) {
        this.limit = limit;
    }

    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    public boolean add(E e) throws InterruptedException {
        lock.lock();
        try {
            while (size() >= limit) {
                notFullCondition.await();
            }

            boolean result = queue.add(e);
            notEmptyCondition.signal();
            return result;
        } finally {
            lock.unlock();
        }
    }

    public E remove() throws InterruptedException{
        lock.lock();
        try {
            while (size() == 0) {
                notEmptyCondition.await();
            }
            E e = queue.remove();
            notFullCondition.signalAll();
            return e;
        } finally {
            lock.unlock();
        }
    }
}

В этой очереди мы использовалиReentrantLockблокировка, через которую генерируются две блокировкиConditionобъект,notFullConditionУказывает условие, что очередь не заполнена,notEmptyConditionУказывает условие, что очередь не пуста. Когда очередь заполнена, потокnotFullConditionОжидание, каждый раз, когда элемент вставляется, он будет уведомлятьnotEmptyConditionПоток, который условно ожидает; когда очередь пуста, поток будет ждатьnotEmptyConditionОжидание, каждый раз, когда элемент удаляется, он будет уведомлятьnotFullConditionУсловно ожидающие потоки. Таким образом, семантика становится очевидной.Если у вас есть больше условий ожидания, вы можете сгенерировать больше с помощью явной блокировки.Conditionобъект. иКаждый встроенный объект блокировки может иметь только одну связанную очередь ожидания, что является одним из преимуществ явных блокировок по сравнению со встроенными блокировками..

Подведем итог использования выше:Каждый объект явной блокировки может генерировать несколькоConditionобъект, каждыйConditionОбъект будет соответствовать очереди ожидания, поэтому он имеет эффект явной блокировки, соответствующей нескольким очередям ожидания..

AQSДругие важные методы для ожидающих очередей в

КромеConditionобъектawaitиsignalметод,AQSСуществует также ряд методов прямого доступа к этой очереди, все из которыхpublic finalдекоративный:

public abstract class AbstractQueuedSynchronizer {
    public final boolean owns(ConditionObject condition)
     public final boolean hasWaiters(ConditionObject condition) {}
     public final int getWaitQueueLength(ConditionObject condition) {}
     public final Collection<Thread> getWaitingThreads(ConditionObject condition) {}
}
имя метода описывать
owns Проверьте, не является ли этоAQSУказанный объект ConditionObject, сгенерированный объектом
hasWaiters Есть ли ожидающие потоки в указанной очереди ожидания
getWaitQueueLength Возвращает оценку количества потоков, ожидающих выполнения этого условия. Поскольку фактический набор потоков в многопоточной среде может значительно измениться при построении этого результата.
getWaitingThreads Возвращает оценку набора потоков, ожидающих выполнения этого условия. Поскольку фактический набор потоков в многопоточной среде может значительно измениться при построении этого результата.

При необходимости их можно использовать в нашем пользовательском инструменте синхронизации.

Не по теме

Написание статей очень утомительно, и иногда вы чувствуете, что чтение идет очень гладко, что на самом деле является результатом бесчисленных правок за ним. Если вы думаете, что это хорошо, пожалуйста, помогите переслать его.Большое спасибо~ Вот мой публичный аккаунт "Мы все маленькие лягушки".

буклет

Кроме того, автор также написал буклет MySQL:Ссылка на статью «Как работает MySQL: понимание MySQL у истоков». Содержание буклета в основном представлено с точки зрения Xiaobai, объясняя некоторые основные концепции MySQL на относительно распространенном языке, такие как записи, индексы, страницы, табличные пространства, оптимизация запросов, транзакции и блокировки и т. д. Общее количество слов составляет от 300 000 до 400 000 слов с сотнями оригинальных иллюстраций. Основная цель состоит в том, чтобы облегчить обычным программистам изучение MySQL для продвинутых пользователей и сделать кривую обучения немного более плавной. Студенты, у которых есть сомнения относительно MySQL для продвинутых пользователей, могут взглянуть на: