Глубокое понимание AbstractQueuedSynchronizer (AQS)

Java задняя часть исходный код
Глубокое понимание AbstractQueuedSynchronizer (AQS)

![]>Оригинальная статья, краткий опыт и жизненные перипетии от школьного рекрутинга до фабрики А

Нажмите, чтобы узнать подробностиwww.codercc.com

1. Введение в AQS

существуетпредыдущий постУ нас есть предварительное представление о блокировке и AbstractQueuedSynchronizer (AQS). При реализации компонентов синхронизации AQS является основной частью Разработчики компонентов синхронизации реализуют семантику компонентов синхронизации с помощью шаблонного метода, предоставляемого AQS, а AQS реализует семантику компонентов синхронизации.Управление состоянием синхронизации и постановка в очередь заблокированных потоков для уведомленияДождитесь некоторой низкоуровневой обработки реализации. Ядро AQS также включает следующие аспекты:Очередь синхронизации, получение и освобождение эксклюзивных блокировок, получение и освобождение общих блокировок, прерываемые блокировки и реализация тайм-аута ожидания получения блокировки, и на самом деле это шаблонные методы, предоставляемые AQS, которые резюмируются следующим образом:

Эксклюзивный замок:

void Acquire(int arg): монопольное получение состояния синхронизации, если получение не удалось, вставить очередь синхронизации для ожидания; void AcquireInterruptably(int arg): то же, что метод получения, но может обнаруживать прерывания во время ожидания в очереди синхронизации; boolean tryAcquireNanos(int arg, long nanosTimeout): добавляет функцию ожидания тайм-аута, основанную на эквайреинтерруптибли, и возвращает false, если статус синхронизации не получен в течение периода тайм-аута; boolean release(int arg): сброс состояния синхронизации, этот метод разбудит следующий узел в очереди синхронизации

Общий замок:

void AcquireShared(int arg): общее состояние синхронизации получения, отличие от монопольного состоит в том, что несколько потоков получают состояние синхронизации одновременно; voidacquireSharedInterruptily(int arg): добавлена ​​возможность реагировать на прерывания на основе методаAcquireShared; boolean tryAcquireSharedNanos(int arg, long nanosTimeout): добавляет функцию ожидания тайм-аута на основе AcquireSharedInterruptably; boolean releaseShared(int arg): общее состояние синхронизации выпуска

Фактически, чтобы освоить базовую реализацию AQS, нужно изучить логику этих шаблонных методов. Прежде чем изучать эти шаблонные методы, мы должны сначала понять, какой структурой данных является очередь синхронизации в AQS, поскольку очередь синхронизации является краеугольным камнем управления AQS состоянием синхронизации.

2. Синхронизируйте очередь

Когда общий ресурс занят потоком, другие потоки, запрашивающие ресурс, блокируются и попадают в очередь синхронизации. Что касается структуры данных, то реализация очереди не более чем двумя способами: один в виде массива, а другой в виде связанного списка. Синхронная очередь в AQSпо цепочкереализовать. Далее, очевидно, у нас будет по крайней мере этот вопрос: ** 1. Какова структура данных узла? 2. Он односторонний или двусторонний? 3. Является ли это ведущим узлом или неведущим узлом? ** Мы по-прежнему сначала смотрим на исходный код.

В AQS есть статический внутренний класс Node со следующими свойствами:

volatile int waitStatus //состояние узла volatile Node prev // Узел-предшественник текущего узла/потока volatile Node next; //Узел-преемник текущего узла/потока volatile Thread thread;//ссылка на поток для присоединения к очереди синхронизации Node nextWaiter;//Ожидание следующего узла в очереди

Состояние узлов следующее:

int CANCELED = 1//Узел исключен из очереди синхронизации int SIGNAL = -1//Поток узла-преемника находится в состоянии ожидания, если текущий узел освобождает состояние синхронизации, он уведомляет узел-преемник, чтобы поток узла-преемника мог работать; int CONDITION = -2//Текущий узел входит в очередь ожидания int PROPAGATE = -3//Указывает, что следующее получение общего состояния синхронизации будет распространяться безоговорочно int INITIAL = 0;//Исходное состояние

Теперь, когда мы знаем тип структуры данных узла и каждый узел имеет своего предшественника и преемника, становится ясно, что этодвусторонняя очередь. Точно так же мы можем взглянуть на это с демо.

public class LockDemo {
    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(() -> {
                lock.lock();
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            });
            thread.start();
        }
    }
}

В коде примера открывается 5 потоков, и блокировка сначала устанавливается, а затем приостанавливается на 10 с. Фактически, спящий поток здесь предназначен для имитации ситуации входа в очередь синхронизации, когда поток не может получить блокировку. При отладке, когда Thread-4 (последний поток в этом примере) не может получить блокировку и входит в синхронизацию, текущая очередь синхронизации во время AQS выглядит так, как показано на рисунке:

LockDemo debug下 .png

Поток-0 сначала получает блокировку, а затем спит.Другие потоки (Поток-1, Поток-2, Поток-3, Поток-4) не могут получить блокировку и попадают в очередь на синхронизацию. видно, что у каждого узла есть два домена: предыдущий (предшественник) и следующий (преемник), и каждый узел используется для сохранения информации, такой как ссылки на потоки и состояния ожидания, которым не удалось получить статус синхронизации. Кроме того, в AQS есть две важные переменные-члены:

private transient volatile Node head;
private transient volatile Node tail;

Другими словами, AQS фактически управляет очередью синхронизации с помощью указателей начала и конца и в то же время реализует основные методы, включая постановку в очередь потока, которому не удалось получить блокировку, и уведомление потока в очереди синхронизации при снятии блокировки. . Его принципиальная схема выглядит следующим образом:

队列示意图.png

Благодаря пониманию исходного кода и способа проведения экспериментов мы теперь можем четко знать следующие моменты:

  1. Структура данных узла, то есть статический внутренний класс Node AQS, состояние ожидания узла и другая информация;
  2. Очередь синхронизации — это двусторонняя очередь, и AQS управляет очередью синхронизации, удерживая указатели начала и конца.;

Итак, как узлы ставятся в очередь и удаляются из очереди? Фактически это соответствует двум операциям получения и снятия блокировки: при получении блокировки не удается выполнить операцию постановки в очередь, а при получении блокировки успешно выполняется операция удаления из очереди.

3. Эксклюзивный замок

3.1 Получение монопольных замков (метод получения)

Давайте продолжим просмотр исходного кода и отладку.В качестве примера вышеприведенной демонстрации вызов метода lock() должен получить эксклюзивную блокировку.Если получение не удается, текущий поток будет добавлен в очередь синхронизации.Если это успешно, поток будет выполнен. Метод lock() фактически вызовет метод **acquire()** AQS, исходный код выглядит следующим образом.

public final void acquire(int arg) {
		//先看同步状态是否获取成功,如果成功则方法结束返回
		//若失败则先调用addWaiter()方法再调用acquireQueued()方法
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

Ключевую информацию см. в комментариях. В зависимости от того, является ли текущее состояние синхронизации успешным или нет, в зависимости от того, является ли текущий статус синхронизации успешным или нет, происходит две вещи: 1. Если это удается, метод завершается и возвращается, 2. Если это не удается, то сначала вызывается addWaiter() а затем вызывается методAcquireQueued().

Не удалось получить статус синхронизации, поставить в очередь операцию

Когда потоку не удается получить эксклюзивную блокировку, текущий поток будет добавлен в очередь синхронизации, так как же присоединиться к очереди? Далее нам следует изучить функции addWaiter() и AcquireQueued(). Исходный код addWaiter() выглядит следующим образом:

private Node addWaiter(Node mode) {
		// 1. 将当前线程构建成Node类型
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 2. 当前尾节点是否为null?
		Node pred = tail;
        if (pred != null) {
			// 2.2 将当前节点尾插入的方式插入同步队列中
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
		// 2.1. 当前同步队列尾节点为null,说明当前线程是第一个加入同步队列进行等待的线程
        enq(node);
        return node;
}

Анализ можно увидеть в комментариях выше. Логика программы в основном делится на две части: **1.Хвостовой узел текущей очереди синхронизации является нулевым, и для вставки вызывается метод enq() 2.Хвостовой узел текущей очереди не является нулевым , затем вставка хвоста (используется метод compareAndSetTail(). ) для присоединения к команде. **Кроме того, будет еще вопрос: еслиif (compareAndSetTail(pred, node))Что, если оно ложно? Он продолжит выполнение метода enq(), и, очевидно, compareAndSetTail является операцией CAS.Вообще говоря, если операция CAS не удалась, она продолжит вращаться (бесконечный цикл) для повторной попытки. Таким образом, после нашего анализа метод enq() может выполнять две задачи: ** 1. Ставить в очередь, когда текущий хвостовой узел очереди синхронизации имеет значение null 2. Если завершающий узел вставки CAS дает сбой, он отвечает за вращение и попытку . **Так это действительно похоже на то, что мы проанализировали? Только исходный код даст нам ответ :), исходный код enq() выглядит следующим образом:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
			if (t == null) { // Must initialize
				//1. 构造头结点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
				// 2. 尾插入,CAS操作失败自旋尝试
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
}

В приведенном выше анализе мы видим, что головной узел будет создан первым на шаге 1, что указывает на то, что очередь синхронизацииСвязанная структура хранения с головным узлом. По сравнению с неведущим узлом ведущий узел получит большее удобство в операциях постановки в очередь и исключения из очереди, поэтому очередь синхронизации выбирает структуру хранения цепочки ведущего узла. Итак, каково время инициализации очереди ведущего узла? естественно вКогда хвост равен нулю, то есть текущий поток вставляется в очередь синхронизации в первый раз.. Метод compareAndSetTail(t, node) будет использовать операцию CAS для установки хвостового узла.for (;;)Бесконечный цикл for продолжает попытки, пока не завершится успешно. Таким образом, метод enq() можно резюмировать следующим образом:

  1. Когда текущий поток первым присоединится к очереди синхронизации, вызовите метод compareAndSetHead(new Node()) для завершения инициализации головного узла связанной очереди.;
  2. Spin продолжает пытаться вставить хвостовой узел CAS, пока не добьется успеха.

Теперь, когда мы знаем, как оборачивать поток, которому не удалось получить эксклюзивную блокировку, в узел и вставлять его в очередь синхронизации? Тогда есть следующий вопрос? Что будут делать узлы (потоки) в очереди синхронизации, чтобы гарантировать, что у них есть шанс получить монопольную блокировку? С такой проблемой давайте посмотрим на методAcquireQueued(), что очень понятно из названия метода.Функция этого метода состоит в том, чтобы поставить в очередь процесс получения блокировок.Исходный код выглядит следующим образом:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
				// 1. 获得当前节点的先驱节点
                final Node p = node.predecessor();
				// 2. 当前节点能否获取独占式锁					
				// 2.1 如果当前节点的先驱节点是头结点并且成功获取同步状态,即可以获得独占式锁
                if (p == head && tryAcquire(arg)) {
					//队列头指针用指向当前节点
                    setHead(node);
					//释放前驱节点
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
				// 2.2 获取锁失败,线程进入等待状态等待获取独占式锁
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

Логика программы отмечена комментариями, в целом это процесс вращения (для (;;)), код сначала получает узел-предшественник текущего узла,Если узел-первопроходец является головным узлом и успешно получает состояние синхронизации (если (p == head && tryAcquire(arg))), поток, на который указывает текущий узел, может получить блокировку. И наоборот, если получение блокировки не удается, она переходит в состояние ожидания. Общая схема выглядит следующим образом:

自旋获取锁整体示意图.png

Успешное получение блокировки, операция удаления из очереди

Логика исключения из очереди узла, получившего блокировку, такова:

//队列头结点引用指向当前节点
setHead(node);
//释放前驱节点
p.next = null; // help GC
failed = false;
return interrupted;

Метод setHead():

private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
}

Установить текущий узел как головной узел очереди с помощью метода setHead(), а затем установить следующее поле предыдущего головного узла равным нулю, а поле pre равным нулю, то есть отключенным от очереди, и нет ссылка для облегчения GC Память исправлена. Схематическая диаграмма выглядит следующим образом:

当前节点引用线程获取锁,当前节点设置为队列头结点.png

Затем, когда получение блокировки не удается, вызываются метод shouldParkAfterFailedAcquire() и метод parkAndCheckInterrupt(), чтобы посмотреть, что они сделали. Исходный код метода shouldParkAfterFailedAcquire():

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

Основная логика метода shouldParkAfterFailedAcquire() заключается в использованииcompareAndSetWaitStatus(pred, ws, Node.SIGNAL)Используйте CAS, чтобы установить состояние узла с INITIAL на SIGNAL, указывающее, что текущий поток заблокирован. Когда настройка compareAndSetWaitStatus не выполняется, это означает, что метод shouldParkAfterFailedAcquire возвращает значение false, а затем он будет продолжать повторять попытки в бесконечном цикле for (;;) в методе AcquireQueued() до тех пор, пока compareAndSetWaitStatus не установит бит состояния узла в SIGNAL и shouldParkAfterFailedAcquire. возвращает true перед выполнением метода parkAndCheckInterrupt(), исходный код этого метода:

private final boolean parkAndCheckInterrupt() {
        //使得该线程阻塞
		LockSupport.park(this);
        return Thread.interrupted();
}

Ключом к этому методу является вызов метода LookSupport.park() (LookSupport будет обсуждаться в следующей статье), который используется для блокировки текущего потока. Таким образом, здесь должно быть ясно, что методAcquireQueued() в основном выполняет две вещи во время процесса вращения:

  1. Если узел-предшественник текущего узла является головным узлом и может получить состояние синхронизации, текущий поток может получить блокировку, а выполнение метода завершается и завершается.;
  2. Если получение блокировки не удается, сначала установите состояние узла в SIGNAL, а затем вызовите метод LookSupport.park, чтобы заблокировать текущий поток..

После приведенного выше анализа процесс получения эксклюзивной блокировки, то есть поток выполнения метода Acquire(), показан на следующем рисунке:

独占式锁获取(acquire()方法)流程图.png

3.2 Снятие монопольной блокировки (метод release())

Выпуск эксклюзивных блокировок относительно прост для понимания, давайте посмотрим на исходный код, не говоря ерунды:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

Логику этого кода легче понять. Если состояние синхронизации успешно снято (tryRelease возвращает true), будет выполнен код в блоке if. Когда головной узел, на который указывает head, не равен нулю, а значение состояния узел не равен 0 Будет выполнен метод unparkSuccessor(). Исходный код метода unparkSuccessor:

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */

	//头节点的后继节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
		//后继节点不为null时唤醒该线程
        LockSupport.unpark(s.thread);
}

Ключевую информацию об исходном коде см. в комментариях.Сначала получается узел-преемник головного узла.При использовании узла-преемника будет вызван метод LookSupport.unpark(), который разбудит поток, обернутый узлом-преемником узла. следовательно,Каждый раз, когда блокировка освобождается, поток, на который ссылается узел-преемник узла в очереди, будет пробуждаться, что еще раз доказывает, что процесс получения блокировки является процессом FIFO (первым пришел, первым обслужен).

Теперь мы, наконец, перегрызли твердую кость: изучив исходный код, мы глубоко изучили процесс получения и освобождения эксклюзивных блокировок и очереди синхронизации. Подводить итоги:

  1. Если потоку не удается получить блокировку, поток инкапсулируется как узел для операции постановки в очередь.Основными методами являются addWaiter() и enq(). В то же время enq() завершает инициализацию головного узла очередь синхронизации и повторная попытка сбоя операции CAS.;
  2. Поток, получающий блокировку, является вращающимся процессом.Тогда и только тогда, когда предшествующий узел текущего узла является головным узлом и успешно получает состояние синхронизации, узел удаляется из очереди, то есть поток, на который ссылается узел, получает блокировку, в противном случае , при невыполнении условия поток удаляется из очереди, для блокировки потока будет вызываться метод LookSupport.park();
  3. Когда блокировка будет снята, узел-преемник будет разбужен;

В целом:При получении состояния синхронизации AQS поддерживает очередь синхронизации, и поток, которому не удается получить состояние синхронизации, присоединяется к очереди для вращения; условием для удаления очереди (или остановки вращения) является то, что предшествующий узел является головным узлом. и успешно получено состояние синхронизации. При освобождении состояния синхронизации синхронизатор вызовет метод unparkSuccessor(), чтобы разбудить узел-преемник.

Эксклюзивное изучение функции блокировки

3.3 Блокировка получения прерывания (метод AcquireInterruptably)

Мы знаем, что у блокировки есть некоторые более удобные функции, чем у синхронизации, такие как возможность реагировать на прерывания и ожидание таймаутов.Теперь мы все еще используем метод изучения исходного кода, чтобы увидеть, как реагировать на прерывания. Для прерываемой блокировки можно вызвать метод lock.lockInterruptably(), а нижний уровень этого метода вызовет методAcquireInterruptablyAQS.Исходный код:

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
		//线程获取锁失败
        doAcquireInterruptibly(arg);
}

Метод doAcquireInterruptily вызывается после того, как не удалось получить состояние синхронизации:

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
	//将节点插入到同步队列中
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //获取锁出队
			if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
				//线程中断抛异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Ключевую информацию смотрите в комментариях.Теперь этот код очень легко читать :), и логика метода Acquire почти такая же, разница только в том, когдаparkAndCheckInterruptКогда он возвращает true, то есть когда поток заблокирован, поток прерывается, и код выдает прерываемое исключение.

3.4 Время ожидания для получения блокировки (метод tryAcquireNanos())

При вызове lock.tryLock(timeout, TimeUnit) достигается эффект ожидания получения блокировки с течением времени.Этот метод возвращает значение в трех случаях:

  1. В течение времени ожидания текущий поток успешно получил блокировку;
  2. Текущий поток был прерван в течение времени ожидания;
  3. Если время ожидания истекло, а блокировка не была получена, вернуть false.

Мы все еще узнаем, как реализован нижний слой, читая исходный код.Этот метод вызовет метод AQS tryAcquireNanos().Исходный код:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
		//实现超时等待的效果
        doAcquireNanos(arg, nanosTimeout);
}

Очевидно, что этот исходный код, наконец, полагается на метод doAcquireNanos для достижения эффекта ожидания тайм-аута.Исходный код этого метода выглядит следующим образом:

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
	//1. 根据超时时间和当前时间计算出截止时间
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
			//2. 当前线程获得锁出队列
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
			// 3.1 重新计算超时时间
            nanosTimeout = deadline - System.nanoTime();
            // 3.2 已经超时返回false
			if (nanosTimeout <= 0L)
                return false;
			// 3.3 线程阻塞等待 
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // 3.4 线程被中断抛出被中断异常
			if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Логика программы представлена ​​на рисунке:

超时等待式获取锁(doAcquireNanos()方法)

Логика программы в основном такая же, как монопольная блокировка может реагировать на прерванное получение.Единственное отличие состоит в том, что после сбоя получения блокировки, при обработке тайм-аута, на первом этапе, теоретический срок будет рассчитан в соответствии с к текущему времени и времени таймаута. , например, текущее время 8ч10мин, а время таймаута 10мин, то согласноdeadline = System.nanoTime() + nanosTimeoutРасчетное системное время при достижении тайм-аута составляет 8 часов 10 минут + 10 минут = 8 часов 20 минут. затем согласноdeadline - System.nanoTime()Вы можете судить, истекло ли время, например, текущее системное время составляет 8 часов 30 минут, что явно превышает теоретическое системное время 8 часов 20 минут.deadline - System.nanoTime()Вычисление является отрицательным числом, и оно, естественно, вернет false между суждениями If на шаге 3.2. Если нет тайм-аута, то есть если if на шаге 3.2 считается истинным, он продолжит выполнение шага 3.3, чтобы пройтиLockSupport.parkNanosТекущий поток блокируется, а обнаружение прерывания добавляется на шаге 3.4.Если прерывание обнаружено, сразу создается исключение прерывания.

4. Общий замок

4.1 Получение общих замков (метод AcquireShared())

После разговора о реализации эксклюзивных блокировок с помощью AQS давайте продолжим рассмотрение того, как реализуются общие блокировки? Метод получения общей блокировки —AcquireShared, а исходный код:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

Логику этого исходного кода легко понять. В этом методе метод tryAcquireShared будет вызываться первым. Возвращаемое значение tryAcquireShared является типом int. Когда возвращаемое значение больше или равно 0, конец метода указывает, что блокировка была успешно получена. В противном случае это означает, что состояние синхронизации было получено. Ошибка означает, что указанный поток не может получить блокировку, и будет выполнен метод doAcquireShared. Исходный код этого метода:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
					// 当该节点的前驱节点是头结点且成功获取同步状态
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Разве не было бы легко увидеть этот код сейчас? Логика почти такая же, как и при приобретении эксклюзивной блокировки, а условия выхода в процессе спина здесь таковы.Если узел-предшественник текущего узла является головным узлом, а возвращаемое значение tryAcquireShared(arg) больше или равно 0, состояние синхронизации может быть успешно получено..

4.2 Снятие общих блокировок (метод releaseShared())

Снятие общей блокировки вызовет метод releaseShared в AQS:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

Когда состояние синхронизации будет успешно освобождено, tryReleaseShared продолжит выполнение метода doReleaseShared:

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

Этот метод немного отличается от процесса снятия эксклюзивной блокировки.В процессе снятия разделяемой блокировки для параллельных компонентов, которые могут поддерживать одновременный доступ нескольких потоков, необходимо убедиться, что несколько потоков могут безопасно снять состояние синхронизации. используется здесь. Гарантируется, что в случае сбоя продолжения операции CAS она будет повторена в следующем цикле.

4.3 Прерывание (метод AcquireSharedInterruptably()), ожидание тайм-аута (метод tryAcquireSharedNanos())

Реализация прерываемой блокировки и ожидания тайм-аута почти такая же, как реализация монопольной блокировки, получение прерываемой блокировки и ожидание тайм-аута.Специфика не будет описана.Если вы понимаете вышеизложенное, ваше понимание этой части придет само собой.

Благодаря этой статье я углубил понимание лежащей в основе реализации AQS и заложил основу для понимания принципов реализации параллельных компонентов. Учиться нет конца, и продолжайте радовать :); Если вы считаете, что это хорошо, пожалуйста, дайте ему палец вверх, хе-хе.

использованная литература

Искусство параллельного программирования на Java