В-пятых, поговорим о параллелизме — поговорим об эксклюзивной блокировке AQS.

Java

предисловие

Полное название AQS — (Abstract Queued Synchronizer), что можно перевести как абстрактный синхронизатор очередей только из названия. Это базовая структура для создания параллельных классов инструментов в пакете JUC (java.util.concurrent). к обеспечению прерываемых блокировок (ожидание прерываний), блокировок с тайм-аутом, эксклюзивных блокировок, разделяемых блокировок и т. д., а также расширять эти базовые функции для получения некоторых других классов инструментов, которые в основном могут удовлетворить различные требования к блокировкам в наших практических приложениях.

До того, как я прочитал исходный код AQS, я чувствовал, что его реализация была такой же, как и принцип синхронизации, казалось, что параллельный контроль доступа достигается за счет объектных блокировок, но на самом деле это был просто обычный класс инструментов, как и наш обычный процесс разработки. Как и класс utils, написанный в , реализация AQS не использует расширенные машинные инструкции и правила модели памяти, такие как ключевое слово synchronized, не использует расширенные машинные инструкции и не использует специальную обработку во время компиляции JDK, а только общий класс. реализовать контроль параллелизма. Это то, что делает меня очень заинтересованным в изучении и глубоком изучении его дизайнерских идей и принципов реализации.

Почему мы должны изучать внедрение AQS? Поскольку классы инструментов в пакетах synchronized и JUC часто используются в параллельном программировании, большинство классов инструментов в пакете JUC являются расширенными реализациями, основанными на AQS.Если вы хотите освоить и понять принципы реализации классов инструментов в пакете Пакет JUC. Важно понимать реализацию AQS.

Зачем вам нужно разрабатывать AQS с Synchronized?

Поскольку JVM предоставила такие ключевые слова, как synchronized и volatile, она может решить три проблемы параллелизма и проблему порядка выполнения потоков, так зачем же создавать AQS-фреймворк и повторять смысл изобретения велосипеда?Где?

Появление фреймворка или технологии должно решить некоторые проблемы, могут ли функция и производительность стать причиной изобретения велосипеда? Появление структуры синхронизации AQS должно решить сценарии использования, которые синхронизированы не могут удовлетворить.Давайте посмотрим на функции, предоставляемые AQS.

  1. ждать прерывания. Синхронизация не может быть прервана, что означает, что синхронизированное ожидание не может быть прервано.После того, как оно войдет в состояние блокировки, его нельзя прервать. InterruptedException может быть выброшен только вызванным методом, затем его можно прервать, метод, который не выдает InterruptedException, не может быть прерван.
  2. тайм-аут блокировки. AQS поддерживает блокировку по тайм-ауту. Можно указать время. Если блокировка не будет получена в течение указанного времени, она сразу прекратит получение блокировки.
  3. неблокирующий. Если попытка получить блокировку не удалась, он не переходит в состояние блокировки, а возвращается напрямую, и у потока также есть возможность снять блокировку, которую он когда-то удерживал.

Вышеупомянутые функции - это все функции, которых нет у ключевого слова synchronized.Помимо выполнения всех функций synchronized, AQS основан на реализации расширенной блокировки чтения-записи (ReadWriteLock), семафора (Semaphore), ограждения (CyclicBarrier). ) и другие дополнительные блокировки функций, значительно улучшающие сценарии использования и гибкость. Тогда давайте взглянем на подробную реализацию AQS.

Дизайн-мышление AQS

Когда мы входим в исходный код AQS, мы видим, что AQS является абстрактным классом, но мы обнаружили, что в AQS нет абстрактного метода. Это связано с тем, что AQS предназначен для поддержки нескольких целей. Это базовая структура для многих инструментов. Чтобы использовать, если есть абстрактные методы, подклассы должны переписать все абстрактные методы при наследовании, что, очевидно, не соответствует первоначальному дизайну AQS, поэтому инфраструктура AQS принимает шаблон проектирования методов шаблона, и AQS будет использовать некоторые подклассы. которые требуют подклассов. Все методы, переопределяемые классом, разработаны как методы защиты, которые реализованы по умолчанию для создания исключений UnsupportedOperationException. Если подклассы должны использовать этот метод, переопределите этот метод.

Базовая конструкция AQS не особенно сложна.Нижний уровень использует флаг состояния (переменная состояния) + очередь FIFO для записи серии операций блокировки, таких как получение блокировок, освобождение блокировок и конкурирующие блокировки; для AQS переменная состояния в Это можно рассматривать как блокировку.Очередь использует двусвязный список в порядке поступления.Общая переменная состояния состояния представляет состояние блокировки.Внутренне CAS используется для выполнения атомарной модификации операции в состоянии для завершения изменение состояния блокировки (удержание и освобождение блокировки).

队列结构.png

Когда поток запрашивает удержание блокировки, он определяет, удерживается ли блокировка другими потоками, оценивая текущее состояние состояния.Если поток не занят, пусть запрашивающий поток удерживает блокировку; если блокировка занята, запрашивающий поток перейдет в состояние блокировки, инкапсулирует его в узлы Node, а затем свяжет их для формирования двусвязного списка. Когда поток, удерживающий блокировку, завершит операцию, он освободит ресурс блокировки, а затем разбудит узел в очереди (разумеется, это практика честной блокировки, о ней мы поговорим ниже), поэтому блокировка и пробуждение потока осуществляется через очередь. Затем давайте взглянем на реализацию AQS с помощью специального кода.

Базовая реализация AQS

Основные элементы AQS

состояние

    private volatile int state;

    protected final int getState() {
        return state;
    }

    protected final void setState(int newState) {
        state = newState;
    }

    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

Состояние состояния здесь относительно простое.В нем используется volatile модификация для обеспечения видимости переменных состояния.Метод setState(int newState) используется только для инициализации состояния, а compareAndSetState(int expect, int update) используется для обновления состояния во время работы Изменение переменных.

Почему существует отдельный метод compareAndSetState для изменения переменной состояния? Поскольку назначение общих переменных не является атомарной операцией, требующей дополнительной синхронизации блокировки, мы можем подумать об использовании synchronized для обеспечения атомарности, но synchronizedh заблокирует поток, вызовет переключение контекста потока и повлияет на его производительность. Здесь используется безблокировочная операция CAS, но у CAS есть и недостатки: он будет выполнять операцию вращения, что также приведет к трате ресурсов процессора.

compareAndSet.gif
compareAndSet.gif

Синхронизированная очередь (FIFO)

AQS перенесет потоки, которые не конкурировали за блокировки, в узлы Node и добавит их в очередь Давайте посмотрим на структуру Node.

static final class Node {
   //标记节点是共享模式
    static final Node SHARED = new Node();
    //标记节点是独占的
    static final Node EXCLUSIVE = null;

   //代表此节点的线程取消了争抢资源
    static final int CANCELLED =  1;
  
   //表示当前node的后继节点对应的线程需要被唤醒
    static final int SIGNAL    = -1;
  
  //这两个状态和condition有关系,这里先不说condition
    static final int CONDITION = -2;
   
    static final int PROPAGATE = -3;

   // 取值为上面的1、-1、-2、-3 或者 0
    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;
   //等待线程
    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {   
    }
 
   //线程入队。
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

  //使用condition用到
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

Очередь синхронизации — это ядро ​​AQS, которое используется для реализации операций блокировки и пробуждения потока. Состояние ожидания представляет собой состояние ожидания блокировки потока, представленного текущим узлом. В режиме эксклюзивной блокировки нам нужно только заплатить обратите внимание на два состояния CANCELED и SIGNAL. Свойство nextWaiter, которое всегда имеет значение null в режиме монопольной блокировки, служит только в качестве маркера. Изображение ниже основано на эксклюзивных блокировках.

CHL.jpg
CHL.jpg

Эксклюзивные замки и общие замки

AQS определяет два метода совместного использования ресурсов:

  • Exclusive (эксклюзивный режим): только один поток может получить доступ к общему ресурсу. такие как ReentrantLock
  • Общий доступ (общий режим): несколько потоков могут одновременно обращаться к общим ресурсам, таким как Semaphore/CountDownLatch).

Дизайн AQS основан на режиме метода шаблона, и были реализованы такие операции, как обслуживание очереди и постановка узла в очередь и удаление из очереди или сбой получения ресурсов. Фактическая логика получения ресурсов реализуется подклассами. И он предоставляет два метода доступа к ресурсам: Exclusive (эксклюзивный режим) и Share (общий режим); какой режим доступа к ресурсам необходимо реализовать, подклассам нужно только переопределить метод резервирования AQS и использовать атомарные операции, предоставляемые этим методом, для изменить переменную состояния, чтобы реализовать соответствующую логику синхронизации. В обычных условиях подклассы должны реализовать только один из режимов в соответствии со своими потребностями.Конечно, существуют также классы синхронизации, которые реализуют оба режима одновременно, например ReadWriteLock.

При реализации пользовательского синхронизатора в основном реализуются следующие методы:

  • tryAcquire(int): Эксклюзивный режим. Попытаться получить ресурс, вернуть true в случае успеха, false в случае неудачи.
  • tryRelease(int): Эксклюзивный режим. Попытаться освободить ресурс, вернуть true в случае успеха, false в случае неудачи.
  • tryAcquireShared(int): общий метод. Попробуйте получить ресурс. Отрицательное число означает сбой, 0 — успех, но доступных ресурсов нет, положительное число — успех и оставшиеся ресурсы.
  • tryReleaseShared(int): общий метод. Попытайтесь освободить ресурс, верните true, если последующим ожидающим узлам разрешено просыпаться после освобождения, в противном случае — false.

В эксклюзивном режиме эффект синхронизированной реализации тот же, и только один поток может получить к нему доступ в каждый момент времени. состояние, равное 0, означает, что ни один поток не удерживает блокировку, а значение больше 0 означает, что существует поток, удерживающий текущую блокировку. Это значение может быть больше 1, так как блокировка может быть повторно введена, и к каждому повторному входу добавляется 1, а также требуются соответствующие множественные выпуски.

В совместно используемом режиме значение состояния показывает, сколько имеется лицензий, но его применение в каждом конкретном классе инструментов несколько отличается. Используйте анимацию ниже, чтобы понять, как использовать общие блокировки.

Apr-15-2020 07-22-21.gif
Apr-15-2020 07-22-21.gif
CountDownLatch.gif
CountDownLatch.gif

Эксклюзив: пример ReenTranLock

Ранее мы говорили, что логика получения блокировок AQS реализуется подклассами, поэтому давайте посмотрим, как подклассы реализуются с помощью конкретных кодов.Возьмите ReentranLock в качестве примера, чтобы увидеть детали реализации.

ReentrantLock имеет две реализации: справедливая блокировка и нечестная блокировка.Реализация по умолчанию — несправедливая блокировка, что отражено в его конструкторе.Далее давайте проанализируем ReentranLock с эксклюзивной блокировкой.Давайте посмотрим на структуру ReentranLock.

public class ReentrantLock implements Lock, java.io.Serializable {
  
    private final Sync sync;
    
    //ReentranLock的内部类,
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
    }
    
    // 非公平锁
    static final class NonfairSync extends Sync{...}
    
    //公平锁
    static final class FairSync extends Sync {...}
    
    //构造函数
    public ReentrantLock() {
      sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    
    // 获取锁
    public void lock() {
        sync.lock();
    }
    
    // 释放锁
    public void unlock() {
        sync.release(1);
    }
    ...
}

Видно, что и FairSync, и NonfairSync наследуются от Sync, а Sync наследуется от AQS. Логика получения блокировки ReentrantLock заключается в прямом вызове логики FairSync или NonfairSync.Давайте возьмем FairSync в качестве примера, чтобы увидеть конкретную реализацию.

FairSync (честная блокировка)

 static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        //抢锁
        final void lock() {
        //这里直接调用的AQS的acquire()方法
            acquire(1);
        }
    
     //====此方法来自AQS,为了方便阅读,贴过来了====
     /**
     通过代码我们能看到,如果tryAcquire(arg)这个方法返回true,直接就退出了,后续也就不会进行了。
     所以我们可以推断出来,大部分情况下,应该返回的是false。我们一个方法一个方法来看。
     */
      public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
            selfInterrupt();
          }
      }
    //====================================== 
      
      protected final boolean tryAcquire(int acquires) {
         ...
      }
}

tryAcquire

tryAcquire определяет, занята ли текущая блокировка:

  1. Если замок не взят, попытайтесь получить замок честным путем.
  2. Если блокировка уже занята, проверьте, является ли блокировка повторной.

Получите замок и успешно вернитесьtrue, возврат в случае неудачиfalse

   /**
   if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
        selfInterrupt();
    } 
    尝试获取锁,返回boolean,是否获取锁成功。
    true:1.代表没有线程在等待锁。2.本身就持有锁,但是是重入锁。
    */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        //如果等于0,那么代表没有线程持有锁。
        if (c == 0) {
           /**
           到这里说明没有线程抢锁,再去判断队列中是否有线程在等待获取锁。
           因为是公平锁,总是先来后到的
           如果队列中没有线程等待获取锁,那就尝试去获取锁。
           如果获取成功了,那就把当前占用锁的线程,更新为当前线程。
           */
           if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
           /**
           hasQueuedPredecessors方法,主要来判断队列是否为空,
           判断头结点的后节点是不是当前节点。
            public final boolean hasQueuedPredecessors() {
             Node t = tail;
             Node h = head;
             Node s;
             return h != t &&
                    ((s = h.next) == null || s.thread != Thread.currentThread());
            }
           */
             // 将当前线程设置为占用锁的线程
              setExclusiveOwnerThread(current);
    
              return true;
           }
        }else if (current == getExclusiveOwnerThread()) {
         /**
         如果到这里,说明当前占用锁的线程就是本身。就是我们所说的重入锁。
         */
            int nextc = c + acquires;
            
            if (nextc < 0){
              throw new Error("Maximum lock count exceeded");
            }
            
            setState(nextc);
            
            return true;
        }
     //如果到这里都没有返回true,说明没有获取到锁。
      return false;
    }


addWaiter

Если метод tryAcquire() возвращает false, указывая на то, что блокировка не удалась, продолжайте выполнять метод AcquireQueued(addWaiter(Node.EXCLUSIVE), arg), Этот шаг в основном предназначен для добавления в очередь потоков, которые не захватили блокировку.Давайте сначала рассмотрим метод addWaiter().

  /**
  if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
        selfInterrupt();
    } 
  */  
private Node addWaiter(Node mode) {
   /** Node构造方法
        Node(Thread thread, Node mode) {     
            this.nextWaiter = mode;
            this.thread = thread;
        }
        Node(Thread thread, int waitStatus) { 
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
   */
        Node node = new Node(Thread.currentThread(), mode);
      
        Node pred = tail;
       // tail !=null 说明队列不为空。当队列为空时tail = head 是为null的,
        if (pred != null) {
          //将新节点的前驱节点指向旧的尾结点。
           node.prev = pred;
           //将新的节点变成尾结点。
           if (compareAndSetTail(pred, node)) {
                //如果设置成功,那就将旧的尾结点的后继节点,指向新的节点。直接返回Node
                pred.next = node;
                return node;
            }
        }
        //如果执行到这里说明有两种情况 :1.队列为空。2.CAS失败(有线程在竞争入队)
        //这时会执行enq()方法
        enq(node);
        return node;
    }
 

Этот метод в основном оборачивает ожидающий поток в узел Node. Видно, что для каждого узла в режиме эксклюзивной блокировки nextWaiter должен быть нулевым. Этот метод сначала определит, пуста ли очередь, если нет, попытается добавить узел Node в конец очереди. Если поставить в очередь не удается или очередь пуста, выполняется метод enq.

Если выполняется метод enq(), есть две возможности:

  1. Очередь ожидания теперь пуста и не инициализирована.
  2. Другие потоки первыми завершают постановку в очередь в процессе постановки в очередь текущего потока, что приводит к изменению значения хвостового узла и сбою операции CAS.

enq

В этом методе используется бесконечный цикл, то есть узел вставляется в очередь в режиме вращения, и в случае неудачи он будет продолжать попытки до тех пор, пока не добьется успеха.Кроме того, этот метод также отвечает за инициализацию очереди при очередь пуста, что также означает, что очередь лениво инициализируется:

   /*我们再看一下enq()这个方法的代码。
   这个方法采用了自旋式入队列的方式。
   如果没有抢到锁,那就一直循环,直到入队。
   */
     private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
          if (t == null) { 
           /**如果tail==null 说明队列为空,我们在刚开始的时候会发现,head和tail都为null,
            是没有进行初始化的。这里还是使用的cas设置头结点,跟设置尾结点一样。
            */
              if (compareAndSetHead(new Node())){
               /**
              这里设置了头节点,但是尾结点还是为null,
              将尾结点也设置一下,注意,此时还没有return,继续循环。
               */
                  tail = head;
              }
         
           }else {
           //这个其实和addWaiter()方法是类似的,都是将线程添加到队尾。
           //只不过是如果不成功一直循环,直到成功为止。
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

Здесь мы видим, что когда очередь пуста, очередь инициализации не использует входящий узел Node, а создает новый узел Node. После инициализации он не возвращается, а сразу входит в следующий цикл, в это время очередь уже не пуста, а входящий узел Node добавляется в конец очереди. Это также объясняет, почему головной узел пуст, когда мы впервые говорим об очереди FIFO.

Здесь мы видим, что метод enq() имеет возвращаемое значение, которое возвращает узел-предшественник узла узла, но его возвращаемое значение здесь не используется, но его возвращаемое значение используется в другом месте.

acquireQueued

Было объяснено, что код может быть здесь После addWaiter(Node.EXCLUSIVE) узел добавляется в очередь в это время.

Примечание. Если функцияacquireQueued(addWaiter(Node.EXCLUSIVE), arg)) возвращает значение true, это означает, что приведенный выше код войдет в функцию selfInterrupt(), поэтому при нормальных обстоятельствах следующее должно возвращать значение false.

 /**
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
        selfInterrupt();
    }
 */
  final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //当前节点的前驱节点。addWaiter方法返回的是经过封装的Node节点。
                final Node p = node.predecessor();
               /**
              p == head 说明当前节点已经进到了阻塞队列中,但是Node节点是阻塞队列的第一个,因为它的前驱是         head。正常情况下,我们是将Node节点添加到队尾的,如果说Node的前驱节点是head节点,说明Node节点是        阻塞队列中的第一个,可以再去尝试获取锁。
               */
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                /**
                private void setHead(Node node) {
                     head = node;
                     node.thread = null;
                     node.prev = null;
                }
                */
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
     
                //当前Node不是在CLH队列的第一位或者是当前线程获取锁失败,判断是否需要把当前线程挂起。
                if(shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()){
                    interrupted = true;
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire

Когда мы анализируем структуру FIFO-очереди, мы видим, что в составе узла есть состояние waitStatus, и оно имеет четыре значения.

static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

В случае эксклюзивной блокировки используются только два состояния CANCELED и SIGNAL Как понять смысл этого состояния?

CANCELED легко понять, это означает, что текущий узел отменяет очередь, то есть отменяет захват блокировки. Состояние SIGNAL не представляет состояние текущего узла, оно представляет состояние узла-предшественника текущего узла, когда для состояния ожидания узла установлено значениеSIGNAL, это означает, что его следующий узел был приостановлен (или скоро будет приостановлен), поэтому, когда текущий узел снимает блокировку или отказывается от получения блокировки, если его свойство waitStatus равноSIGNAL, он должен выполнить дополнительную операцию — разбудить свои узлы-преемники.

/**if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){}
*/

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
  //我认为这里只是对前驱节点状态进行判断,判断前驱节点时候是正常状态,因为我们知道如果当前节点被挂
  //唤醒时需要前驱节点来进行唤醒的,如果当前节点的前驱节点是正常状态,就能保证当前节点可以被正常唤醒,
  //因为在等待队列中的节点有可能退出了所等待,所以需要判断前驱节点状态是否正常。
    if (ws == Node.SIGNAL)
   //如果前驱节点的状态已经是SIGNAL,就直接返回true,接下来就会直接去执行parkAndCheckInterrupt()将线程挂起
    //因为前驱节点状态正常,当前节点可以被挂起。
      return true;
  
        /*
         * 当前驱节点的status大于0说明前驱节点取消了抢锁,退出了队列。
         如果前驱节点取消了抢锁,就继续往前找,找到一个节点是正常状态的节点,然后直接跳过那些不排队的节点,添加到               第一个正常等待节点的后面
         */
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         前驱节点的状态既不是SIGNAL,也不是CANCELLED
         用CAS设置前驱节点的ws为 Node.SIGNAL。
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

Здесь стоит отметить, что true будет возвращено только тогда, когда состояние узла-предшественника текущего узла равно SIGNAL, а в остальных случаях будет возвращено только false.

После возврата false он вернется к методу AcquiQueued для цикла, потому что узел-предшественник текущего узла изменился, возможно, узел-предшественник является головным узлом, пока он не вернет true, то есть SIGNAL, когда статус узла-предшественника равен SIGNAL. , вы можете приостановить текущий поток. В это время будет вызвана функция parkAndCheckInterrupt для приостановки потока.

parkAndCheckInterrupt

Этот метод очень прост, потому что предыдущий возвращает true, поэтому вам нужно приостановить поток, этот метод отвечает за приостановку потока, и здесь было проанализировано получение блокировки.

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 线程被挂起,停在这里不再往下执行了
    return Thread.interrupted();
}

NofairSync (недобросовестная блокировка)

Реализация недобросовестных блокировок на самом деле мало чем отличается от реализации честных блокировок Давайте взглянем на код.

 static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        //非公平锁的lock和公平锁的lock区别在于,非公平锁直接上来就去直接获取锁,不管阻塞队列是有线程等待
        final void lock() {
            if (compareAndSetState(0, 1))
             setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    /**
     if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
     }
     */ 
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
    //这个方法来自于Sync
    final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
             //这里非公平锁直接去获取锁。
             //而公平锁的话,要判断队列中是否有线程在等待。
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) 
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

Существуют тонкие различия в реализации честных и нечестных блокировок, но они не очень велики. Разница между недобросовестной блокировкой и справедливой блокировкой заключается в том, что когда нечестная блокировка находится в функции lock(), добавляется фрагмент кода.

//非公平锁的lock
final void lock() {
   if (compareAndSetState(0, 1))
      setExclusiveOwnerThread(Thread.currentThread());
   else
      acquire(1);
}

//公平锁的lock
final void lock() {
      acquire(1);
}

Когда недобросовестная блокировка находится в замке, она будет напрямую пытаться получить блокировку.Если попытка будет успешной, она будет напрямую занимать блокировку. Это первое отличие.

В методе tryAcquire() будет больше справедливых блокировок!

//公平锁
if (c == 0) {
    if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {
      setExclusiveOwnerThread(current);
      return true;
   }
}
 /**
     public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
*/
//非公平锁。
if (c == 0) {
    if (compareAndSetState(0, acquires)) {
      setExclusiveOwnerThread(current);
      return true;
   }
}

Здесь справедливая блокировка будет определять, есть ли в очереди потоки, ожидающие получения блокировки, и будет пытаться получить блокировку только тогда, когда очередь блокировки пуста.

Но недобросовестные блокировки не будут проверять, есть ли потоки, ожидающие в очереди блокировки, а получат блокировку напрямую.

Различия в реализации между честными и нечестными блокировками — это различия, а другая логика реализации аналогична.

отпустить отпустить замок

Ранее мы говорили, что если блокировку не схватить, поток будет приостановлен LockSupport.park(this) Как разблокировать и разбудить поток, давайте разберемся.

 public void unlock() {
     sync.release(1);
 }
 
 //====此方法来自AQS================
 public final boolean release(int arg) {
        //尝试去释放锁
        if (tryRelease(arg)) {
         
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
 }

Здесь будет вызываться метод tryRelease(), чтобы попытаться снять блокировку.Если блокировка снята успешно, оцените состояние головного узла и разбудите поток. Здесь следует отметить одну вещь, голова! = null Мы можем понять, но почему waitStatus != 0. Ранее мы видели блокировки захвата потока, и только одно место отведено для waitStatus. В методе shouldParkAfterFailedAcquire задайте для состояния ожидания узла-предшественника значение Node.SIGNAL. Вы можете повернуться вперед.

Кроме того, в методе enq() во время инициализации значение по умолчанию для waitStatus равно 0, и нет никакого назначения для waitStatus в другом месте. Если waitStatus != 0, это означает, что за головой нет приостановленных потоков, ожидающих пробуждения, поэтому пробуждаться не нужно.

tryRelease

 protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
           //判断占有锁的线程是不是当前线程。
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //判断是否完全释放锁,有可能重入
            if (c == 0) {
              free = true;
              //将表示占有锁的线程标志置空。
              setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
 }

Это слишком просто, тут нечего сказать.

unparkSuccessor


 private void unparkSuccessor(Node node) {
     //我们知道阻塞队列是一个先进先出的队列,唤醒的话,也是按照顺序唤醒的,我们可以看到参数的Node是头结点
    //如果头结点的waitStatus < 0 ,说
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        Node s = node.next;
    //下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)
    // 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //唤醒线程  
        if (s != null)
            LockSupport.unpark(s.thread);
  }

 //唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:
  
 private final boolean parkAndCheckInterrupt() {
         LockSupport.park(this); // 刚刚线程被挂起在这里了
      return Thread.interrupted();
 }
 // 又回到这个方法了:acquireQueued(final Node node, int arg),这个时候,node的前驱是head了

На данный момент, в основном, проанализировано приобретение и освобождение замков ReenTranLock.Некоторые конкретные детали могут не упоминаться.Вы можете просто следовать коду самостоятельно.

Суммировать

В этой статье, основанной на эксклюзивной блокировке ReentranLock, был проанализирован AQS и изучено несколько моментов.

  1. В AQS атрибут состояния используется для представления блокировки.В ReentranLock, когда состояние = 1 получает блокировку, состояние = 0 представляет освобождение блокировки, а состояние> 1 представляет собой повторную блокировку. Свойство ExclusiveOwnerThread представляет поток, которому принадлежит блокировка.
  2. addWaiter отвечает за перенос потока, ожидающего блокировки, в узел и успешное добавление его в конец очереди. Это гарантируется вызываемым им методом enq. Метод enq также отвечает за инициализацию очереди, когда очередь пустой.
  3. МетодAcquireQueued используется для продолжения попыток получить блокировку (в зависимости от того, является ли предшествующий узел узла головным или нет) после того, как узел успешно вошел в очередь, или приостановить поток.
  4. Метод shouldParkAfterFailedAcquire используется для обеспечения того, чтобы значение атрибута waitStatus узла-предшественника текущего потока было SIGNAL, таким образом гарантируя, что после его приостановки узел-предшественник будет отвечать за пробуждение себя в соответствующее время.
  5. Метод parkAndCheckInterrupt используется для приостановки текущего потока и проверки статуса прерывания.
  6. Если блокировка, наконец, успешно получена, поток вернется из метода lock() и продолжит выполнение; в противном случае поток заблокируется и будет ждать.

постскриптум

Первоначальный план состоял в том, чтобы выпускать одну статью в неделю, но возникла острая необходимость сделать это временно. В прошлую субботу я работал сверхурочно, а в воскресенье договорился о встрече с шахматным боссом Мьянджи. На прошлой неделе я возвращался очень поздно каждый раз. день. Суббота Сверхурочные также работают сверхурочно, чтобы удовлетворить спрос. Я должен протестировать на этой неделе, и он будет запущен в конце месяца. Я также нашел время, чтобы написать немного в середине, и, наконец, закончил его вчера . В последние две недели я чувствовал, что моя энергия исчерпана, и мое состояние не очень хорошо, я отрегулировал свое состояние в последние несколько дней. На прошлой неделе я много болтала с chessy boss, что натолкнуло меня на массу мыслей.Планирую написать шеринг о непрерывном обучении и направлении личностного роста.Вы также можете поделиться своим опытом друг с другом, когда придет время.

Нелегко кодировать так много слов, так что ставьте лайки и поддерживайте.

Ссылаться на

https://javadoop.com/2017/06/16/AbstractQueuedSynchronizer/