«Повышайте способности, зарплату можно увеличить» — исчерпывающее подробное объяснение Java concurrent AQS

Java

Добро пожаловать в Наггетс: [Ccww】учимся вместе
Возможность развиваться, ожидается повышение заработной платы
Знание интервью, ожидание работы
Практичная дрель, брак 996
Также приглашаем обратить внимание на публичный аккаунт WeChat [Технический блог Ccww], впервые была запущена оригинальная техническая статья
Если эта статья была вам полезна, понравилась, то ставьте палец вверх!

предисловие

Считаете ли вы, что вам трудно продвигаться по работе?
Вам трудно пройти собеседование при приеме на работу?
Вам кажется, что вы работаете сверхурочно за 996 каждый день?

Вы должны поддерживать способность учиться на работе, чтобы получить лучшее продвижение по службе, а повышение зарплаты уже не за горами. Добро пожаловать учиться вместе【Возможность развиваться, ожидается повышение заработной платы】ряд
При поиске собеседования при приеме на работу вы должны суммировать баллы знаний на собеседовании на основе обучения, а работа не за горами.Добро пожаловать, чтобы учиться вместе【Знание интервью, ожидание работы】ряд
Наконец, должны ли быть полностью подготовлены теоретические знания, должны ли они применяться на практике? Добро пожаловать учиться вместе【Практичная дрель, брак 996】ряд

1. Что такое СКВ? Какая польза?

Полное наименование АСAbstractQueuedSynchronizer, абстрактный синхронизатор очереди, представляет собой основу для создания блокировок и синхронизаторов.

Соберите синхронизатор на основе AQS:

  • ReentrantLock
  • Semaphore
  • CountDownLatch
  • ReentrantReadWriteLock
  • SynchronusQueue
  • FutureTask

Преимущество:

  • AQS учитывает множество деталей, связанных с реализацией синхронизаторов, таких как настраиваемые стандартные состояния синхронизации, очереди синхронизации FIFO.
  • Создание синхронизатора на основе AQS может принести много преимуществ. Это не только значительно сокращает усилия по внедрению, но и избавляет от конфликтов, происходящих в нескольких местах.

2. Основные знания AQS

2.1 Основная идея AQS

   Если запрошенный общий ресурс простаивает, поток, запрашивающий в данный момент ресурс, устанавливается в допустимый рабочий поток, а общий ресурс устанавливается в заблокированное состояние. Если запрошенные разделяемые ресурсы заняты, то необходим механизм блокировки потока в ожидании и выделения блокировки при пробуждении.Этот механизм AQS реализуется блокировками очереди CLH, то есть в очередь добавляются потоки, которые не могут временно получить блокировки. как показано на рисунке:

Очередь синхронизации:Синхронная очередь представляет собой двусторонний список. Включая головной узел и хвостовой узел. Головной узел в основном используется для последующего планирования.

Очередь условий:Необязательный, односторонний список. Этот список существует только тогда, когда в программе есть условие.

2.2 Идеи дизайна AQS

  • AQS использует переменную-член int для представления состояния синхронизации.
  • Используйте Node для реализации очередей FIFO, которые можно использовать для создания блокировок или других устройств синхронизации.
  • Режим совместного использования ресурсов AQS: эксклюзивный эксклюзивный (режим эксклюзивной блокировки) и общий ресурс (режим общей блокировки)

Все его подклассы AQS либо реализуют и используют API своей эксклюзивной функции, либо используют функцию общей блокировки вместо одновременного использования двух наборов API, даже самый известный подкласс ReentrantReadWriteLock проходит через две внутренние блокировки чтения класса. и блокировки записи реализуют два набора API соответственно.

2.3 состояние

Состояние состояния использует переменную типа volatile int для представления текущего состояния синхронизации. Существует три способа доступа к состоянию:

  • getState()
  • setState()
  • compareAndSetState()

2.4 Значение констант Node в AQS

CANCELLED
Когда значение waitStatus равно 1, это означает, что узел потока был освобожден (тайм-аут, прерывание), и отмененный узел больше не будет заблокирован.

SIGNAL
Когда waitStatus равен -1, это означает, что следующий поток потока должен быть заблокирован, то есть, пока передний узел снимает блокировку, будет уведомлен поток последующего узла, помеченный как состояние SIGNAL.

CONDITION
Когда waitStatus равен -2, это означает, что поток заблокирован в очереди условий (используется условие).

PROPAGATE
Когда waitStatus равен -3, это означает, что поток и последующие потоки выполняют безусловное распространение (используется в CountDownLatch).В общем режиме поток в состоянии PROPAGATE находится в состоянии выполнения.

2.5 Почему синхронная очередь называется FIFO?

Потому что только узел, чей предшествующий узел является головным узлом, может быть разбужен первым для получения состояния синхронизации. Когда узел переходит в состояние синхронизации, он очищает свое собственное значение и использует себя в качестве головного узла, чтобы разбудить следующий узел.

2.6 Очередь условий

В дополнение к очереди синхронизации в AQS также есть очередь условий, которая является односторонней очередью. Вызов метода ConditionObject.await() может инкапсулировать текущий поток в качестве узла и добавить его в конец очереди условий, а затем освободить полученное состояние синхронизации (то есть изменить значение состояния синхронизации и разбудить поток). в очереди на синхронизацию).

Очередь условий также является FIFO. Вызовите метод ConditionObject.signal(), чтобы разбудить узел firstWaiter и добавить его в конец очереди синхронизации.

2.7 Реализация пользовательского синхронизатора

При создании пользовательского синхронизатора вам нужно полагаться только на нижний уровень AQS, чтобы реализовать получение и освобождение состояния общего ресурса. При реализации пользовательского синхронизатора в основном реализуются следующие методы:

  • isHeldExclusively(): использует ли поток исключительно ресурсы. Вам нужно реализовать его только в том случае, если вы используете условие.
  • tryAcquire(int): Эксклюзивный режим. Попытаться получить ресурс, вернуть true в случае успеха, false в случае неудачи.
  • tryRelease(int): Эксклюзивный режим. Попытаться освободить ресурс, вернуть true в случае успеха, false в случае неудачи.
  • tryAcquireShared(int): общий метод. Попробуйте получить ресурс. Отрицательное число означает сбой, 0 — успех, но доступных ресурсов нет, положительное число — успех и оставшиеся ресурсы.
  • tryReleaseShared(int): общий метод. Попытайтесь освободить ресурс, верните true, если последующим ожидающим узлам разрешено просыпаться после освобождения, в противном случае — false.

Три детали реализации AQS

Поток сначала пытается получить блокировку, и если это не удается, информация, такая как текущий поток и состояние ожидания, упаковывается в узел node и добавляется в очередь FIFO. Затем он продолжит цикл, пытаясь получить блокировку, при условии, что текущий узел является прямым преемником головы. В случае сбоя он блокирует себя до тех пор, пока не проснется. Когда поток, удерживающий блокировку, освобождает блокировку, он пробуждает последующие потоки в очереди.

3.1 AQS в монопольном режиме

Так называемый эксклюзивный режим, то есть только один поток может получить состояние синхронизации.Когда поток не освободил состояние синхронизации, другие потоки не могут его получить и могут только присоединиться к очереди синхронизации и ждать.

Очевидно, мы можем установить начальное значение состояния равным 0, что означает бездействие. Когда поток получает состояние синхронизации, он использует операцию CAS для увеличения состояния на 1, указывая на то, что он не простаивает, тогда другие потоки могут только ждать. При освобождении состояния синхронизации операция CAS не требуется, поскольку только один поток может получить состояние синхронизации в монопольном режиме. ReentrantLock и CyclicBarrier разработаны на основе этого.

Например, состояние ReentrantLock инициализируется равным 0, что указывает на разблокированное состояние. Когда поток A блокирует(), он вызывает функцию tryAcquire(), чтобы монополизировать блокировку и состояние+1.

AQS в монопольном режиме не отвечает на прерывания, относится к потоку, добавленному в очередь синхронизации, если он будет разбужен из-за прерывания, он не вернется немедленно, и будет сгенерировано InterruptedException. Вместо этого он оценивает, является ли его предшествующий узел снова головным узлом, и решает, следует ли конкурировать за состояние синхронизации. Если его предшествующий узел не является головным узлом или не может конкурировать за состояние синхронизации, он снова приостанавливается.

3.1.1 Получение ресурсов в эксклюзивном режиме - метод получения

приобретать приобретает исключительно ресурсы. Если ресурс получен, поток возвращается напрямую, в противном случае он входит в очередь ожидания, пока ресурс не будет получен.И весь процесс игнорирует влияние прерывания. Исходный код выглядит следующим образом:

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

блок-схема:

  • вызов пользовательского синхронизатораtryAcquire()Попробуйте получить ресурс напрямую и вернуться напрямую в случае успеха;
  • неудачно, тоaddWaiter()Добавьте поток в конец очереди ожидания и пометьте его как монопольный режим;
  • acquireQueued()Ставить поток в очередь ожидания, когда у него есть шанс (настала его очередь, он будетunpark()) попытается получить ресурс. Возвращается только после получения ресурса. Возвращает true, если он был прерван во время всего ожидания, иначе false.
  • Если поток прерывается во время ожидания, он не отвечает. Самопрерывание только после получения ресурсаselfInterrupt(), компенсирует перерыв.

3.1.2 Получение ресурсов в монопольном режиме – метод tryAcquire

tryAcquireПопытка приобрести ресурс исключительно и вернуться напрямую, если приобретение прошло успешно.true, иначе вернитесь напрямуюfalse, а конкретная реализация реализуется синхронизатором пользовательского AQS.

 protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

3.1.3 Получение ресурсов в эксклюзивном режиме - метод addWaiter

  По разным режимам (Node.EXCLUSIVEвзаимоисключающий режим,Node.SHAREDОбщий режим) создать узел и добавить узел текущего потока в конец непустой очереди ожидания в режиме CAS (черезcompareAndSetTail()метод). Если очередь пуста, пройтиenq(node)Метод инициализирует очередь ожидания и возвращает текущий узел.

/**
* 参数
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* 返回值
* @return the new node
*/
private Node addWaiter(Node mode) {
    //将当前线程以指定的模式创建节点node
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 获取当前同队列的尾节点
    Node pred = tail;
    //队列不为空,将新的node加入等待队列中
    if (pred != null) {
        node.prev = pred;
         //CAS方式将当前节点尾插入队列中
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //当队列为empty或者CAS失败时会调用enq方法处理
    enq(node);
    return node;
}

Среди них очередь пуста, используйтеenq(node)Обработать, вставить текущий узел в очередь ожидания и инициализировать текущую очередь, если очередь пуста. Все операции выполняются в режиме вращения CAS до тех пор, пока хвост очереди не будет успешно добавлен.

 private Node enq(final Node node) {
        //不断自旋
        for (;;) {
            Node t = tail;
            //当前队列为empty
            if (t == null) { // Must initialize
             //完成队列初始化操作,头结点中不放数据,只是作为起始标记,lazy-load,在第一次用的时候new
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                //不断将当前节点使用CAS尾插入队列中直到成功为止
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

3.1.4 Получение ресурсов в монопольном режиме – методAcquireQueued

acquireQueuedИспользуется для потоков, уже находящихся в очереди, для получения состояния в эксклюзивном и непрерывном режиме до возврата после получения блокировки. Основной процесс:

  • После того, как узел node входит в хвост очереди, проверьте статус;

  • Вызовите функцию park(), чтобы войти в состояние ожидания, и подождите, пока unpark() или interrupt() проснутся;

  • Приобретать ли блокировку после пробуждения. Если он получен, head указывает на текущий узел и возвращает, был ли прерван весь процесс от входа в очередь до получения блокировки; если нет, продолжить процесс 1

    final boolean acquireQueued(final Node node, int arg) {
      //是否已获取锁的标志,默认为true 即为尚未
      boolean failed = true;
      try {
          //等待中是否被中断过的标记
          boolean interrupted = false;
          for (;;) {
              //获取前节点
              final Node p = node.predecessor();
              //如果当前节点已经成为头结点,尝试获取锁(tryAcquire)成功,然后返回
              if (p == head && tryAcquire(arg)) {
                  setHead(node);
                  p.next = null; // help GC
                  failed = false;
                  return interrupted;
              }
              //shouldParkAfterFailedAcquire根据对当前节点的前一个节点的状态进行判断,对当前节点做出不同的操作
              //parkAndCheckInterrupt让线程进入等待状态,并检查当前线程是否被可以被中断
              if (shouldParkAfterFailedAcquire(p, node) &&
                  parkAndCheckInterrupt())
                  interrupted = true;
          }
      } finally {
          //将当前节点设置为取消状态;取消状态设置为1
          if (failed)
              cancelAcquire(node);
      }
    }
    

3.1.5 Освобождение ресурсов в эксклюзивном режиме — метод освобождения

Метод освобождения заключается в снятии блокировки общего ресурса потоком в эксклюзивном монопольном режиме. Он вызовет функцию tryRelease(), чтобы освободить ресурсы синхронизации.Если состояние синхронизации полностью освобождено (т. е. состояние = 0), когда состояние синхронизации неактивно, он разбудит другие потоки в очереди ожидания для получения ресурсов. Это точно семантика unlock(), конечно, не ограничиваясь только разблокировкой().

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

3.1.6 Освобождение ресурсов в эксклюзивном режиме — метод tryRelease

tryRelease()иtryAcquire()Такую же реализацию реализует настраиваемый таймер в монопольном режиме. Поскольку это эксклюзивный режим, нет необходимости учитывать вопросы безопасности потоков для высвобождения общих ресурсов, и вы можете напрямую уменьшить соответствующий объем ресурсов (state-=arg). иtryRelease()Возвращаемое значение показывает, завершил ли поток освобождение ресурсов, поэтому в пользовательском синхронизатореtryRelease()При , нужно указать это условие, когда ресурс полностью освобожден (состояние=0), вернуть true, иначе вернуть false.

 protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

Реализация ReentrantReadWriteLock:

protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //减掉相应量的资源(state-=arg)
        int nextc = getState() - releases;
        //是否完全释放资源
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }

3.1.7 Освобождение ресурсов в эксклюзивном режиме — unparkSuccessor

unparkSuccessorИспользуйте unpark() для пробуждения передового потока в очереди ожидания, который не сдался. Этот поток не обязательно является следующим узлом текущего узла, но следующим потоком, который можно использовать для пробуждения. Если этот узел существует, вызовите метод unpark() для пробуждения.

 private void unparkSuccessor(Node node) {
    //当前线程所在的结点node
    int ws = node.waitStatus;
    //置零当前线程所在的结点状态,允许失败
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    //找到下一个需要唤醒的结点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从后向前找
        for (Node t = tail; t != null && t != node; t = t.prev)
            //从这里可以看出,<=0的结点,都是还有效的结点
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
         //唤醒
        LockSupport.unpark(s.thread);
}

3.2 AQS в общем режиме

общий режим, разумеется, нескольким потокам разрешено одновременно получать состояние синхронизации, а AQS в совместно используемом режиме не отвечает на прерывания.

Очевидно, мы можем установить начальное значение состояния равным N (N > 0), что означает бездействие. Всякий раз, когда поток получает состояние синхронизации, он использует операцию CAS для уменьшения состояния на 1 до тех пор, пока оно не уменьшится до 0, что указывает на то, что он не простаивает, а другие потоки могут только присоединиться к очереди синхронизации и ждать. При освобождении состояния синхронизации требуется операция CAS, поскольку в совместно используемом режиме несколько потоков могут получить состояние синхронизации. CountDownLatch, Semaphore основан на этом дизайне.

Например, в CountDownLatch задача делится на N подпотоков для выполнения, и состояние состояния синхронизации также инициализируется равным N (обратите внимание, что N должно соответствовать количеству потоков):  

3.2.1 Получение ресурсов в совместно используемом режиме — метод AcquireShared

acquireSharedВ совместно используемом режиме поток получает запись верхнего уровня для общих ресурсов. Он получит указанное количество ресурсов и вернется напрямую, если приобретение завершится успешно.Если получение не удалось, он войдет в очередь ожидания до тех пор, пока ресурсы не будут получены, и весь процесс будет игнорировать прерывание.

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

Процесс:

  • Сначала попытайтесь получить ресурсы с помощью tryAcquireShared() и вернитесь напрямую в случае успеха;
  • В случае сбоя он входит в очередь ожидания с помощью функции park() в doAcquireShared() и возвращается до тех пор, пока ресурс не будет успешно получен с помощью функции unpark()/interrupt() (весь процесс ожидания также игнорирует ответ на прерывание).

3.2.2 Получение ресурсов в общем режиме — метод tryAcquireShared

tryAcquireShared()Та же реализация, что и метод получения ресурсов в эксклюзивном режиме, реализуется настраиваемым синхронизатором. Но это определено в спецификации AQStryAcquireShared()Возвращаемое значение:

  • Отрицательные значения представляют собой сбой сбора данных;
  • 0 означает, что получение прошло успешно, но ресурсов не осталось;
  • Положительное число указывает на то, что получение прошло успешно, и есть оставшиеся ресурсы, которые могут получить другие потоки.
 protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

3.2.3 Получение ресурсов в режиме общего доступа — метод doAcquireShared

doAcquireShared()Он используется для добавления текущего потока в хвост очереди ожидания для отдыха до тех пор, пока другие потоки не высвободят ресурсы для пробуждения и возврата после того, как они успешно получат соответствующее количество ресурсов.

private void doAcquireShared(int arg) {
    //加入队列尾部
    final Node node = addWaiter(Node.SHARED);
    //是否成功标志
    boolean failed = true;
    try {
        //等待过程中是否被中断过的标志
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();//获取前驱节点
            if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
                int r = tryAcquireShared(arg);//尝试获取资源
                if (r >= 0) {//成功
                    setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程
                    p.next = null; // help GC
                    if (interrupted)//如果等待过程中被打断过,此时将中断补上。
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            
            //判断状态,队列寻找一个适合位置,进入waiting状态,等着被unpark()或interrupt()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }   
}

3.2.4 Освобождение ресурсов в общем режиме — метод releaseShared

releaseShared()Он используется для потоков в общем режиме для освобождения общих ресурсов и освобождения определенного количества ресурсов.Если освобождение прошло успешно и ожидающий поток может быть разбужен, он разбудит другие потоки в очереди ожидания для получения ресурсов.

public final boolean releaseShared(int arg) {
    //尝试释放资源
    if (tryReleaseShared(arg)) {
        //唤醒后继结点
        doReleaseShared();
        return true;
    }
    return false;
}

tryRelease() в эксклюзивном режиме вернет значение true, чтобы разбудить другие потоки после того, как ресурс будет полностью освобожден (состояние = 0), что в основном основано на учете повторного входа в эксклюзивном режиме; в то время как releaseShared() в общем режиме этого не делает. Требование, суть общего режима заключается в управлении определенным количеством потоков для одновременного выполнения, чтобы поток, владеющий ресурсом, мог разбудить последующие ожидающие узлы, когда часть ресурса освобождается.
https://www.cnblogs.com/waterystone/p/4920797.html

3.2. Освобождение ресурсов в режиме общего доступа — метод doReleaseShared.

doReleaseShared()Он в основном используется для пробуждения потока узла-преемника.Когда состояние является положительным числом, он получает оставшиеся общие ресурсы, когда состояние=0, он получает общие ресурсы.

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                    //唤醒后继
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        // head发生变化
        if (h == head)
            break;
    }
}

Вы в порядке, офицеры? Если вам это нравится, проведите пальцем, чтобы нажать 💗, нажмите, чтобы подписаться! ! Спасибо за Вашу поддержку!

Добро пожаловать в публичный аккаунт【Технический блог Ccww], впервые была запущена оригинальная техническая статья