ReentrantLock Анализ исходного кода мертвой серии синхронизации Java (2) — условная блокировка

Java

вопрос

(1) Что такое условная блокировка?

(2) Для каких сценариев подходят условные блокировки?

(3) Просыпается ли await() условной блокировки, когда другие потоки сигнализируют()?

Введение

Условная блокировка относится к блокировке, которая используется, когда обнаруживается, что текущий бизнес-сценарий не может быть обработан сам по себе после получения блокировки, и необходимо дождаться наступления определенного условия для продолжения обработки.

Например, в блокирующей очереди, когда в очереди нет элемента, вытолкнуть элемент невозможно, в это время нужно заблокировать по условию notEmpty, дождаться, пока другие потоки положат в нее элемент, будить до условия notEmpty, и текущий поток будет. Вы можете продолжать выполнять поведение «вытолкнуть элемент».

Обратите внимание, что условие здесь должно бытьждать после получения блокировки, что соответствует условной блокировке ReentrantLock, то есть метод condition.await() может быть вызван только после получения блокировки.

В Java реализация условных блокировок находится в классе ConditionObject AQS. ConditionObject реализует интерфейс Condition. Давайте используем пример для изучения условных блокировок.

Пример использования

public class ReentrantLockTest {
    public static void main(String[] args) throws InterruptedException {
        // 声明一个重入锁
        ReentrantLock lock = new ReentrantLock();
        // 声明一个条件锁
        Condition condition = lock.newCondition();

        new Thread(()->{
            try {
                lock.lock();  // 1
                try {
                    System.out.println("before await");  // 2
                    // 等待条件
                    condition.await();  // 3
                    System.out.println("after await");  // 10
                } finally {
                    lock.unlock();  // 11
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        // 这里睡1000ms是为了让上面的线程先获取到锁
        Thread.sleep(1000);
        lock.lock();  // 4
        try {
            // 这里睡2000ms代表这个线程执行业务需要的时间
            Thread.sleep(2000);  // 5
            System.out.println("before signal");  // 6
            // 通知条件已成立
            condition.signal();  // 7
            System.out.println("after signal");  // 8
        } finally {
            lock.unlock();  // 9
        }
    }
}

Приведенный выше код очень прост. Один поток ожидает выполнения условия, а другой поток сообщает, что условие установлено. Следующие числа представляют фактический порядок выполнения кода. Если вы понимаете этот порядок, вы сможете понять основные условия блокировки.

Анализ исходного кода

Основные свойства объекта ConditionObject

public class ConditionObject implements Condition, java.io.Serializable {
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}

Видно, что в условной блокировке также поддерживается очередь, чтобы отличить ее от очереди AQS, я называю ее здесь условной очередью, firstWaiter — головной узел очереди, lastWaiter — хвостовой узел очереди. , что они делают? Тогда смотри.

Метод lock.newCondition()

Создайте новую условную блокировку.

// ReentrantLock.newCondition()
public Condition newCondition() {
    return sync.newCondition();
}
// ReentrantLock.Sync.newCondition()
final ConditionObject newCondition() {
    return new ConditionObject();
}
// AbstractQueuedSynchronizer.ConditionObject.ConditionObject()
public ConditionObject() { }

Последнее, что нужно сделать, чтобы создать новую условную блокировку, — это вызвать класс ConditionObject в AQS для создания экземпляра условной блокировки.

метод condition.await()

Метод condition.await() указывает, что теперь мы хотим дождаться появления условия.

// AbstractQueuedSynchronizer.ConditionObject.await()
public final void await() throws InterruptedException {
    // 如果线程中断了,抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加节点到Condition的队列中,并返回该节点
    Node node = addConditionWaiter();
    // 完全释放当前线程获取的锁
    // 因为锁是可重入的,所以这里要把获取的锁全部释放
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 是否在同步队列中
    while (!isOnSyncQueue(node)) {
        // 阻塞当前线程
        LockSupport.park(this);
        
        // 上面部分是调用await()时释放自己占有的锁,并阻塞自己等待条件的出现
        // *************************分界线*************************  //
        // 下面部分是条件已经出现,尝试去获取锁
        
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    // 尝试获取锁,注意第二个参数,这是上一章分析过的方法
    // 如果没获取到会再次阻塞(这个方法这里就不贴出来了,有兴趣的翻翻上一章的内容)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 清除取消的节点
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 线程中断相关
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
// AbstractQueuedSynchronizer.ConditionObject.addConditionWaiter
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果条件队列的尾节点已取消,从头节点开始清除所有已取消的节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        // 重新获取尾节点
        t = lastWaiter;
    }
    // 新建一个节点,它的等待状态是CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 如果尾节点为空,则把新节点赋值给头节点(相当于初始化队列)
    // 否则把新节点赋值给尾节点的nextWaiter指针
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    // 尾节点指向新节点
    lastWaiter = node;
    // 返回新节点
    return node;
}
// AbstractQueuedSynchronizer.fullyRelease
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 获取状态变量的值,重复获取锁,这个值会一直累加
        // 所以这个值也代表着获取锁的次数
        int savedState = getState();
        // 一次性释放所有获得的锁
        if (release(savedState)) {
            failed = false;
            // 返回获取锁的次数
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
// AbstractQueuedSynchronizer.isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
    // 如果等待状态是CONDITION,或者前一个指针为空,返回false
    // 说明还没有移到AQS的队列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 如果next指针有值,说明已经移到AQS的队列中了
    if (node.next != null) // If has successor, it must be on queue
        return true;
    // 从AQS的尾节点开始往前寻找看是否可以找到当前节点,找到了也说明已经在AQS的队列中了
    return findNodeFromTail(node);
}

Здесь есть несколько сложных моментов:

(1) очередь Condition не совсем такая же, как очередь AQS;

AQS的队列头节点是不存在任何值的,是一个虚节点;

Condition的队列头节点是存储着实实在在的元素值的,是真实节点。

(2) Изменения в различных состояниях ожидания (waitStatus);

首先,在条件队列中,新建节点的初始等待状态是CONDITION(-2);

其次,移到AQS的队列中时等待状态会更改为0(AQS队列节点的初始等待状态为0);

然后,在AQS的队列中如果需要阻塞,会把它上一个节点的等待状态设置为SIGNAL(-1);

最后,不管在Condition队列还是AQS队列中,已取消的节点的等待状态都会设置为CANCELLED(1);

另外,后面我们在共享锁的时候还会讲到另外一种等待状态叫PROPAGATE(-3)。

(3) Похожие имена;

AQS中下一个节点是next,上一个节点是prev;

Condition中下一个节点是nextWaiter,没有上一个节点。

Если вы понимаете эти моменты, все еще легко и приятно понимать приведенный выше код.Если вы этого не понимаете, Тонг Гэ указал на это здесь, я надеюсь, что вы снова вернетесь к приведенному выше коду.

Следующие суммируют общий процесс метода await ():

(1) Создайте новый узел и добавьте его в очередь условий;

(2) Полностью снять блокировку, занятую текущим потоком;

(3) Заблокировать текущий поток и дождаться появления условия;

(4) Произошло условие (узел был перемещен в очередь AQS в это время), попытайтесь получить блокировку;

То есть внутри метода await() на самом деле先释放锁->等待条件->再次获取锁процесс.

СООТВЕТСТВЕННЫЙ () Метод

Метод condition.signal() уведомляет о возникновении условия.

// AbstractQueuedSynchronizer.ConditionObject.signal
public final void signal() {
    // 如果不是当前线程占有着锁,调用这个方法抛出异常
    // 说明signal()也要在获取锁之后执行
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 条件队列的头节点
    Node first = firstWaiter;
    // 如果有等待条件的节点,则通知它条件已成立
    if (first != null)
        doSignal(first);
}
// AbstractQueuedSynchronizer.ConditionObject.doSignal
private void doSignal(Node first) {
    do {
        // 移到条件队列的头节点往后一位
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 相当于把头节点从队列中出队
        first.nextWaiter = null;
        // 转移节点到AQS队列中
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
// AbstractQueuedSynchronizer.transferForSignal
final boolean transferForSignal(Node node) {
    // 把节点的状态更改为0,也就是说即将移到AQS队列中
    // 如果失败了,说明节点已经被改成取消状态了
    // 返回false,通过上面的循环可知会寻找下一个可用节点
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 调用AQS的入队方法把节点移到AQS的队列中
    // 注意,这里enq()的返回值是node的上一个节点,也就是旧尾节点
    Node p = enq(node);
    // 上一个节点的等待状态
    int ws = p.waitStatus;
    // 如果上一个节点已取消了,或者更新状态为SIGNAL失败(也是说明上一个节点已经取消了)
    // 则直接唤醒当前节点对应的线程
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    // 如果更新上一个节点的等待状态为SIGNAL成功了
    // 则返回true,这时上面的循环不成立了,退出循环,也就是只通知了一个节点
    // 此时当前节点还是阻塞状态
    // 也就是说调用signal()的时候并不会真正唤醒一个节点
    // 只是把节点从条件队列移到AQS队列中
    return true;
}

Общий поток метода signal() таков:

(1) Начать с головного узла очереди условий, чтобы найти узел в состоянии отсутствия отмены;

(2) Переместите его из очереди условий в очередь AQS;

(3) и переместить только один узел;

Обратите внимание, что здесь вызов метода signal() на самом деле не пробуждает узел, так когда же он пробуждает узел?

Помните пример из самого начала? Вернитесь назад и взгляните поближе.После метода signal() в конечном итоге будет выполнен метод lock.unlock(), и в это время будет пробужден узел.Если пробужденный узел раньше был условным узлом, ожидание () будет продолжать выполняться код ниже «разделительной линии».

Все кончено, почувствуй это внимательно ^^

Если вам нужно использовать диаграмму для ее представления, я думаю, что следующая диаграмма может быть примерно представлена ​​(вот временная диаграмма, но на самом деле ее нельзя рассматривать как реальную временную диаграмму, просто поймите ее):

ReentrantLock

Суммировать

(1) Блокировка с повторным входом относится к блокировке, которая может быть получена повторно, то есть поток будет автоматически получать блокировку, когда он попытается получить блокировку после получения блокировки;

(2) Реентерабельная блокировка в ReentrantLock реализуется путем непрерывного накопления значения переменной состояния;

(3) выпуск ReentrantLock должен соответствовать приобретению, то есть он должен выпускаться несколько раз после того, как был приобретен;

(4) ReentrantLock по умолчанию использует нечестный режим, потому что нечестный режим более эффективен;

(5) Условная блокировка относится к блокировке, используемой для ожидания выполнения определенного условия;

(6) Классический сценарий использования условных блокировок — блокировка по условию notEmpty, когда очередь пуста;

(7) Условная блокировка в ReentrantLock реализована через внутренний класс ConditionObject AQS;

(8) Перед снятием блокировки после получения блокировки необходимо использовать оба метода await() и signal();

(9) Метод await() создаст новый узел и поместит его в очередь условий, затем полностью снимет блокировку, затем заблокирует текущий поток и дождется появления условия;

(10) Метод signal() найдет первый доступный узел в очереди условий и переместит его в очередь AQS;

(11) Вызвать метод unlock() при вызове метода signal(), чтобы действительно разбудить узел, который блокирует условие (в этот момент узел уже находится в очереди AQS);

(12) После этого узел снова попытается получить блокировку, и логика в основном такая же, как и в lock().

пасхальные яйца

Почему в java есть синхронизация собственного ключевого слова и нужно реализовать ReentrantLock?

Во-первых, это реентерабельные блокировки;

Во-вторых, все они по умолчанию используют нечестный режим;

Затем... мы углубимся в синхронизацию ReentrantLock VS в следующей главе.

Рекомендуемое чтение

  1. ReentrantLock анализ исходного кода тупиковой серии синхронизации Java (1) - справедливая блокировка, нечестная блокировка

  2. Начало AQS в серии синхронизации Java

  3. Напишите блокировку самостоятельно в серии «Синхронизация Java»

  4. Небезопасный анализ мертвого магического класса Java

  5. JMM (модель памяти Java) из мертвой серии синхронизации Java

  6. Неустойчивый анализ мертвой серии синхронизации Java

  7. Синхронный анализ мертвой серии синхронизации Java


Добро пожаловать, чтобы обратить внимание на мою общедоступную учетную запись «Брат Тонг читает исходный код», проверить больше статей из серии исходного кода и поплавать в океане исходного кода с братом Тонгом.

qrcode