ReentrantLock из Java Lock (2)

Java задняя часть исходный код

Введение

Предыдущий«ReentrantLock блокировки Java (1)»Представлен базовый исходный код ReentrantLock, проанализирован механизм справедливой и нечестной блокировки ReentrantLock, а окончательный анализ ReentrantLock по-прежнему зависит отAbstractQueuedSynchronizerОчередь синхронизации (далее синхронизатор) реализована, поэтому в этой статье начинается анализ реализации кода внутри синхронизатора.Учитывая, что структура кода относительно длинная, маловажный код будет упрощен при анализе исходного кода, но в в конце концов это не повлияет на код.упрощается логически.

2. Анализ синхронизатора

  • Основные свойства синхронизатора

Согласно приведенному выше исходному коду, мы можем знать, чтоAbstractQueuedSynchronizerОбъект узла Node строится внутри, а головной узел и хвостовой узел с изменчивыми атрибутами строятся одновременно, чтобы обеспечить видимость между несколькими потоками.В то же время самое важное - определить состояние переменной типа int, который проанализирован в предыдущей статье. , мы знаем, что решение о том, получил ли ReenTrantLock блокировку, заключается в том, является ли состояние больше 0, равное 0 означает, что блокировка простаивает, а больше 0 означает, что блокировка была получена. Далее мы сосредоточимся на анализе внутренней структуры узла Node и принципа реализации синхронизатора.Исходный код узла выглядит следующим образом:



        static final class Node {
        //共享模式
        static final Node SHARED = new Node();
        //独占模式
        static final Node EXCLUSIVE = null;
        //取消状态
        static final int CANCELLED =  1;
        //唤醒状态
        static final int SIGNAL    = -1;
        //等待条件状态
        static final int CONDITION = -2;
       //传播状态
        static final int PROPAGATE = -3;

       //等待状态
        volatile int waitStatus;

       //上一个节点
        volatile Node prev;

      //下一个节点
        volatile Node next;

      //获取同步状态的线程
        volatile Thread thread;

        //等待队列中的后继节点,如果当前节点是共享的,那么这个nextWaiter=SHARED
        Node nextWaiter;

        //判断当前后继节点是否是共享的
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        //返回当前节点的前一个节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

Здесь необходимо выделить следующие свойства:waitStatus, состояние включает в себя несколько констант, объявленных внутри узла, а именно:

постоянное имя Функции
CANCELLED Если значение равно 1, текущий узел переходит в отмененное состояние. Причина в том, что он входит в отмененное состояние из-за прерывания или ожидания тайм-аута. Следует отметить, что после того, как поток узла перейдет в отмененное состояние, состояние будет не изменится, и получение блокировки не будет заблокировано.
SIGNAL Если значение равно -1, поток узла-преемника находится в состоянии ожидания, и если поток текущего узла освобождает состояние синхронизации или отменяется, он уведомляет узел-преемник, так что поток узла-преемника может бежать
CONDITION Значение равно -2, узел находится в очереди ожидания, а поток узла ожидает выполнения условия. Когда другие потоки вызывают метод signal() для условия, узел будет перемещен из очереди ожидания в очередь синхронизации. и добавлено состояние синхронизации.в соревновании
PROPAGATE Значение -3 указывает, что следующее получение общего состояния синхронизации будет распространяться безоговорочно.Например, если головной узел получает общее состояние синхронизации и определяет, что это состояние РАСПРОСТРАНЕНИЕ, он продолжит вызывать doReleaseShared, чтобы последующие узлы продолжали получить замок.
INITIAL Значение равно 0, что указывает на начальное состояние (это должно быть в коде в старой версии. В настоящее время jdk1.8 не показывает объявление начального состояния, потому что переменная int по умолчанию равна 0 во время инициализации)

Анализируя свойства синхронизатора, мы можем примерно начертить принципиальную схему очереди конструктора следующим образом:

Во-первых, синхронизатор объявляет головной узел и хвостовой узел.Головной узел указывает на узел узла, указывая, что узел является головным узлом очереди, а хвостовой узел указывает на узел узла, указывая, что узел является узлом. хвостовой узел В то же время каждый узел имеет атрибуты pre и next, указывает на узел node, а затем строит очередь двусвязного списка FIFO, как показано на рисунке. Давайте рассмотрим основные методы, обычно используемые синхронизатором.

  • Список основных методов синхронизатора

Имя метода Функции
compareAndSetState(int expect, int update) CAS для установки статуса синхронизации
enq(final Node node) Цикл в очереди ожидания, пока очередь не будет успешной
addWaiter(Node mode) Создайте хвостовой узел с текущим потоком и добавьте его в хвост
unparkSuccessor(Node node) Разбудить преемника узла
doReleaseShared() Отпустите состояние синхронизации в режиме обмена
setHeadAndPropagate(Node node, int propagate) Установите узел головки и продолжайте распространять лицензию синхронизации
release(int arg) Эксклюзивное состояние синхронизации выпуска
acquireShared(int arg) Общее состояние синхронизации выпуска
hasQueuedPredecessors() Определить, есть ли поток, ожидающий дольше, чем текущий поток (для справедливых блокировок).
  • Блокировка синхронизатора

Выше приведены основные методы синхронизатора. Мы сосредоточимся на анализе некоторых из вышеперечисленных методов. Чтобы понять, как синхронизатор завершает блокировку, ожидает получения блокировки и освобождает блокировку, мы сначала рассмотрим предыдущую статью, чтобы проанализировать метод Lock(), исходный код выглядит следующим образом:

  /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

Мы обнаружили, что основное внимание уделяется методу accept(1), который предоставляется родительским классом, то есть синхронизатором.Исходный код выглядит следующим образом:

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

Исходный код фактически можно разделить на три части:

  • tryAcquire(arg)попробуй получить замок
  • addWaiter(Node.EXCLUSIVE), arg)Создайте узел с текущим потоком и добавьте его в конец очереди
  • acquireQueued(final Node node, int arg)Пусть узел получает состояние синхронизации в бесконечном цикле и выходит из цикла, если это удается

На самом деле, функция этого метода очень ясна, если разбить его на три части. Во-первых, попытаться получить блокировку. Если вы не можете ее получить, добавьте себя в хвост, а затем выполните цикл в очереди, чтобы получить замок. Самая важная частьacquireQueued(final Node node, int arg), исходный код выглядит следующим образом:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
               //获取节点的前任节点
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) { //如果前任节点是头节点,
                //并且当前节点获取到了锁,
                //也就是说队列头节点释放了锁,同时把当前节点设置为头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //判断当前节点是否应该被阻塞,那么就把当前线程阻塞挂起,防止无谓的死循环
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
//该方法主要靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 如果前任节点的状态等于SIGNAL,
             * 说明前任节点获取到了同步状态,当前节点应该被阻塞,返回true
             */
            return true;
        if (ws > 0) {
            /*
             * 前任节点被取消
             */
            do {//循环查找取消节点的前任节点,
            //直到找到不是取消状态的节点,然后剔除是取消状态的节点,
            //关联前任节点的下一个节点为当前节点
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * CAS设置前任节点等待状态为SIGNAL,
             * 设置成功表示当前节点应该被阻塞,下一次循环调用就会
            *  return  true
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
//把当前线程挂起,从而阻塞住线程的调用栈,
//同时返回当前线程的中断状态。
//其内部则是调用LockSupport工具类的park()方法来阻塞该方法。
   private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//阻塞线程
        return Thread.interrupted();
    }

Выше приведен процесс блокировки lock.lock(), резюмируем:

  • Во-первых, попробуйте получить блокировку напрямую, и получение завершится сразу.
  • Если получение блокировки не удается, создайте хвостовой узел для текущего потока и добавьте его в конец очереди в режиме CAS.
  • В очереди бесконечный цикл определяет, является ли предшествующий узел головным узлом. Если это головной узел, он пытается получить блокировку. Если нет, он приостанавливается и ждет, пока предшествующий узел проснется. можно избежать бесконечного цикла нескольких потоков потребление производительности.
  • Синхронизатор разблокирован

    lock.unlock() Процесс снятия блокировки, анализ исходного кода, старые правила, исходный код:
    //释放锁
   public void unlock() {
        sync.release(1);
    }
    
     public final boolean release(int arg) {
        //尝试释放锁
        if (tryRelease(arg)) {
            Node h = head;
            //如果头节点不为空,并且不是初始状态,也就是不在队列中了
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//唤醒后继节点
            return true;
        }
        return false;
    }
    //该方法和之前分析的代码类似,主要是设置Sate状态
     protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);//设置同步状态占有线程为null
            }
            setState(c);
            return free;
        }
        
    private void unparkSuccessor(Node node) {
       
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // 取到当前节点的下一个节点,如果节点不为空就唤醒阻塞的线程
        //如果节点为空,或者节点是取消状态,那么就循环从尾部节点找到
        //当前节点的下一个节点唤醒
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//唤醒阻塞的线程
    }    

Выше приведен анализ lock.unlock(), а также мы суммируем и анализируем

  • Поскольку первый судья блокировки Успех состояния не составляет 0, поэтому состояние будет отпустить блокировку установлено значение 0, а замок потока владельца устанавливается на NULL
  • Блокировка снимается успешно, а затем последующие узлы в очереди будут разбужены вызовомLockSupport.unpark(s.thread)разбудить нить,LockSupportв основном полагаться наsun.misc.UnsafeОн реализуется классом, который предоставляет методы на аппаратном уровне операционной системы и в данной статье не обсуждается.

В-третьих, вывод

  • На этот раз я в основном анализирую реализацию блокировки и разблокировки синхронизатора AQS.На самом деле, многие классы инструментов синхронизации jdk реализованы синхронизатором AQS.После понимания принципа синхронизатора AQS также очень полезно понять принципы других параллельные инструменты. полезные, такие какCountDownLatch,Semaphore,CyclicBarrier(依赖ReentrantLock)Ждать. Следующая статья «ReentrantReadWriteLock для блокировок Java» продолжает уточнять анализ, анализирует блокировки чтения и записи, а также анализирует, как ReentrantLock реализует такие функции, как повторное получение блокировок чтения и ухудшение блокировки.