AbstractqueueueDsynchronizer супер подробный

Java
AbstractqueueueDsynchronizer супер подробный

Сегодня мы будем учиться и учитьсяAbstractQueuedSynchronizerсвязанные принципы классов,java.util.concurrentМногие классы в пакете зависят от синхронизатора очередей, предоставляемого этим классом, например, обычно используемыйReentranLock,SemaphoreиCountDownLatchЖдать.

  Для облегчения понимания мы используем абзацReentranLockПример кода, объяснитеReentranLockв каждом методеAQSиспользование.

Пример ReentranLock

мы все знаемReentranLockповедение блокировки иSynchronizedПохоже, все они являются реентерабельными блокировками, но реализация этих двух действительно совершенно различна, мы объясним это позже.Synchronizedпринцип.Кроме того, синхронизированную блокировку нельзя прервать, в то время как ReentrantLock обеспечивает прерываемую блокировку.. Код нижеReentranLockфункции, мы объясним принципы реализации этих функций в этом порядке.

ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock();

Честные и нечестные замки

ReentranLockРазделенные на справедливые блокировки и нечестные блокировки, разница между ними заключается в том, связана ли возможность получения блокировки с порядком постановки в очередь. Все мы знаем, что если блокировка удерживается другим потоком, другие потоки, подающие заявку на блокировку, будут приостановлены в ожидании и присоединятся к очереди ожидания. По идее сначала позвонитеlockФункция приостановлена, ожидающие потоки должны быть в начале очереди, если они вошли в очередь после вызова. Если в это время блокировка снимается, необходимо уведомить ожидающий поток, чтобы попытаться снова получить блокировку, блокировка сделает справедливым первого, кто войдет в очередь, чтобы получить блокировку потока. Вместо того, чтобы справедливые блокировки разбудили все потоки, пусть они попытаются снова получить блокировку, это может привести к тому, что следующий поток получит блокировку, тогда это несправедливо.

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

Мы найдемFairSyncиNonfairSyncунаследовалиSyncкласс, покаSyncРодительский классAbstractQueuedSynchronizer(именуемый в дальнейшемAQS). ноAQSКонструктор пуст и не работает.

 Последующий анализ исходного кода, если не указано иное, относится к честным блокировкам.

операция блокировки

ReentranLockизlockФункция выглядит следующим образом, напрямую вызываетсяsyncизlockфункция. то есть звонитьFairSyncизlockфункция.

    //ReentranLock
    public void lock() {
        sync.lock();
    }
    //FairSync
    final void lock() {
        //调用了AQS的acquire函数,这是关键函数之一
        acquire(1);
    }

 Мы официально начнем следующийAQSСоответствующий исходный код анализируется,acquireФункция функции состоит в том, чтобы получить количество, которое может быть получено только одним потоком за тот же период времени.Эта сумма является абстрактным понятием блокировки. Давайте сначала проанализируем код, и постепенно вы поймете, что он означает.

public final void acquire(int arg) {
	// tryAcquire先尝试获取"锁",获取了就不进入后续流程
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //addWaiter是给当前线程创建一个节点,并将其加入等待队列
        //acquireQueued是当线程已经加入等待队列之后继续尝试获取锁.
        selfInterrupt();
}

tryAcquire,addWaiterиacquireQueuedВсе они очень важные функции.Давайте узнаем об этих функциях по очереди и поймем их функции.

//AQS类中的变量.
private volatile int state;
//这是FairSync的实现,AQS中未实现,子类按照自己的需要实现该函数
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    //获取AQS中的state变量,代表抽象概念的锁.
    int c = getState();
    if (c == 0) { //值为0,那么当前独占性变量还未被线程占有
        //如果当前阻塞队列上没有先来的线程在等待,UnfairSync这里的实现就不一致
        if (!hasQueuedPredecessors() && 
            compareAndSetState(0, acquires)) {
            //成功cas,那么代表当前线程获得该变量的所有权,也就是说成功获得锁
            setExclusiveOwnerThread(current);
            // setExclusiveOwnerThread将本线程设置为独占性变量所有者线程
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        //如果该线程已经获取了独占性变量的所有权,那么根据重入性
        //原理,将state值进行加1,表示多次lock
        //由于已经获得锁,该段代码只会被一个线程同时执行,所以不需要
        //进行任何并行处理
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //上述情况都不符合,说明获取锁失败
    return false;
}

  Из приведенного выше кода мы можем найти, чтоtryAcquireэто попытаться получить переменную, эксклюзивную для этого потокаstate. Значение state указывает на его состояние: если оно равно 0, то ни один поток в настоящее время не имеет монопольного использования этой переменной; если это не так, это означает, что поток уже имеет монопольное использование этой переменной, а это означает, что поток уже получил замок. Но в это время необходимо сделать еще одно суждение, чтобы увидеть, получена ли блокировка самим текущим потоком, и если да, то увеличить значение состояния.

ReentranLock获得锁

  Здесь необходимо пояснить несколько моментов, в первую очередьcompareAndSetStateфункция, которая задается с помощью операции CASstateзначение, и значение состояния устанавливаетсяvolatileМодификатор через эти две точки, чтобы гарантировать, что изменение значения состояния не вызовет проблем с многопоточностью. Тогда есть разница между справедливыми замками и несправедливыми замками.UnfairSyncизnonfairTryAcquireфункция не вызывается в том же местеhasQueuedPredecessorsЧтобы определить, есть ли в данный момент поток в очереди на получение блокировки.

еслиtryAcquireвозвращениеtrue, то блокировка получена успешно; если она возвращает false, то блокировка не была получена и ее необходимо добавить в очередь ожидания блокировки. Давайте посмотрим нижеaddWaiterсопутствующие операции.

Блокирующая очередь, ожидающая блокировки

  Связанная функция добавления узла, который сохраняет информацию о текущем потоке в очередь ожидания, включает связанный алгоритм очереди без блокировки.AQSДобавляйте узлы только в хвост очереди, а используемый безблокировочный алгоритм относительно прост. Алгоритм настоящей lock-free очереди подождет, пока мы его проанализируемConcurrentSkippedListMapпока объясняю.

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    //先使用快速入列法来尝试一下,如果失败,则进行更加完备的入列算法.
    //只有在必要的情况下才会使用更加复杂耗时的算法,也就是乐观的态度
    Node pred = tail; //列尾指针
    if (pred != null) {
        node.prev = pred; //步骤1:该节点的前趋指针指向tail
        if (compareAndSetTail(pred, node)){ //步骤二:cas将尾指针指向该节点
            pred.next = node;//步骤三:如果成果,让旧列尾节点的next指针指向该节点.
            return node;
        }
    }
    //cas失败,或在pred == null时调用enq
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) { //cas无锁算法的标准for循环,不停的尝试
        Node t = tail;
        if (t == null) { //初始化
            if (compareAndSetHead(new Node())) 
              //需要注意的是head是一个哨兵的作用,并不代表某个要获取锁的线程节点
                tail = head;
        } else {
            //和addWaiter中一致,不过有了外侧的无限循环,不停的尝试,自旋锁
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

  ПозвонивaddWaiterфункция,AQSТекущий поток добавлен в очередь ожидания, но выполнение текущего потока не заблокировано, далее проанализируем его.acquireQueuedфункция.

Ожидание работы узла очереди

 Поскольку операция, входящая в состояние блокировки, снизит эффективность выполнения, поэтомуAQSПредпринимаются усилия, чтобы избежать блокировки потоков, пытающихся получить эксклюзивные переменные. Итак, когда поток присоединяется к очереди ожидания,acquireQueuedБудет выполнен цикл for, и каждый раз будет оцениваться, должен ли текущий узел получить эту переменную (в начало очереди). Если он не должен быть извлечен или терпит неудачу при следующей попытке извлечения, вызовитеshouldParkAfterFailedAcquireЭто решает, должен ли он войти в состояние блокировки. Если узел перед текущим узлом вошел в состояние блокировки, то можно определить, что текущий узел не может получить блокировку, чтобы предотвратить выполнение ЦП, цикл FOR, потребление ресурсов ЦП, вызовparkAndCheckInterruptфункция входа в состояние блокировки.

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) { //一直执行,直到获取锁,返回.
            final Node p = node.predecessor(); 
            //node的前驱是head,就说明,node是将要获取锁的下一个节点.
            if (p == head && tryAcquire(arg)) { //所以再次尝试获取独占性变量
                setHead(node); //如果成果,那么就将自己设置为head
                p.next = null; // help GC
                failed = false;
                return interrupted;
                //此时,还没有进入阻塞状态,所以直接返回false,表示不需要中断调用selfInterrupt函数
            }
            //判断是否要进入阻塞状态.如果`shouldParkAfterFailedAcquire`
            //返回true,表示需要进入阻塞
            //调用parkAndCheckInterrupt;否则表示还可以再次尝试获取锁,继续进行for循环
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //调用parkAndCheckInterrupt进行阻塞,然后返回是否为中断状态
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) //前一个节点在等待独占性变量释放的通知,所以,当前节点可以阻塞
        return true;
    if (ws > 0) { //前一个节点处于取消获取独占性变量的状态,所以,可以跳过去
        //返回false
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //将上一个节点的状态设置为signal,返回false,
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); //将AQS对象自己传入
    return Thread.interrupted();
}

блокировка и прерывание

 Из приведенного выше анализа мы знаемAQSпозвонивLockSupportизparkметод для выполнения операции, которая блокирует текущий процесс. Фактически блокировка здесь означает, что поток больше не выполняется, вызывая эту функцию, поток переходит в состояние блокировки.lockЗатем операция блокируется, ожидая прерывания или освобождения монопольной переменной.

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);//设置阻塞对象,用来记录线程被谁阻塞的,用于线程监控和分析工具来定位
    UNSAFE.park(false, 0L);//让当前线程不再被线程调度,就是当前线程不再执行.
    setBlocker(t, null);
}

  Актуальные знания о прерывании, мы поговорим об этом позже и продолжим следитьAQSДавайте посмотрим на операции, связанные с освобождением эксклюзивных переменных.

ReentrantLock未获得阻塞,加入队列

операция разблокировки

иlockОперация аналогичная,unlockдействие называетсяAQSизrelaseМетоды, параметры и вызовacquireвремя то же самое, это 1.

public final boolean release(int arg) {
    if (tryRelease(arg)) { 
    //释放独占性变量,起始就是将status的值减1,因为acquire时是加1
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//唤醒head的后继节点
        return true;
    }
    return false;
}

  Как видно из приведенного выше кода, релиз вызывается первымtryReleaseчтобы освободить эксклюзивную переменную. В случае успеха посмотреть, есть ли заблокированные потоки, ожидающие блокировки, и если да, то вызватьunparkSuccessorразбудить их.

protected final boolean tryRelease(int releases) {
    //由于只有一个线程可以获得独占先变量,所以,所有操作不需要考虑多线程
    int c = getState() - releases; 
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { //如果等于0,那么说明锁应该被释放了,否则表示当前线程有多次lock操作.
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

Мы видим, чтоtryReleaseЛогика, заложенная в концепции реентерабельной блокировки, только доstateКогда значение 0 равно 0, это означает, что замок действительно выделяется. Так что эксклюзивная переменнаяstateЗначение представляет наличие или отсутствие блокировки. когдаstate=0Когда это означает, что блокировка не занята, если нет, то это означает, что текущая блокировка уже занята.

private void unparkSuccessor(Node node) {
    .....
     //一般来说,需要唤醒的线程就是head的下一个节点,但是如果它获取锁的操作被取消,或在节点为null时
     //就直接继续往后遍历,找到第一个未取消的后继节点.
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

называетсяunparkметод, делайlockПоток, операция которого заблокирована, восстанавливается в рабочее состояние и будет выполняться снова.acquireQueuedоперации в бесконечном цикле for, попробуйте еще раз получить блокировку.

ReentrantLock释放锁并通知阻塞线程恢复执行

постскриптум

БеспокойствоAQSиReentrantLockАнализ почти закончен. Должен сказать, я был шокирован, когда впервые увидел реализацию AQS, я подумалSynchronizedиReentrantLockПринцип реализации тот же, все полагаются на функцию виртуальной машины Java для достижения. я не думал, что естьAQSТакой большой босс за этим помогает. После изучения принципа этого класса наш анализ многих классов JUC стал намного проще. также,AQSВовлеченныйCASАлгоритм работы и разблокированных очередей также обеспечивает основу для изучения других алгоритмов поиска.Океан знаний безграничен!