Анализ исходного кода AQS параллельного программирования

Node.js Java задняя часть исходный код

предисловие

В пакете java.util.concurrent.locks JDK 1.5 все блокировки, и есть абстрактный класс AbstractQueuedSynchronizer (Abstract Queue Synchronizer), который является AQS, сегодня мы рассмотрим этот класс.

1. Структура

类结构

Давайте взглянем на структуру этого класса.Этот класс наследуется внутренними классами CountDown, ThreadPoolExecutor, ReentrantLock, ReentrantReadWriteLock и Semaphore, и эти внутренние классы являются реальной реализацией этих блокировок, независимо от того, являются ли они справедливыми блокировками или несправедливыми. замки.

То есть реальная реализация этих блокировок реализуется этим классом. Итак, давайте начнем с этих замков и посмотрим, как добиться от блокировки до разблокировки.

2. Блокировочный метод реентерабельных блокировок

Давайте сначала посмотрим на метод блокировки реентерабельной блокировки ReentranLock.

    public void lock() {
        sync.lock();
    }

Этот метод вызывает метод блокировки абстрактного класса синхронизации внутреннего класса, который реализует справедливые и нечестные блокировки. Давайте посмотрим, как реализованы честные блокировки:

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

Вызывается метод получения, который является методом AQS, поскольку sync наследует AQS, а Fair Lock наследует Sync, что эквивалентно косвенному наследованию AQS. Давайте рассмотрим этот метод.

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

Аннотации JDK для этого метода:

Захватывает объект в эксклюзивном режиме, прерываясь в случае прерывания. Этот метод реализуется, сначала проверяя состояние прерывания, затем вызывая tryAcquire(int) по крайней мере один раз и возвращаясь в случае успеха. В противном случае, до успеха или до того, как поток будет прерван, вызывается tryAcquire(int) для добавления потока в очередь, и поток может быть повторно заблокирован или не заблокирован. Вы можете использовать этот метод для реализации метода Lock.lockInterruptably().

Хозяин кратко расскажет о функции этого метода: этот метод попытается получить блокировку. Если он не может быть получен, он будет добавлен в очередь ожидания и будет ждать пробуждения. На самом деле это похоже на синхронизированный, который мы проанализировали. до.

Рассмотрим этот метод подробнее. Первый — это метод tryAcquire, который пытается получить блокировку. Этот метод необходимо написать. Метод по умолчанию родительского класса — генерировать исключение. Как его переписать? Абстрактный класс определяет стандарт: если он возвращает true, это означает, что блокировка получена успешно, в противном случае она терпит неудачу.

tryAcquire

Возвращаемся к методу Acquire, если блокировка получена успешно, то происходит возврат напрямую, если нет, то продолжаем следующие операции, то есть ставим поток в очередь ожидания:

AcquireQueued(addWaiter(Node.EXCLUSIVE), arg)

Давайте сначала посмотрим на метод addWaiter(Node.EXCLUSIVE):

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

Комментарии к этому методу: Поместите текущий поток в узел очереди. А параметры? Существует два типа параметров: Node.EXCLUSIVE — эксклюзивная блокировка, Node.SHARED — общая блокировка.

Эти две константы определены в классе Node:

    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

Эксклюзивные блокировки являются нулевыми, общие блокировки являются пустыми объектами.

Рассмотрим этапы метода:

  1. Создайте объект Node текущего потока (свойство nextWaiter равно null, свойство потока — текущий поток).
  2. Получите конечный узел, если конечный узел не равен нулю, установите для конечного узла значение свойства prev только что созданного узла. 2.1 Установите конечный узел как новый узел через CAS. В случае успеха установите только что созданный узел в качестве следующего узла старого конечного узла. Вернись наконец.
  3. Если конечный узел имеет значение null, вызывается метод enq. Создайте конечный узел, затем установите для вновь созданного конечного узла значение свойства prev нового узла (конечным узлом в этой точке является головной узел). Наконец, верните только что созданный узел узла.

Давайте посмотрим на реализацию метода enq:

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

Этапы метода следующие:

  1. Бесконечный цикл, получить конечный узел, если он нулевой, использовать CAS для создания головного узла (головной узел в это время также нулевой) и назначить головной узел конечному узлу.
  2. Поскольку CAS только что завершился успешно, выполните логику else, назначьте конечный узел атрибуту prev нового узла и используйте CAS, чтобы установить новый конечный узел для только что созданного объекта узла. Затем верните объект узла.

Этот метод в основном предназначен для инициализации головного узла и конечного узла, добавления новых узлов к конечному узлу и обновления конечного узла.

Мы придем к методу addWaiter, основная функция которого — создать объект node на основе текущего потока и добавить его в конец очереди.

Вернемся к методу приобретения:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

Метод addWaiter вернет только что созданный объект node, а затем вызовет методAcquireQueued, мы вводим этот метод, чтобы увидеть:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

Этапы метода следующие:

  1. Бесконечная петля. Сначала получаем узел узла предыдущего узла, если узел равен головному, значит он второй узел, то можно попытаться получить блокировку в это время. 1.1 Если блокировка получена успешно, установите текущий узел в качестве головного узла (и установите для потока текущего узла значение null, а для prev — значение null) и установите для следующего узла его предыдущего узла значение null (чтобы помочь повторному использованию сборщика мусора) . Наконец, возвращает логическое значение того, был ли ожидающий процесс прерван.
  2. Если два вышеуказанных условия не выполняются, вызываются метод shouldParkAfterFailedAcquire и метод parkAndCheckInterrupt. Цель этих двух методов — приостановить текущий поток. Затем подождите, пока вас разбудят или прервут. Позже мы более подробно рассмотрим оба метода.
  3. Если он разбужен текущим потоком после приостановки, он снова зациклится, чтобы определить, является ли предыдущий узел узла головным.Вообще говоря, когда вы проснулись, это означает, что вам не разрешено брать блокировку , то есть головной узел выполнил задачу освобождения блокировки. Затем повторите шаг 1. Вернись наконец.

Давайте посмотрим на метод shouldParkAfterFailedAcquire:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)

            return true;
        if (ws > 0) {

            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
     
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

Этапы метода следующие:

  1. Получите состояние ожидания предыдущего узла. Если состояние SIGNAL -1, он напрямую вернет true, указывая, что он может приостановить и отдохнуть.
  2. Если состояние ожидания больше 0, цикл проверяет состояние ожидания предыдущего узла prev до тех пор, пока состояние не станет больше 0. Это поле имеет 4 состояния: CANCELED = 1, SIGNAL = -1, CONDITION = -2, PROPAGATE = -3, то есть, если оно больше 0, это отмененное состояние. Итак, что мне делать после нахождения узла, который не больше 0? Укажите текущий узел на следующий узел этого узла, то есть те узлы с состоянием больше 0 будут признаны здесь недействительными и будут переработаны сборщиком мусора в любое время.
  3. Если не больше 0 и не -1, установите состояние предыдущего узла как действительное, то есть -1. Наконец, верните false. Обратите внимание, что в методе acceptQueued цикл будет продолжаться после возврата false, а узел pred в это время уже равен -1, поэтому в конечном итоге он вернет true.

Посмотрите еще раз на метод parkAndCheckInterrupt (приостановка и проверка на прерывание):

   private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

Этот метод очень прост. Он приостанавливает текущий поток, ждет, пока не проснется другой поток (обычно поток в головном узле), а затем возвращает информацию о том, прерван ли текущий поток. Обратите внимание, что этот метод очищает статус прерывания.

Вернемся к методу AcquiQueued, чтобы обобщить метод: этот метод заключается в том, чтобы приостановить только что созданный узел потока, а затем дождаться его пробуждения.Если он проснулся, установить себя в качестве головного узла. Наконец, верните, было ли оно прервано.

Снова вернемся к методу получения:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

В этом методе, если получение блокировки завершается сбоем, пробуждение и прерывание, выполните метод selfInterrupt:

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

Установите бит состояния прерывания для текущего потока.

Ну вот, мы в принципе закончили анализ всего метода блокировки, можно сказать, что весь метод заключается в том, чтобы поставить поток в очередь ожидания и приостановить его, а затем дождаться пробуждения головного узла. Среди них часто появляется метод tryAcquire, а конкретная реализация этого метода реализуется подклассами, такими как реентерабельные блокировки, блокировки чтения-записи и работники пула потоков.Среди них CountDown и Semaphore реализуют метод tryAcquire в общем режиме, но принцип тот же. Как определяется AQS? То есть возврат true означает, что блокировка была получена, а возврат false означает, что блокировка не удалась.Как реализовать AQS нельзя контролировать. Но все они опираются на крайне важное поле ------- состояние.

Об этом поле необходимо рассказать арендодателю, которое определяет состояние текущего синхронизатора.Если вы знаете примитив pv, вы должны хорошо понимать это поле.Как это поле определяется в AQS:

    /**
     * The synchronization state.
     */
    private volatile int state;

изменчивый. Это поле может быть изменено несколькими потоками, поэтому для обеспечения видимости переменной его необходимо установить в volatile.

Мы можем видеть, как это поле используется честными блокировками в реентерабельных блокировках.

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

Этот метод переопределяет метод tryAcquire. Шаги следующие:

  1. Получить текущий поток, получить состояние блокировки (синхронизатора).
  2. Если синхронизатор равен 0, CAS устанавливает состояние в 1, указывая, что синхронизатор занят, и устанавливает поток, удерживающий синхронизатор, в текущий поток (для определения повторного входа). Наконец, он возвращает true, когда блокировка успешно получена.
  3. Если он не равен 0, а текущий поток является потоком, удерживающим синхронизатор, это означает повторный вход. Затем добавьте 1 к состоянию и, наконец, верните true. Итак, когда вы повторно входите один раз, вам нужно разблокировать его один раз, иначе следующий поток никогда не получит блокировку.
  4. Если нет, верните false, указав, что блокировка не удалась.

Отсюда видно, что поле statei очень важно, и то, удерживается ли блокировка, полностью зависит от этого поля. Это нужно отметить, и эта конструкция аналогична pv операционной системы.

Итак, прочитав блокировку, а затем посмотрев на разблокировку, мы можем сначала догадаться, как спроектировать, во-первых, поле состояния должно быть установлено в 0, чтобы следующий поток мог взять блокировку, а затем? Разбудить следующий поток в очереди ожидания. Пусть попытается получить замок. Так Дуг Ли так устроен? Давайте взглянем.

3. Метод разблокировки повторной блокировки

Этот метод вызывает метод выпуска AQS:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

Сначала попробуйте освободить, в случае успеха будите следующий поток.

Давайте сначала посмотрим на метод tryRelease (который нужно переопределить):

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

Этапы метода следующие:

  1. Вычисляет состояние синхронизатора минус 1.
  2. Определите, совпадают ли поток синхронизатора и текущий поток, если нет, создайте исключение состояния монитора.
  3. Определить, равно ли состояние 0, то есть если оно равно 0, это означает, что ни один поток не удерживает блокировку, затем установить free в true и установить для свойства thread синхронизатора значение null,
  4. Наконец, установите для состояния вычисленное значение, где необходимо учитывать повторный вход. Вернись наконец.

Как видите, если состояние не равно 0, оно вернет false, и следующие шаги пропадут, то есть следующий поток не будет разбужен при разблокировке реентерабельной блокировки.

Если разблокировка прошла успешно, выполните следующие шаги: Если головной узел не нулевой и его статус не равен 0, это означает, что существует поток, который может проснуться и выполнить метод unparkSuccessor.

    private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);


        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

Этапы метода следующие:

  1. Получить статус головного узла.
  2. Если меньше 0, CAS устанавливает состояние на 0.
  3. Получите следующий узел головного узла и оцените, является ли он нулевым или следующий узел больше 0. Если он равен нулю или больше 0, он будет искать вверх от конечного узла, пока не найдет узел, состояние которого меньше или равно 0.
  4. Наконец, разбудите поток этого узла.

В это время, ожидая в методе AcquireQueued, если быть точным, поток в методе parkAndCheckInterrupt пробуждается, начинает продолжать цикл, пытается взять блокировку (необходимо изменить переменную состояния) и установить себя как голову.

Здесь также отсутствует место, то есть переменная waitStatus, когда она будет больше или равна 0?Значение этой переменной по умолчанию равно 0, а состояние больше 0 — это состояние Canceled. Когда его отменят? В методе AcquireQueued, если метод не завершается нормально, будет выполнен метод cancelAcquire в finally, который изменит состояние на 1, что является отмененным состоянием.

4 Резюме

На этот раз мы анализируем AQS, который является реальной реализацией блокировок, и анализируем только метод блокировки и метод разблокировки, которые являются основой реентерабельных блокировок. CountDown и Semaphore являются общими блокировками, но основной принцип тот же, но его можно достичь, увеличив количество состояний. Условие, связанное с блокировками, такими как повторные блокировки, напрямую приостанавливает текущий поток через класс инструмента LockSupport и добавляет текущий поток в очередь ожидания.При вызове сигнального метода условия пробуждается первый поток в очереди. . У нас есть возможность повторно проанализировать конкретный исходный код.

Короче говоря, реализация блокировок повторного входа в Java основана на AQS, а AQS в основном реализуется на основе переменных состояния и очередей. Принцип реализации аналогичен примитиву pv.

удачи! ! ! ! !