Параллельное программирование — подробная блокировка AQS CLH

Node.js Java задняя часть Безопасность
  1. Начните с метода приобретения — приобретите
  2. Зачем AQS нужен виртуальный головной узел
  3. Как метод Release снимает блокировку
  4. Суммировать

предисловие

AQS — это ядро ​​JUC, которое инкапсулирует получение и высвобождение ресурсов в нашем предыдущемАнализ исходного кода AQS параллельного программированияВ статье мы проанализировали получение и освобождение блокировки от ReentranLock. Но мне нужно еще раз объяснить основную блокировку CLH AQS.

Вот цитата из чьего-то объяснения CLH:

CLH CLH (замки Крейга, Лэндина и Хагерстена): это спин-блокировка, которая обеспечивает отсутствие голодания и обеспечивает справедливость в порядке поступления заявок.

Блокировка CLH также является масштабируемой, высокопроизводительной и справедливой циклической блокировкой, основанной на связном списке. Поток приложения использует только локальные переменные. Он постоянно опрашивает состояние прекурсора. Если он обнаруживает, что прекурсор снял блокировку , это заканчивает вращение.

Дизайн Java AQS представляет собой оптимизацию или вариант блокировки CLH.

Начнем с кода.

1. Начните с метода приобретения — приобретите

Метод получения — это обычный способ получить блокировку. код показывает, как показано ниже:

public final void acquireQueued(int arg) {
	// 当 tryAcquire 返回 true 就说明获取到锁了,直接结束。
	// 反之,返回 false 的话,就需要执行后面的方法。
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

Пока метод tryAcquire подкласса возвращает false, это означает, что событие получения блокировки требует, чтобы вы добавили себя в очередь.

private Node addWaiter(Node mode) {
	// 创建一个独占类型的节点
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 如果 tail 节点不是 null,就将新节点的 pred 节点设置为 tail 节点。
    // 并且将新节点设置成 tail 节点。
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果 tail 节点是  null,或者 CAS 设置 tail 失败。
    // 在 enq 方法中处理
    enq(node);
    return node;
}

Добавил себя в хвост и обновил узел хвоста.

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 如果 tail 是 null,就创建一个虚拟节点,同时指向 head 和 tail,称为 初始化。
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {// 如果不是 null
        	// 和 上个方法逻辑一样,将新节点追加到 tail 节点后面,并更新队列的 tail 为新节点。
        	// 只不过这里是死循环的,失败了还可以再来 。
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

Какова логика метода enq? Когда хвост равен нулю (очередь не инициализирована), очередь необходимо инициализировать. CAS не может установить хвост, он тоже пойдет сюда, а хвост нужно устанавливать циклически в методе enq. пока успешно.

Примечание: это создаствиртуальный узел.

2. Зачем AQS нужен виртуальный головной узел

Зачем создавать виртуальный узел?

Все начинается с переменной waitStatus класса Node, именуемой ws. У каждого узла есть переменная ws для некоторых флагов состояния этого узла. Начальное состояние равно 0. Если он отменен, узел равен 1, то он будет очищен AQS.

Также есть важное состояние: SIGNAL — -1, что означает, что когда текущий узел снимает блокировку, ему нужно разбудить следующий узел.

Все, каждый узел должен установить ws предыдущего узла в SIGNAL перед переходом в спящий режим.Иначе вы никогда не сможете проснуться.

А зачем тебе такой ws? - Предотвращение повторных операций. Предположим, когда узел был освобожден, а другой поток в это время не знает, освободить его снова. На этот раз это неправильно.

Итак, переменная нужна, чтобы гарантировать состояние этого узла. И изменение этого узла должно обеспечить безопасность потоков посредством операций CAS.

Итак, вернемся к нашему предыдущему вопросу: зачем создавать виртуальный узел?

Каждый узел должен установить состояние ws переднего узла в SIGNAL, поэтому должен быть передний узел, и этот передний узел фактически является узлом, удерживающим блокировку в данный момент.

Проблема в том, что есть граничная проблема: ** А как насчет первого узла? **У него нет предузла.

Затем создайте поддельный.

Вот почему создается фиктивный узел.

Подвести итог:Каждому узлу необходимо установить состояние ws переднего узла (это состояние необходимо для обеспечения согласованности данных), а у первого узла нет переднего узла, поэтому необходимо создать виртуальный узел..

Вернитесь к нашему методу acceptQueued для подтверждения:

// 这里返回的节点是新创建的节点,arg 是请求的数量
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
        	// 找上一个节点
            final Node p = node.predecessor();
            // 如果上一个节点是 head ,就尝试获取锁
            // 如果 获取成功,就将当前节点设置为 head,注意 head 节点是永远不会唤醒的。
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 在获取锁失败后,就需要阻塞了。
            // shouldParkAfterFailedAcquire ---> 检查上一个节点的状态,如果是 SIGNAL 就阻塞,否则就改成 SIGNAL。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Этот метод имеет 2 логики:

  1. Как приостановить себя?
  2. Что вы делаете, когда просыпаетесь?

Сначала ответьте на второй вопрос: что вы делаете, когда просыпаетесь?

Попробуйте получить блокировку.После успеха установите себя в качестве главы и отключитесь от следующего.

Посмотрите на второй вопрос: как приостановить себя?

Примечание. Прежде чем приостановить себя, вам нужно установить состояние ws передней ноды в SIGNAL и сказать ему: не забудьте разбудить меня, когда вы снимите блокировку.

Конкретная логика находится в методе shouldParkAfterFailedAcquire:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    //  如果他的上一个节点的 ws 是 SIGNAL,他就需要阻塞。
    if (ws == Node.SIGNAL)
    	// 阻塞
        return true;
    // 前任被取消。 跳过前任并重试。
    if (ws > 0) {
        do {
        	// 将前任的前任 赋值给 当前的前任
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 将前任的前任的 next 赋值为 当前节点
        pred.next = node;
    } else { 
    	// 如果没有取消 || 0 || CONDITION || PROPAGATE,那么就将前任的 ws 设置成 SIGNAL.
    	// 为什么必须是 SIGNAL 呢?
    	// 答:希望自己的上一个节点在释放锁的时候,通知自己(让自己获取锁)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 重来
    return false;
}

Основная логика этого метода заключается в изменении состояния предыдущего узла на СИГНАЛ. Среди них, если узел-предшественник отменен, он будет пропущен.

Так что, когда передний узел снимет блокировку, он обязательно разбудит этот узел. Посмотрите на логику релиза.

3. Как метод освобождения снимает блокировку

Первая волна кода:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 所有的节点在将自己挂起之前,都会将前置节点设置成 SIGNAL,希望前置节点释放的时候,唤醒自己。
        // 如果前置节点是 0 ,说明前置节点已经释放过了。不能重复释放了,后面将会看到释放后会将 ws 修改成0.
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

Из решения этого метода видно, что head не должен быть равен 0. Зачем? Когда узел пытается приостановить себя, он устанавливает для переднего узла значение СИГНАЛ -1, даже если это первый узел, присоединившийся к очереди, после неудачной попытки получить блокировку ws, установленный виртуальным узлом, будет установлен на СИГНАЛ -1. .

И это решение также предназначено для предотвращения многократного выпуска нескольких потоков.

Так что, конечно, после снятия блокировки состояние ws обязательно будет установлено на 0. Предотвращение повторных операций.

код показывает, как показано ниже:

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
    	// 将 head 节点的 ws 改成 0,清除信号。表示,他已经释放过了。不能重复释放。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    // 如果 next 是 null,或者 next 被取消了。就从 tail 开始向上找节点。
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从尾部开始,向前寻找未被取消的节点,直到这个节点是 null,或者是 head。
        // 也就是说,如果 head 的 next 是 null,那么就从尾部开始寻找,直到不是 null 为止,找到这个 head 就不管了。
        // 如果是 head 的 next 不是 null,但是被取消了,那这个节点也会被略过。
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 唤醒 head.next 这个节点。
    // 通常这个节点是 head 的 next。
    // 但如果 head.next 被取消了,就会从尾部开始找。
    if (s != null)
        LockSupport.unpark(s.thread);
}

Если ws меньше 0, мы предполагаем, что это СИГНАЛ, и изменяем его на 0. Это подтверждает нашу идею.

Если его next равен нулю, это означает, что next отменяется, тогда начинайте искать с хвоста (нет возможности не начать с хвоста). Разумеется, в процессе поиска отказавший узел также был пропущен.

Наконец, разбудите его.

Помните, как выглядит логика после пробуждения?

Для обзора: возьмите замок, назначьте себя главой, прежнюю голову отсоедините от себя.

4. Резюме

Для блокировки CLH, используемой AQS, требуется виртуальный головной узел, функция которого заключается в предотвращении многократного снятия блокировки. Когда у первого узла, входящего в очередь, нет предшествующего узла, создается виртуальный узел.

Попробуем объяснить AQS на картинке:

  1. При добавлении нового узла

    image.png

  2. обновить хвост

image.png

  1. При пробуждении узла предыдущая голова отменялась

image.png