Параллелизм Java — условие

Java задняя часть Java EE опрос

Введение

Подобно методу монитора базового класса Object, интерфейс Condition также предоставляет аналогичные методы, которые можно использовать с Lock для реализации режима ожидания/уведомления.

Метод монитора объекта сравнивается с интерфейсом Condition:

Контраст Методы мониторинга объектов Condition
Предварительные условия получить блокировку объекта Вызовите Lock.lock(), чтобы получить блокировку
Вызовите Lock.newCondition(), чтобы получить объект Condition
метод вызова позвонить напрямую
Например: object.wait()
позвонить напрямую
Например: условие. ожидание ()
количество ожидающих очередей Один несколько
Текущий поток снимает блокировку и переходит в состояние ожидания. служба поддержки служба поддержки
Текущий поток снимает блокировку и переходит в состояние ожидания и не отвечает на прерывания в состоянии ожидания не поддерживается служба поддержки
Текущий поток снимает блокировку и переходит в состояние ожидания тайм-аута. служба поддержки служба поддержки
Текущий поток освобождает блокировку и переходит в состояние ожидания до определенного момента в будущем. не поддерживается служба поддержки
Разбудить поток в очереди ожидания служба поддержки служба поддержки
Разбудить все потоки в очереди ожидания служба поддержки служба поддержки

Реализация условия

ПредставляемAQSСинхронизатор знает, что он поддерживаетсяОдинОчередь синхронизации фактически поддерживаетсянесколькоочередь ожидания,Обе очереди являются очередями FIFO., F4 смотрит на класс реализации Condition, и вы можете обнаружить, что он имеет только эти два класса (разница между этими двумя классами в том, что состояние синхронизации состояния — это int и long)

очередь ожидания

В частности, давайте взглянем на внутренний класс ConditionObject в классе реализации Condition AQS.


    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        // 头节点
        private transient Node firstWaiter;
        // 尾结点
        private transient Node lastWaiter;

        public ConditionObject() { }
        ...
    }    

Структура показана на рисунке (очередь в один конец):

await()

Процесс метода await() эквивалентенГоловной узел очереди синхронизации (узел, получивший блокировку) перемещается в очередь ожидания Условия.


        public final void await() throws InterruptedException {
            // 判断当前线程是否中断
            if (Thread.interrupted())
                throw new InterruptedException();
            // 当前线程加入等待队列    
            Node node = addConditionWaiter();
            // 释放同步状态
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 判断此节点是否在同步队列中,若不在直至在同步队列为止
            while (!isOnSyncQueue(node)) {
                // 阻塞当前线程
                LockSupport.park(this);
                // 若线程已经中断则退出
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 竞争同步状态
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

Во-первых, определите, прерван ли поток. Если прерывание прервано, сразу генерируется исключение. В противном случае вызывается addConditionWaiter(), чтобы обернуть поток в узел и присоединиться к очереди ожидания.


        private Node addConditionWaiter() {
            // 获取等待队列的尾节点
            Node t = lastWaiter;
            // 若尾节点状态不为CONDITION,清除节点
            if (t != null && t.waitStatus != Node.CONDITION) {
                // 清除等待队列中所有状态不为CONDITION的节点
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 将当前线程包装成Node节点
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            // 尾插节点
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            // 将节点置为尾节点    
            lastWaiter = node;
            return node;
        }

Из исходного кода добавьте узел в очередь ожиданияне использует CAS, потому что поток, вызывающий метод await(), должен быть потоком, который получил блокировку, то есть этот процесс гарантируется блокировкой для обеспечения безопасности потока.После успешного присоединения к очереди ожидания вызовите fullRelease(), чтобы освободить состояние синхронизации


    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // 获取同步状态
            int savedState = getState();
            // 释放锁
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

Вызовите метод шаблона release() AQS, чтобы отменить состояние синхронизации и разбудить поток, на который ссылается узел-преемник головного узла в очереди синхронизации., возвращается в обычном режиме, если выпуск выполнен успешно, в противном случае генерируется исключение. Затем вызовите isOnSyncQueue(), чтобы определить, находится ли узел в очереди синхронизации.


    final boolean isOnSyncQueue(Node node) {
        // 若状态为CONDITION或者前驱节点为null
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // 若后继节点不为null,表明节点肯定在同步队列中    
        if (node.next != null) // If has successor, it must be on queue
            return true;
        // 从同步队列尾节点找节点    
        return findNodeFromTail(node);
    }

Если узел не находится в очереди на синхронизацию, он всегда будет находиться в теле цикла while, когдаЭтот поток прерывается или узел, связанный с потоком, перемещается в очередь синхронизации (то есть метод signal или signalAll условия, вызываемого другим потоком), завершит цикл и вызовет методAcquireQueued() для его получения, в противном случае он будет получен через LockSupport.park() в потоке блоков метода тела цикла

Метод await() показывает:

signal()/signalAll()

  • signal()
  • 
        public final void signal() {
            // 判断当前线程是否为获取锁的线程
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 唤醒条件队列中的第一个节点
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
    
    Метод isHeldExclusively() должен быть переписан подклассами, его цель — определить, является ли текущий поток тем потоком, который получает блокировку.
    
        protected boolean isHeldExclusively() {
            throw new UnsupportedOperationException();
        }
    
    doSignal
    
            private void doSignal(Node first) {
                do {
                    // 将头节点从等待队列中移除
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    
    doSignal() удаляет головной узел из очереди ожидания, а затем вызывает метод transferForSignal() для добавления узла в очередь синхронизации.
    
        final boolean transferForSignal(Node node) {
            // 将node节点状态由CONDITION  CAS置为初始状态0
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            // 将node节点尾插同步队列,返回插入后同步队列中node节点的前驱节点
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    

    Метод signal() показывает:

  • signalAll()
  • 
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                // 将等待队列中节点从头节点开始逐个移出等待队列,添加到同步队列
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }
    
    Метод signalAll() эквивалентен однократному выполнению метода signal() для каждого узла в очереди ожидания, перемещению всех узлов в очереди ожидания в очередь синхронизации и пробуждению потока каждого узла.

    благодарный

    Искусство параллельного программирования на Java