Анализ JUC-AQS(1)

Java задняя часть

AQS — это аббревиатура от AbstractQueuedSynchronizer, что переводится как抽象队列同步器, буквально:

  • Аннотация: Предоставляет способ определения, но не включает конкретную реализацию.
  • Очередь: реализовано с использованием подхода на основе очереди
  • Синхронизация: реализует функцию синхронизации

На самом деле, об анализе AQS в Google есть много аналитических статей, посвященных как анализу исходного кода, так и принципам.Здесь я хотел бы попытаться проанализировать эту концепцию с другой точки зрения.

Дизайн синхронизатора

Перед конкретным анализом мы сначала объясним два метода синхронизации,独占模式а также共享模式:

  • Эксклюзивный режим: ресурс является эксклюзивным и может быть получен только одним потоком за раз.
  • Общий режим: он может быть получен несколькими потоками одновременно, а количество конкретных ресурсов может быть указано параметрами.

Если мы сами реализуем структуру синхронизатора, как мы ее спроектируем? Ниже могут быть более общие дизайнерские решения, о которых мы думаем (эксклюзивный режим):

  1. определить переменнуюint state=0, используйте эту переменную для представления количества приобретаемых ресурсов.
  2. Потоки должны проверять перед получением ресурсовstateЕсли он равен 0, он изменяется на 1, указывая на то, что получение ресурса прошло успешно, в противном случае это указывает на то, что ресурс был занят другими потоками, и поток блокируется в ожидании освобождения ресурса другими потоками.
  3. Чтобы найти те потоки, которые заблокированы в ожидании ресурсов после освобождения ресурсов, мы сохраняем эти потоки в очереди FIFO.
  4. Когда поток, удерживающий ресурс, освобождает ресурс, заблокированный поток может быть пробужден из очереди.Поскольку ресурс был освобожден в это время, пробужденный поток может получить ресурс и выполнить его.

На самом деле вышеизложенное также является схемой проектирования AQS.Здесь мы подробно расскажем, как AQS делает это в соответствии с вышеуказанными пунктами.Давайте сначала представим конкретную реализацию в эксклюзивном режиме.

Член данных AQS

Переменные-члены, определенные в AQS:

  1. stateЭффект такой же, как у переменной в пункте 1.
  2. headа такжеtail, определяет очередь FIFO в пункте 4 выше, начало и конец указывают на начало и конец очереди соответственно, здесь очередь является двусторонней.
  3. определениеNodeузел, каждый элемент в очереди являетсяNodeУзел, который содержит заблокированные потоки и указатели на узлы-предшественники и узлы-преемники.
  static final class Node {
      //标记一个结点(对应的线程)在共享模式下等待
      static final Node SHARED = new Node();
      // 标记一个结点(对应的线程)在独占模式下等待
      static final Node EXCLUSIVE = null; 
      
      // waitStatus的值,表示该结点(对应的线程)已被取消
      static final int CANCELLED = 1; 
      //waitStatus的值,表示后继结点(对应的线程)需要被唤醒
      static final int SIGNAL = -1;
      //waitStatus的值,表示该结点(对应的线程)在等待某一条件
      static final int CONDITION = -2;
      /*waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)*/
      static final int PROPAGATE = -3;
      
       // 等待状态,取值范围,-3,-2,-1,0,1
      volatile int waitStatus;
      volatile Node prev; // 前驱结点
      volatile Node next; // 后继结点
      volatile Thread thread; // 结点对应的线程
      Node nextWaiter; // 等待队列里下一个等待条件的结点

      //成员方法忽略,可以参考具体的源码
  }

Описание: члены вышеуказанной структуры данных должны быть поняты, это основа для понимания ее конкретного принципа реализации.

Доступ к ресурсам

  //arg是要获取的资源的个数,在独占模式下始终为1
  public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
    }

проиллюстрировать:

  1. Сначала попробуйте получить ресурс, позвонитеtryAcquire, глядя на исходный код, можно обнаружить, что эта функция-член выбрасывает исключение напрямую, а конкретного метода реализации нет.В этом причина упомянутой выше абстракции.Для разных методов синхронизации способ получения ресурсов разный, то есть,tryAcquireреализации различаются.
  2. Если получение ресурса не удается, соответствующая информация этого потока вставляется в очередь.Реализация конкретной вставки в очередь выполняетсяaddWaiter(Node.EXCLUSIVE)реализовано, параметрNode.EXCLUSIVEУказывает, что узел, который мы хотим вставить, является эксклюзивным, конкретная реализация:
   private Node addWaiter(Node mode) {
        //生成该线程对应的Node节点
        Node node = new Node(Thread.currentThread(), mode);
        //将Node插入队列中
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    //将Node节点插入到线程堵塞的队列中
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

Вышеупомянутые две функции легко понять, то есть вставить новый узел Node в конец очереди, но следует отметить, что, поскольку в AQS будет одновременно несколько потоков, конкурирующих за ресурсы, определенно будет несколько потоков, вставляющих узлы одновременно.Операция, в этом случае потокобезопасность операции обеспечивается методом спина CAS.Следует отметить, что таким образом, несколько потоков вставляются в узел одновременно, и порядок узла не может быть гарантирован.

  1. Настройте очередь. Причина корректировки заключается в том, что некоторые потоки могут быть временно отменены и завершены, так что в очереди будет много недопустимых узлов потока. Поэтому после завершения двухэтапной операции очередь может быть скорректирована. чтобы вновь вставленные узлы можно было ранжировать после действительных узлов.Конкретная реализация:
   final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //线程唤醒后将接着从这里开始执行
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

проиллюстрировать:

  • Для вновь вставленного узла, если его предшествующий узел является головным узлом, он может повторить попытку.tryAcquireЧтобы получить ресурсы, если приобретение прошло успешно, установите этот узел в качестве нового головного узла.Здесь вам нужно понять, почему он сначала не может получить ресурсы, и узел может быть снова успешно получен после того, как узел будет вставлен в очередь. ? Поскольку очередь управляется несколькими потоками, при первом получении ресурса этот ресурс может быть занят другими потоками, поэтому получение не удается, но в процессе вставки ресурс освобождается, а затем происходит получение ресурса. ресурс может быть успешным. , и получение ресурсов всегда выполняется от головного узла очереди к заднему. Если вышеуказанные условия не выполняются, узел будет скорректирован. Конкретная логика:
   private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果有效,则设置为SIGNAL,保证在该节点获取到资源后能通知后续的节点
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
  • Из приведенной выше логики видно, что вновь вставленный узел должен быть приспособлен к задней части узла, который не был отменен (в определении предыдущего узла узла будет обнаружено, что если поток отменен, соответствующий узлаwaitStatusбудет установлен наCANCELLED, поэтому действительный узелwaitStatus < 0). После того, как корректировка положения узла будет завершена, поток застрянет в заблокированном состоянии, Конкретная логика выглядит следующим образом:
    private final boolean parkAndCheckInterrupt() {
        //该函数调用了unsafe.park函数堵塞,在前面的系列文章中有对unsafe介绍过
        LockSupport.park(this);
        return Thread.interrupted();
    }

На данный момент введена логика получения ресурсов потоком Давайте представим процесс высвобождения ресурсов.

Релиз ресурса

  public final boolean release(int arg) {
      if (tryRelease(arg)) {
        Node h = head;
        // waitStatus等于0,说明后面没有要释放的线程结点
        if (h != null && h.waitStatus != 0) 
          // 唤醒等待队列里下一个结点对应的线程
          unparkSuccessor(h);
        return true; // 成功
      }
    return false; // 失败
  }
  1. Подобно приобретению ресурсов,tryReleaseКонкретной реализации нет, могут быть разные реализации для разных методов синхронизации, здесь при успешном освобождении ресурса будет разбужен следующий узел в очереди.
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
  1. Если узел node имеет значение null или был отменен, пройдите по очереди от конца к началу, найдите первый допустимый узел и затем вызовитеunparkПросыпайтесь, причина, по которой node здесь становится нулевым, также связана с многопоточной операцией.

Реализация класса AQS на самом деле является реализацией класса без блокировок, потокобезопасной. В случае без блокировок безопасность потоков в основном реализуется CAS и спином, и в коде будет много условных суждений. Это также связано с тем, что в В случае многопоточности каждые данные могут меняться, так что учтите это при чтении кода! !

После того, как поток пробудится, он продолжит предыдущий анализacquireQueuedМесто, где функция спит, выполняет суждение о вращении.Если условие истинно, получение ресурса успешно, и головной узел сбрасывается.В это время операция освобождения ресурса завершена.

Введение в общий режим

Общий режим аналогичен эксклюзивному режиму. Разница в том, что ресурсы не уникальны, поэтому они могут быть получены несколькими потоками. Поскольку разные потоки получают разное количество ресурсов, может быть поток для освобождения ресурсов в следствии -up логика потоков. , но она разбудит несколько ожидающих потоков, таких как следующие:

Имеется 3 ресурса и 4 потока, из которых поток 1 занимает 3 ресурса, поток 2 требует двух ресурсов, а поток 3 и поток 4 требуют по 1 ресурсу, но из-за нехватки ресурсов потоки 2, 3 и 4 заблокированы соответственно. В очереди при освобождении ресурса потока 1 будет три свободных ресурса, поэтому два других потока получат ресурсы последовательно в очереди, то есть один поток освобождает ресурс, а несколько потоков получают ресурс.

Предыдущие аналогичны, а другая логика заключается в следующем:

    private void doAcquireShared(int arg) {
        //生成共享节点并且插入队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    //返回值代表剩余的资源数量
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //设置新的头结点,并且可能会继续唤醒后继的节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //如果条件不满足,则调整节点在队列中的位置并且睡眠
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

Описание: Если узел является узлом-преемником головы, попробуйте получить ресурсы. Это то же самое, что и в эксклюзивном режиме. Разница в том, что количество ресурсов в совместно используемом режиме не является уникальным, после узел получает ресурсы.Пробудите последующие узлы, конкретная логика выглядит следующим образом:

 private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        //propagate表示剩余的资源,如果>0表示会尝试继续唤醒后继的节点
        if (propagate > 0 || h == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

Описание. Когда ресурсы остаются или исходный узел соответствует условиям, будет запущена операция последующего освобождения узла.

    private void doReleaseShared() {
        for (; ; ) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    unparkSuccessor(h);
                //在没有新节点插入的情况下,该节点的默认状态为0,此时将状态设置为PROPAGATE主要是保留了后继节点唤醒的状态,等新节点接入的时候继续唤醒后继节点
                } else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            if (h == head)
                break;
        }
    }

Описание: В случае, если поток успешно получает ресурс и есть оставшиеся ресурсы, поток попытается вызвать следующийsetHeadAndPropagateЧтобы освободить другие потоки, необходимо специально объяснить один момент:

В doReleaseShared, если вы хотите разбудить следующий узел, вы должны убедиться, что состояние ожидания головного узла равно Node.SIGNAL.Как это гарантируется? На самом деле никакой гарантии не требуется.Когда узел вызывает setHeadAndPropagate, чтобы стать новым головным узлом, по умолчанию состояние ожидания равно 0. В это время, если новый узел вставляется в очередь, статус головного узла будет изменен на Node.SIGNAL, который удовлетворяет условиям пробуждения. Если доступа к новому узлу нет, для состояния ожидания головного узла будет установлено значение Node.PROPAGATE в doReleaseShared. После этого, если новый узел будет вставлен, корректировка очереди shouldParkAfterFailedAcquire завершится ошибкой, и начнется новый раунд проверки. быть запущенным.В это время его предшествующий узел найден.Он уже является головным узлом, и ресурсы могут быть получены в это время.Это причина введения флага Node.PROPAGATE.Это относительно абстрактно и должно быть хорошо понято ~

На данный момент анализ AQS по получению и освобождению ресурсов завершен.Кроме того, в классе есть также части, связанные с состоянием.Первоначально я планировал написать это в статье, но теперь я нахожу, что статья немного слишком длинное, и его может быть трудно читать, поэтому часть «Условие» помещается во вторую часть анализа.