Параллельное программирование на Java ----- AQS (Abstract Queue Synchronizer)

Node.js Java задняя часть Шаблоны проектирования
Параллельное программирование на Java ----- AQS (Abstract Queue Synchronizer)

1. Что такое СКВ?

AQS, сокращение от AbstractQueuedSynchronizer, — это платформа для реализации синхронизаторов в параллельном программировании. Фреймворк, фреймворк, трижды сказано важные вещи, фреймворк означает, что он обрабатывает большую часть логики за вас, а другие функции требуют расширения. Подумайте о сценарии, в котором вы используете среду Spring. Spring помогает разработчикам реализовать управление зависимостями компонентов, разрешение тегов и т. д. контейнера IOC. Нам нужно только настроить компонент и не заботиться об остальном.

AQS реализован на основе двунаправленной очереди FIFO и предназначен для использования синхронизаторами, которые полагаются на атомарное целочисленное значение, представляющее состояние. Все мы знаем, что, поскольку он называется синхронизатором, должно быть что-то, представляющее состояние синхронизации (критический ресурс).В AQS это целочисленное значение, называемое состоянием, которое атомарно модифицируется CAS.

В AQS существует очередь FIFO. Узлы в очереди представляют блокирующий поток, а элементы узла очереди имеют четыре типа. Каждый тип представляет причину, по которой поток блокируется. Эти четыре типа:

  • CANCELLED: Указывает, что поток помещен в очередь из-за тайм-аута или причин прерывания.
  • CONDITION: Указывает, что поток помещается в очередь, поскольку определенное условие не выполняется, и ему нужно дождаться условия, и он не будет удален из очереди, пока условие не будет выполнено.
  • SIGNAL: Указывает, что поток необходимо разбудить.
  • PROPAGATE: указывает, что в совместно используемом режиме текущий узел выполняет выпускreleaseПосле операции текущий узел должен распространить уведомление на все последующие узлы.

Поскольку общий ресурс может одновременно удерживаться только одним потоком, он также может удерживаться несколькими потоками, поэтому в AQS есть два режима, а именно:

  • 1, эксклюзивный режим

    Exclusive представляет общие значения состояния, каждое состояние может храниться потоком, если другие потоки должны получить, вам нужно препятствие, как в JUCReentrantLock

  • 2. Режим обмена

    Общий режим означает, что состояние значения общего состояния может одновременно храниться несколькими потоками, например, в JUC.CountDownLatch

2. Основные структуры данных и методы в AQS

1. Поскольку AQS основан на структуре очереди FIFO, давайте сначала посмотрим на структуру данных узла узла узла очереди.Исходный код выглядит следующим образом:
static final class Node {
    /**共享模式*/
    static final Node SHARED = new Node();
    /**独占模式*/
    static final Node EXCLUSIVE = null;

    /**标记线程由于中断或超时,需要被取消,即踢出队列*/
    static final int CANCELLED =  1;
    /**线程需要被唤醒*/
    static final int SIGNAL = -1;
    /**线程正在等待一个条件*/
    static final int CONDITION = -2;
    /**
     * 传播
     */
    static final int PROPAGATE = -3;
    
    // waitStatus只取上面CANCELLED、SIGNAL、CONDITION、PROPAGATE四种取值之一
    volatile int waitStatus;

    // 表示前驱节点
    volatile Node prev;

    // 表示后继节点
    volatile Node next;

    // 队列元素需要关联一个线程对象
    volatile Thread thread;

    // 表示下一个waitStatus值为CONDITION的节点
    Node nextWaiter;

    /**
     * 是否当前结点是处于共享模式
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * 返回前一个节点,如果没有前一个节点,则抛出空指针异常
     */
    final Node predecessor() throws NullPointerException {
        // 获取前一个节点的指针
        Node p = prev;
        // 如果前一个节点不存在
        if (p == null)
            throw new NullPointerException();
        else
        // 否则返回
            return p;
    }

    // 初始化头节点使用
    Node() {}

    /**
     *  当有线程需要入队时,那么就创建一个新节点,然后关联该线程对象,由addWaiter()方法调用
     */
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    /**
     * 一个线程需要等待一个条件阻塞了,那么就创建一个新节点,关联线程对象
     */
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

Подводя итог дизайну структуры данных узла Node, элементы в очереди определенно предназначены для сохранения потоков, поставленных в очередь из-за того, что по какой-то причине состояние общего ресурса не может быть получено, поэтомуNodeиспользуется вwaitStatusПричина указывает на то, что узел в команду, используяThreadОбъект для представления потока, связанного с узлом. Что касаетсяprev,next, который является указателем, который должен предоставляться общей структурой данных двусторонней очереди для выполнения связанных операций в очереди.

2. Общие значения состояния в AQS

Как упоминалось ранее, AQS реализует синхронизацию синхронизатора на основе общего значения состояния типа int, которое объявляется следующим образом:

 /**
 * 同步状态值
 */
private volatile int state;

/**
 * 获取同步状态值
 */
protected final int getState() {
    return state;
}

/**
 * 修改同步状态值
 */
protected final void setState(int newState) {
    state = newState;
}

Из исходного кода видно, что AQS объявляет значение состояния типа int.Для реализации функции многопоточной синхронизации модификация этого значения должна быть видна нескольким потокам.Поэтому состояние декорировано volatile , иgetState()иsetState()Метод оформлен в финале, цель - ограничить подкласс AQ, чтобы вызвать только эти два метода, чтобы установить и получить значение состояния, но не переопределить пользовательскую настройку / получить логику.

AQS предоставляет не только методы изменения значений состоянияsetState()иgetState(), а также такие настройки, как использование механизма CAScompareAndSetState()метод, аналогично, этот метод также используетfinalМодифицированные подклассы не могут переопределяться, их можно только вызывать.

3. Метод tryXXX в AQS

Как правило, на основе синхронизатора реализации AQS, такого какReentrantLock,CountDownLatchи т. д. Для операции получения состояния подклассам нужно только переопределить егоtryAcquire()иtryAcquireShared()метод, эти два метода соответствуют операции получения состояния в эксклюзивном режиме и совместно используемом режиме соответственно, а для операции освобождения подклассам нужно только переписатьtryRelease()иtryReleaseShared()метод.

Что касается того, как поддерживать операции удаления из очереди и постановки в очередь, вам не нужно беспокоиться о подклассах, AQS уже сделал это за вас.

Три, AQS дизайн красоты

В отличном проекте всегда будут изюминки, в том числе и в AQS. Xiaobian После прочтения исходного кода AQS это был очень хороший момент проектирования, который казался очень превосходным, и предшественники всегда были такими хорошими.

1. Спин-блокировка

Когда мы выполняем операцию с определенным результатом, нам нужно одновременно правильно, и обычно можно добиться спинового блокировки. В AQS, замок саморегулирования死循环 + CASвыполнить. Для AQS вenq()Объяснять:

private Node enq(final Node node) {
    // 死循环 + CAS ,解决入队并发问题
    /**
     * 假设有三个线程同时都需要入队操作,那么使用死循环和CAS可保证并发安全,同一时间只有一个节点安全入队,入队失败的线程则循环重试
     * 
     * 1、如果不要死循环可以吗?只用CAS.
     *   不可以,因为如果其他线程修改了tail的值,导致1处代码返回false,那么方法enq方法将推出,导致该入队的节点却没能入队
     * 
     * 2、如果只用死循环,不需要CAS可以吗?
     *   不可以,首先不需要使用CAS,那就没必要再使用死循环了,再者,如果不使用CAS,那么当执行1处代码时,将会改变队列的结构
     */
    for (;;) {
        // 获取尾部节点
        Node t = tail;
        // 如果还没有初始化,那么就初始化
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                // 刚开始肯定是头指针和尾指针相等
                tail = head;
        } else {
            // 当前结点的前驱节点等于尾部节点
            node.prev = t;
            // 如果当前尾结点仍然是t,那么执行入队并返回true,否则返回false,然后重试
            if (compareAndSetTail(t, node)) {   // 1
                t.next = node;
                return t;
            }
        }
    }
}

Прежде всего, конечный результат, требуемый операцией постановки в очередь, должен состоять в том, что узел вставляется в очередь, что может быть выполнено только успешно, а не с ошибкой! Однако эта операция постановки в очередь должна выполняться одновременно, и может быть много потоков, которым необходимо выполнять операции постановки в очередь одновременно, поэтому нам необходимо принять соответствующий механизм синхронизации потоков. Спин-блокировка использует оптимистическую стратегию, то есть использует CAS вcompareAndSet()Операция, если определенное возвращение выполнения не удалось, то текущая операция должна быть получена. Следовательно, то для бесконечной петли используется до тех пор, пока он не добится успеха. Если успешно, вырваться из цикла или напрямую вернуть операцию для выхода из метода.

2. Шаблонный метод

В AQS шаблон проектирования метода шаблона воплощен в егоacquire()、release()С точки зрения метода, давайте сначала посмотрим на исходный код:

 public final void acquire(int arg) {
        // 首先尝试获取共享状态,如果获取成功,则tryAcquire()返回true
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

который вызываетtryAcquire()Реализация метода по умолчанию заключается в создании исключения, т. е.tryAcquire()методы оставлены для подклассов для реализации,acquire()Метод определяет шаблон, набор логики обработки, а соответствующие конкретные методы выполнения оставлены для реализации подклассами.

Для получения дополнительных шаблонов проектирования методов шаблона вы можете обратиться кРасскажите о моем понимании шаблона проектирования «метод шаблона» (Шаблон)

В-четвертых, настройте собственный параллельный синхронизатор.

ниже сJDKПредставлен пример документации:

class Mutex implements Lock, java.io.Serializable {
    // 自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 判断是否锁定状态
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 尝试获取资源,立即返回。成功则返回true,否则false。
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // 这里限定只能为1个量
            if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!
                setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源
                return true;
            }
            return false;
        }

        // 尝试释放资源,立即返回。成功则为true,否则false。
        protected boolean tryRelease(int releases) {
            assert releases == 1; // 限定为1个量
            if (getState() == 0)//既然来释放,那肯定就是已占有状态了。只是为了保险,多层判断!
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);//释放资源,放弃占有状态
            return true;
        }
    }

    // 真正同步类的实现都依赖继承于AQS的自定义同步器!
    private final Sync sync = new Sync();

    //lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。
    public void lock() {
        sync.acquire(1);
    }

    //tryLock<-->tryAcquire。两者语义一样:尝试获取资源,要求立即返回。成功则为true,失败则为false。
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    //unlock<-->release。两者语文一样:释放资源。
    public void unlock() {
        sync.release(1);
    }

    //锁是否占有状态
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}

Класс синхронизации, который реализует свой собственный, обычно настраивает синхронизатор (sync) и определяет класс как внутренний класс для собственного использования, в то время как сам класс синхронизации (Mutex) реализует интерфейс для внешних служб. Конечно, реализация интерфейса напрямую зависит от синхронизации, и между ними существует определенное смысловое соответствие! ! Синхронизация использует только tryAcquire-tryReelase для реализации метода получения-освобождения состояния ресурсов.Что касается постановки потоков в очередь, ожидания, пробуждения и т. д., AQS верхнего уровня уже реализована, и нам не нужно об этом заботиться.

Кроме того, Mutex, ReentrantLock / CountDownlatch / Semphore Эти реализации класса синхронизации являются аналогичными и различными местами в приобретении - способ выпуска ресурсов. Осведомленный этот момент основные AQ будут разбиты!