«Вы сегодня проходили собеседование» — AQS и повторные блокировки для параллельного программирования

Java
«Вы сегодня проходили собеседование» — AQS и повторные блокировки для параллельного программирования

предисловие

В последней статье «Блокировки и модели памяти для параллельного программирования» были представлены синхронизация и различные блокировки (предвзятые блокировки, спин-блокировки, облегченные блокировки и тяжелые блокировки), представлены три основные функции модели памяти Java, а также введено ключевое слово volatile, объясняющее его функцию и принцип в деталях, а также знакомит с идеей или алгоритмом, используемым во многих местах в JUC: CAS (сравнение и обмен), сегодняшнее внимание сосредоточено на сложном моменте в JUC AQS (я думаю), эта часть должна смотреть на исходный код .

Сессия интервью

  • Интервьюер: Потом я вспомнил, что есть еще один AQS, название которого очень похоже на CAS, не могли бы вы мне сказать?

  • Я могу.
    1. AQS (AbstractQueuedSynchronizer) — синхронизатор очереди, являющийся базовой структурой для создания блокировок или других компонентов синхронизации (таких как ReentrantLock, ReentrantReadWriteLock, Semaphore и т. д.), параллельный пакет JUC Автор Дуг Леа ожидает, что он станет основой для реализации большинства требований синхронизации, однако, как он и ожидал, AQS является основным базовым компонентом параллельных пакетов JUC.
    2. AQS решает множество деталей, связанных с реализацией синхронизаторов, таких как получение статуса синхронизации и очереди синхронизации FIFO. Создание синхронизатора на основе AQS может принести много преимуществ. Это не только значительно сокращает усилия по внедрению, но также устраняет необходимость иметь дело с конфликтами в нескольких местах. В синхронизаторе, построенном на основе AQS, блокировка может происходить только в один момент, тем самым снижая накладные расходы на переключение контекста и улучшая пропускную способность. В то же время масштабируемость полностью учитывается при разработке AQS. Поэтому в JUC все синхронизаторы, построенные на AQS, могут получить это преимущество.
    3. Основной способ использования AQS — наследование.Подкласс управляет состоянием синхронизации, наследуя синхронизатор и реализуя его абстрактный метод. AQS использует состояние переменной-члена типа int для представления состояния синхронизации: 1. Когда состояние > 0, это означает, что блокировка получена. 2. Когда состояние=0, это означает, что блокировка снята.
    Он предоставляет три метода для работы в синхронном состоянии, и AQS может гарантировать безопасность работы в этом состоянии:
    getState();
    setState(int newState);
    compareAndSetState(int expect, int update);
    4. Кроме того, AQS выполняет постановку в очередь потоков получения ресурсов через встроенную очередь синхронизации FIFO.
    (1) Если текущему потоку не удается получить состояние синхронизации (блокировку), AQS создаст узел (узел) с такой информацией, как текущий поток и состояние ожидания, и добавит его в очередь синхронизации, блокируя текущий поток.
    (2) Когда состояние синхронизации (блокировка) освобождается, поток в узле будет разбужен, чтобы снова попытаться получить состояние синхронизации.

  • Интервьюер: Какие полезные методы, по вашему мнению, предлагает AQS?

  • Я: AQS в основном предоставляет следующие методы (API):
    1. getState(): возвращает значение состояния синхронизации.
    2. setState(int newState): установить текущее состояние синхронизации.
    3. compareAndSetState(int expect, int update): используйте CAS для установки текущего состояния, этот метод может обеспечить атомарность настройки состояния.
    4. tryAcquire(int arg): получить исключительно состояние синхронизации.После успешного получения состояния синхронизации другим потокам необходимо дождаться, пока поток освободит состояние синхронизации, чтобы получить состояние синхронизации.
    5. tryRelease(int arg): исключительно освобождает состояние синхронизации.
    6. tryAcquireShared(int arg): общий статус синхронизации сбора данных, если возвращаемое значение больше или равно 0, это означает, что сбор данных выполнен успешно, в противном случае получение данных завершается неудачно.
    7. tryReleaseShared(int arg): Общее состояние синхронизации выпуска. 8. isHeldExclusively(): если текущий синхронизатор занят потоком в эксклюзивном режиме, обычно этот метод указывает, принадлежит ли он исключительно текущему потоку.
    9.Acquire(int arg): исключительно получает состояние синхронизации. Если текущий поток успешно получает состояние синхронизации, он будет возвращен этим методом, в противном случае он войдет в очередь синхронизации для ожидания. Этот метод вызовет переопределяемый метод tryAcquire(int arg).
    10.AcquireInterruptably(int arg): То же, что иAcquire(int arg), но этот метод реагирует на прерывания. Текущий поток входит в очередь синхронизации, чтобы получить состояние синхронизации. Если текущий поток прерывается, этот метод вызывает InterruptedException и возврат.
    11. tryAcquireNanos(int arg, long nanos): время ожидания для получения состояния синхронизации. Если текущий поток не достиг состояния синхронизации в течение наносекунд, он вернет false и вернет true, если он его получил.
    12.AcquireShared(int arg):Общий тип получает состояние синхронизации.Если текущий поток не получает состояние синхронизации, он войдет в очередь синхронизации для ожидания.Основное отличие от монопольного типа заключается в том, что несколько потоков могут получить состояние синхронизации. состояние синхронизации одновременно; 13.AcquireSharedInterruptably(int arg): Общее состояние синхронизации получения и ответ на прерывание.
    14. tryAcquireSharedNanos(int arg, long nanosTimeout): общее состояние синхронизации сбора данных увеличивает лимит времени ожидания.
    15. release(int arg): исключительно освобождает состояние синхронизации.Этот метод разбудит поток, содержащийся в первом узле в очереди синхронизации, после освобождения состояния синхронизации.
    16. releaseShared(int arg): Общее состояние синхронизации выпуска.

  • Интервьюер: Вы только что упомянули, что AQS поддерживает внутреннюю очередь FIFO, можете ли вы рассказать об этой очереди?

  • Я: Очередь — это очередь CLH. Очередь CLH — это двунаправленная очередь FIFO (все, кто изучал структуры данных, должны понять), и AQS полагается на нее для завершения управления состоянием синхронизации.
    1. Если текущему потоку не удается получить состояние синхронизации, AQS создаст состояние ожидания текущего потока и другую информацию в узле (Node), добавит его в очередь синхронизации CLH и одновременно заблокирует текущий поток.
    2. Когда состояние синхронизации освобождается, первый узел просыпается (справедливая блокировка), первый вошел, первый вышел, так что он попытается снова получить состояние синхронизации.

  • Я: В очереди синхронизации CLH узел (Node) представляет поток, который сохраняет ссылку на поток (thread), статус (waitStatus), предшествующий узел (prev) и последующий узел (next). Он определяется следующим образом: Node — это статический внутренний класс AbstractQueuedSynchronizer:

static final class Node {

   // 共享
   static final Node SHARED = new Node();
   // 独占
   static final Node EXCLUSIVE = null;

   /**
    * 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态
    */
   static final int CANCELLED =  1;
   /**
    * 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
    */
   static final int SIGNAL    = -1;
   /**
    * 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
    */
   static final int CONDITION = -2;
   /**
    * 表示下一次共享式同步状态获取,将会无条件地传播下去
    */
   static final int PROPAGATE = -3;

   /** 等待状态 */
   volatile int waitStatus;

   /** 前驱节点,当节点添加到同步队列时被设置(尾部添加) */
   volatile Node prev;

   /** 后继节点 */
   volatile Node next;

   /** 等待队列中的后续节点。如果当前节点是共享的,那么字段将是一个 SHARED 常量,也就是说节点类型(独占和共享)和等待队列中的后续节点共用同一个字段 */
   Node nextWaiter;
   
   /** 获取同步状态的线程 */
   volatile Thread thread;

   final boolean isShared() {
       return nextWaiter == SHARED;
   }

   final Node predecessor() throws NullPointerException {
       Node p = prev;
       if (p == null)
           throw new NullPointerException();
       else
           return p;
   }

   Node() { // Used to establish initial head or SHARED marker
   }

   Node(Thread thread, Node mode) { // Used by addWaiter
       this.nextWaiter = mode;
       this.thread = thread;
   }

   Node(Thread thread, int waitStatus) { // Used by Condition
       this.waitStatus = waitStatus;
       this.thread = thread;
   }
   
}

проиллюстрировать:
1. Поле waitStatus: состояние ожидания, используемое для управления блокировкой и пробуждением потока, существует 5 состояний: INITAL/CANCEELLED/SINGAL/CONDITION/PROPAGATE.
2. Поле потока: поток, соответствующий узлу Node.
3. Поле nextWaiter: Узел Node получает модель состояния синхронизации. Методы tryAcquire(int args) и tryAcquireShared(int args) используются для получения состояния синхронизации для монопольного и общего доступа соответственно. Когда выборка не удалась, Все они вызывают метод addWaiter(режим узла) для присоединения к очереди. И nextWaiter используется, чтобы указать, какой режим: SHARED: перечислить общий режим, EXCLUSIVE: перечислить монопольный режим.
4. Метод предшественника(): получить предыдущий узел Node узла Node. Внутри метода локальная копия Node p = prev предназначена для предотвращения параллелизма, когда prev оценивает == null, он просто модифицируется, чтобы обеспечить безопасность потоков.

  • Интервьюер: Как очередь CLH выполняет операции постановки в очередь и удаления из очереди?
  • Я: Поскольку мы все знаем структуру данных, CLH очень просто присоединиться к команде, как показано на следующем рисунке.
    1. Хвост указывает на новый узел.
    2. Оборот нового узла указывает на текущий последний узел.
    3. Следующий из текущего последнего узла указывает на узел в очереди.

На самом деле, метод addWaiter(Node), реализованный логикой постановки в очередь, должен учитывать параллелизм. Он использует метод CAS для обеспечения правильного добавления Node. код показывает, как показано ниже:

 1: private Node addWaiter(Node mode) {
 2:     // 新建节点
 3:     Node node = new Node(Thread.currentThread(), mode);
 4:     // 记录原尾节点
 5:     Node pred = tail;
 6:     // 快速尝试,添加新节点为尾节点
 7:     if (pred != null) {
 8:         // 设置新 Node 节点的尾节点为原尾节点
 9:         node.prev = pred;
10:         // CAS 设置新的尾节点
11:         if (compareAndSetTail(pred, node)) {
12:             // 成功,原尾节点的下一个节点为新节点
13:             pred.next = node;
14:             return node;
15:         }
16:     }
17:     // 失败,多次尝试,直到成功
18:     enq(node);
19:     return node;
20: }

Удаление из очереди: очередь синхронизации CLH следует за FIFO.После того как поток первого узла освобождает состояние синхронизации, он пробуждает следующий узел (Node.next). Узел-преемник установит себя в качестве головного узла, когда состояние синхронизации будет успешно достигнуто. Этот процесс очень прост: головка выполняет узел и отсоединяет следующий из исходного головного узла и предыдущий узла текущего. Примечание. Нет необходимости использовать CAS, чтобы гарантировать этот процесс, поскольку только один поток может успешно получить состояние синхронизации. Метод setHead(Node node) реализует описанную выше логику удаления из очереди, как показано на следующем рисунке:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
  • Опрашивающий: Не могли бы вы рассказать мне, как AQS получает и освобождает состояние синхронизации?
  • Я: Как упоминалось ранее, шаблон проектирования AQS — это шаблон шаблонного метода. Подкласс реализует свой абстрактный метод для управления состоянием синхронизации посредством наследования. AQS предоставляет большое количество шаблонных методов для достижения синхронизации, которые в основном делятся на три категории. : исключительное состояние синхронизации получения и выпуска, совместное состояние синхронизации получения и выпуска, а также запрос состояния ожидающих потоков в очереди синхронизации.
    1. Эксклюзивный: только один поток одновременно поддерживает состояние синхронизации.
    Эксклюзивный доступ к статусу синхронизации
 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

1. tryAcquire: попытаться получить блокировку, установить статус блокировки и вернуть true, если получение прошло успешно, в противном случае вернуть false. Этот метод реализуется самим настраиваемым компонентом синхронизации и должен обеспечивать потокобезопасный доступ к состоянию синхронизации.
2. addWaiter: если tryAcquire возвращает false (не удалось получить статус синхронизации), вызовите этот метод, чтобы добавить текущий поток в конец очереди синхронизации CLH.
3.AcquireQueued: Текущий поток будет вращаться в соответствии с принципом справедливости до тех пор, пока не будет получена блокировка.
4, selfInterrupt: генерировать прерывание
5. Давайте взглянем на методAcquireQueued: этот метод представляет собой вращающийся процесс. После того, как текущий поток (узел) войдет в очередь синхронизации, он войдет в вращающийся процесс. Каждый узел будет наблюдать интроспективно и получать синхронизацию при выполнении условий. выполнено.После состояния можно выйти из процесса отжима. Как видно из следующего кода, текущий поток всегда будет пытаться получить состояние синхронизации, конечно, только если его предшествующий узел является головным узлом, может попытаться получить состояние синхронизации, причина: сохранение принципа синхронизации FIFO очередь. После того, как головной узел выйдет из состояния синхронизации, он разбудит свой узел-преемник.После пробуждения узла-преемника ему необходимо проверить, является ли он головным узлом.

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            //中断标志
            boolean interrupted = false;
            /*
             * 自旋过程,其实就是一个死循环而已
             */
            for (;;) {
                //当前线程的前驱节点
                final Node p = node.predecessor();
                //当前线程的前驱节点是头结点,且同步状态成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //获取失败,线程等待--具体后面介绍
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

Существуют также методы прерывания ответа на эксклюзивное получение и эксклюзивного получения тайм-аута, которые здесь подробно не описываются.
Эксклюзивное состояние синхронизации выпуска
После того, как поток получает состояние синхронизации, ему необходимо освободить состояние синхронизации после выполнения соответствующей логики. AQS предоставляет метод release(int arg) для сброса состояния синхронизации:

 public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

Этот метод также вызывает метод tryRelease(int arg) для сброса состояния синхронизации.После успешного сброса вызывается метод unparkSuccessor(узел узла) для пробуждения узла-преемника.
Резюме: Внутри AQS поддерживается очередь синхронизации FIFO.Если потоку не удается получить состояние синхронизации, он присоединяется к хвосту очереди CLH и продолжает вращаться. Поток в очереди синхронизации CLH будет определять, является ли его предшествующий узел первым узлом, когда он вращается. Если это первый узел, он постоянно пытается получить состояние синхронизации, и если получение выполняется успешно, он выходит из очереди синхронизации CLH. Когда поток завершает выполнение логики, он освобождает состояние синхронизации и пробуждает узлы-преемники после освобождения.
2. Общий
Основное различие между общим типом и эксклюзивным типом заключается в том, что только один поток эксклюзивного типа может одновременно получать состояние синхронизации, в то время как общий тип может иметь несколько потоков для одновременного получения состояния синхронизации. Например, операция чтения может быть прочитана несколькими потоками одновременно, а операция записи может быть записана только одним потоком за раз.
Общий доступ к статусу синхронизации
AQS предоставляет метод acceShared(int arg) для обмена статусом синхронизации:

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            //获取失败,自旋获取同步状态
            doAcquireShared(arg);
    }

Вышеупомянутый метод состоит в том, чтобы сначала вызвать метод tryAcquireShared(int arg), чтобы попытаться получить состояние синхронизации.Если получение не удается, вызовите doAcquireShared(int arg), чтобы прокрутить, чтобы получить состояние синхронизации.

private void doAcquireShared(int arg) {
        /共享式节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //前驱节点
                final Node p = node.predecessor();
                //如果其前驱节点,获取同步状态
                if (p == head) {
                    //尝试获取同步
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

addWaiter(Node.SHARED) здесь сначала добавляет текущий поток в хвост очереди синхронизации CLH, а затем зацикливается (прокручивается), чтобы попытаться получить состояние синхронизации: node.predecessor() представляет узел-предшественник текущего узла, если (p == head) Если предшествующий узел Если это первый узел, вызовите метод tryAcquireShared(int args), чтобы попытаться получить состояние синхронизации.Если получение прошло успешно (r >=0), он выйдет из цикла (прокрутка) и разбудит следующий ожидающий узел. узел перед выходом (то есть установка следующего узла. Узел-предшественник является первым узлом).
Общее состояние синхронизации выпуска

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
  • Интервьюер: Из приведенного выше исходного кода я вижу, что метод shouldParkAfterFailedAcquire, по-видимому, используется для блокировки потоков.Можете ли вы рассказать мне, как AQS блокирует и пробуждает потоки?
  • Я: Как упоминалось выше, после того, как потоку не удалось получить состояние синхронизации, он присоединяется к очереди синхронизации CLH и постоянно получает состояние синхронизации путем вращения, но в процессе вращения необходимо определить, нужно ли блокировать текущий поток. . Получение статуса синхронизации После сбоя поток не блокируется немедленно, и необходимо проверить состояние потока.Метод проверки — shouldParkAfterFailedAcquire(предшествующий узел, узел узла).Этот метод в основном полагается на предшествующий узел, чтобы определить, должен ли текущий поток быть заблокирован.
    Например, часть исходного кода выше:
if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //前驱节点
        int ws = pred.waitStatus;
        //状态为signal,表示当前线程处于等待状态,直接放回true
        if (ws == Node.SIGNAL)
            return true;
        //前驱节点状态 > 0 ,则为Cancelled,表明该节点已经超时或者被中断了,需要从同步队列中取消
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } 
        //前驱节点状态为Condition、propagate
        else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

Этот код в основном проверяет, нужно ли блокировать текущий поток.Конкретные правила таковы:
1. Если статус узла-предшественника текущего потока — SINNAL, это означает, что текущий поток необходимо заблокировать, вызвать метод unpark() для пробуждения и напрямую вернуть значение true, а текущий поток заблокирован.
2. Если состояние узла-предшественника текущего потока ОТМЕНЕНО (ws > 0), это означает, что узел-предшественник потока дождался тайм-аута или был прерван, и узел-предшественник необходимо удалить из списка. Очередь CLH до возврата к узлу-предшественнику Статус 3. Если предшествующий узел не является SINNAL или ОТМЕНЕН, установите его предшествующий узел в SINNAL с помощью CAS и верните false. Если метод shouldParkAfterFailedAcquire(Node pre, Node node) возвращает false, вызовите метод parkAndCheckInterrupt(), чтобы заблокировать текущий поток.

 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

Метод parkAndCheckInterrupt() в основном приостанавливает текущий поток, тем самым блокируя стек вызовов потока, и возвращает состояние прерывания текущего потока. Внутренне для блокировки вызывается метод park() класса инструментов LockSupport.

  • Интервьюер: Давайте поговорим о применении AQS в JUC — ReentrantLock. Можете ли вы рассказать мне, что вы знаете о ReentrantLock?
  • Я: ReentrantLock — это реентерабельная блокировка, рекурсивный неблокирующий механизм синхронизации. Это может быть эквивалентно использованию синхронизированного, но обеспечивает более мощный и гибкий механизм, чем синхронизированный, что может снизить вероятность взаимоблокировки.
    1. ReentrantLock будет принадлежать потоку, который недавно успешно получил блокировку и не снял блокировку. Если блокировка не удерживается потоком, поток, вызвавший блокировку, успешно получит блокировку. Если текущий поток уже владеет блокировкой, вызов метода lock() немедленно завершится.
    2. Reentrant предоставляет выбор честных и нечестных блокировок. Конструктор принимает необязательный параметр справедливости (true означает справедливую блокировку, в противном случае — несправедливую блокировку). Разница между честными и нечестными блокировками заключается в том, что получение честных блокировок происходит последовательно. Однако эффективность честных блокировок часто не так высока, как у нечестных, и в случае многопоточного доступа честные блокировки показывают меньшую пропускную способность.
    Давайте посмотрим на исходный код
    1. метод блокировки
public void lock() {
    sync.lock();
}

Sync — это внутренний класс в ReentrantLock, он наследует AQS, имеет два подкласса: справедливая блокировка FairSync и нечестная блокировка NonFairSync. Большинство функций в ReenTrantLock делегированы Sync, и Sync Определяет абстрактный метод lock(), который будет реализован его подклассами, и реализует метод nonfairTryAcquire(int Acquires) по умолчанию. Давайте посмотрим на метод блокировки несправедливых блокировок.

final void lock() {
    //尝试获取锁
    if (compareAndSetState(0,1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
    //获取失败,调用AQS的acquire(int arg)方法
        acquire(1);
}

Сначала попытайтесь получить блокировку, если она будет успешной, установите блокировку исключительно для текущего потока. В случае сбоя сбора данных вызывается методAcquire(1), который определяется в AQS следующим образом:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

Здесь первым вызывается tryAcquire(int arg) Как упоминалось в AQS, этот метод должен быть реализован самим компонентом синхронизации. Реализация в NonfairSync выглядит следующим образом:

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            //获取同步状态
            int c = getState();
            //state==0 表示该锁处于空闲状态
            if (c == 0) {
                //用CAS方式占用该锁
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //判断锁持有的线程是否为当前线程
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                //成功获得锁的线程再次获得锁,增加同步状态
                setState(nextc);
                return true;
            }
            return false;
        }

2. Отпустите метод разблокировки блокировки

public void unlock() {
        sync.release(1);
    }

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;

            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

protected final boolean tryRelease(int releases) {
            //减掉releases
            int c = getState() - releases;
            //如果释放的不是持有锁的线程,抛异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //state == 0 表示已经释放完全了,其他线程可以获取同步状态了
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            //重新设置同步状态的值
            setState(c);
            return free;
}

unlock() внутренне использует Sync's release(int arg) для снятия блокировки, а release(int arg) определен в AQS. tryRelease также должен быть реализован самим компонентом синхронизации. Если выпуск выполнен успешно, а затем установлено, что за первым узлом находятся потоки, ожидающие состояния синхронизации, вызовите метод unparkSuccessor(Node node), чтобы разбудить следующий поток.

  • Интервьюер: Итак, каковы сходства и различия между ReentrantLock и Synchronized?

  • Я: Во-первых, у них определенно одинаковая функциональность и семантика памяти. Отличие заключается в следующих моментах:
    1. По сравнению с синхронизированным, ReentrantLock предоставляет все больше и больше полных функций и обладает большей масштабируемостью. Например, ожидание блокировки по времени, которое может прервать ожидание блокировки и голосование за блокировку.
    2. ReentrantLock также предоставляет Condition, который является более подробным и гибким для операций ожидания и пробуждения потока, поэтому ReentrantLock больше подходит для нескольких переменных условий и высококонкурентных блокировок.
    3. ReentrantLock предоставляет запрос на блокировку с возможностью опроса. Он попытается получить блокировку. В случае успеха он продолжит работу, в противном случае он будет ждать, пока следующая среда выполнения не обработает его, в то время как синхронизированный либо преуспеет, либо заблокируется после входа в блокировку. запрос. Таким образом, по сравнению с синхронизированным, ReentrantLock будет нелегко заблокировать.
    4. ReentrantLock поддерживает более гибкие блоки кода синхронизации, но при использовании синхронизированного его можно получить и освободить только в структуре синхронизированного блока.
    5. ReentrantLock поддерживает обработку прерываний, и его производительность относительно лучше.

  • Интервьюер: Вы когда-нибудь использовали ReentrantReadWirteLock? Можете представить?

  • Я: Просто посмотрите на исходный код. Первые несколько строк ReentrantReadWrite очень просты, посмотрите на класс Sync, сначала посмотрите на все свойства Sync:

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;

        /*
        下面这块就是将state一分为二,高16位用于共享模式,低16位用于独占模式。
         */

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** 取c的高16位值,代表读锁的获取次数 */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** 取c的低16位值,代表写锁的冲入次数,因为写锁是独占模式 */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

        /**
         这个静态内部类的实例用来记录每个线程持有的读锁数量(读锁重入)
         */
        static final class HoldCounter {
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }

        /**
         ThreadLocal的子类
         */
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

        /**
         组合使用上面两个类,用一个ThreadLocal来记录当前线程持有的读锁数量
         */
        private transient ThreadLocalHoldCounter readHolds;

        /**
         用于缓存,记录“最后一个获取读锁的线程”的读锁重入次数,所以不管哪个线程获取到读锁后,就把这个值占用,这样
         就不用到ThreadLocal中查询map了。在获取-释放读锁的这段时间,如果没有其他线程获取读锁的话,此缓存可以帮助提高性能。
         */
        private transient HoldCounter cachedHoldCounter;

        /**
        第一个获取读锁的线程(并且其未获取读锁),以及它持有的读锁数量。
         */
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;

        Sync() {
        //初始化readHolds这个ThreadLocal属性
            readHolds = new ThreadLocalHoldCounter();
        //为了保证readHolds内存可见性
            setState(getState()); // ensures visibility of readHolds
        }

ReentrantReadWriteLock — это то же самое, что и ReentrantLock. Его телом блокировки по-прежнему является Sync. Его блокировка чтения и блокировка записи реализованы Sync, поэтому ReentrantReadWriteLock на самом деле имеет только одну блокировку, но получает только блокировки чтения. Это не то же самое, что и способ написания замков. Его блокировка чтения-записи на самом деле состоит из двух классов ReadLock и WriteLock. Состояние типа int используется в ReentrantLock для представления состояния синхронизации, а значение представляет количество раз, когда блокировка неоднократно запрашивалась потоком. Однако блокировка чтения-записи ReentrantReadWriteLock поддерживает пару внутренних блокировок, и для поддержания нескольких состояний необходимо использовать переменную, поэтому блокировка чтения-записи использует метод «побитового вырезания и использования» для поддержания этой переменной. , который разделен на две части, старшие 16 бит указывают на чтение, а младшие 16 бит указывают на запись. После деления состояние блокировки чтения и записи определяется битовой операцией. Если текущее состояние синхронизации S, то состояние записи = S&0x0000FFFF (старшие 16 бит стираются), а состояние чтения = S >>> 16 (беззнаковый 0 сдвинут вправо на 16 бит).

  • Я: Давайте взглянем на приобретение блокировки записи. Блокировка записи — это эксклюзивная блокировка, поддерживающая повторный вход.
protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            //当前锁个数
            int c = getState();
            //写锁
            int w = exclusiveCount(c);
            if (c != 0) {
                // c !=0 && w == 0表示存在读锁,当前线程不是已经获取写锁的线程
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                //超出最大范围
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 尝试获取写锁
                setState(c + acquires);
                return true;
            }
            //是否需要阻塞
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            //设置获取锁的线程为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
  • Я: Давайте посмотрим на снятие блокировки записи.
       protected final boolean tryRelease(int releases) {
           //如果释放的线程不是锁的持有者,抛异常
           if (!isHeldExclusively())
               throw new IllegalMonitorStateException();
           //同步状态更新
           int nextc = getState() - releases;
           //若写锁的新线程数为0,则将锁的持有者设置为null
           boolean free = exclusiveCount(nextc) == 0;
           if (free)
               setExclusiveOwnerThread(null);
           setState(nextc);
           return free;
       }
  • Я: Давайте взглянем на приобретение блокировки чтения
public void lock() {
   sync.acquireShared(1);
}

public final void acquireShared(int arg) {
   if (tryAcquireShared(arg) < 0)
       doAcquireShared(arg);
}

protected final int tryAcquireShared(int unused) {
           //当前线程
           Thread current = Thread.currentThread();
           //锁的个数
           int c = getState();
           //计算写锁,如果存在写锁且锁的持有者不是当前线程,直接返回-1
           if (exclusiveCount(c) != 0 &&
               getExclusiveOwnerThread() != current)
               return -1;
           //计算读锁
           int r = sharedCount(c);
           //readerShouldBlock:读锁是否需要等待(公平锁原则),且小于最大线程数,且CAS设置读取锁状态成功
           if (!readerShouldBlock() &&
               r < MAX_COUNT &&
               compareAndSetState(c, c + SHARED_UNIT)) {
               如果锁没有被任何线程获取,那么当前线程就是第一个获取读锁的线程
               if (r == 0) {
                   firstReader = current;
                   firstReaderHoldCount = 1;
               }
               //如果获取读锁的线程为第一次获取读锁的线程,则 firstReaderHoldCount重入数+1
               else if (firstReader == current) {
                   firstReaderHoldCount++;
               } 
               else {
                   HoldCounter rh = cachedHoldCounter;
                   if (rh == null || rh.tid != getThreadId(current))
                       cachedHoldCounter = rh = readHolds.get();
                   else if (rh.count == 0)
                       readHolds.set(rh);
                   rh.count++;
               }
               return 1;
           }
           return fullTryAcquireShared(current);
}
  • Я: Давайте посмотрим на снятие блокировки чтения.
public void unlock() {
           sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
   if (tryReleaseShared(arg)) {
       doReleaseShared();
        return true;
   }
   return false;
}

protected final boolean tryReleaseShared(int unused) {
           //当前线程
           Thread current = Thread.currentThread();
           //如果想要释放锁的线程为第一个获取锁的线程
           if (firstReader == current) {
               // 仅获取了一次,则需要将firstReader设置为Null,否则-1
               if (firstReaderHoldCount == 1)
                   firstReader = null;
               else
                   firstReaderHoldCount--;
           } 
           //获取rh对象,并更新“当前线程获取锁的信息”
           else {
               HoldCounter rh = cachedHoldCounter;
               if (rh == null || rh.tid != getThreadId(current))
                   rh = readHolds.get();
               int count = rh.count;
               if (count <= 1) {
                   readHolds.remove();
                   if (count <= 0)
                       throw unmatchedUnlockException();
               }
               --rh.count;
           }
           //CAS更新同步状态
           for (;;) {
               int c = getState();
               int nextc = c - SHARED_UNIT;
               if (compareAndSetState(c, nextc))
                   // Releasing the read lock has no effect on readers,
                   // but it may allow waiting writers to proceed if
                   // both read and write locks are now free.
                   return nextc == 0;
           }
}
  • Я: В процессе получения и снятия блокировки чтения вы можете увидеть переменную rh (HoldCounter), которая очень важна в блокировке чтения. Чтобы лучше понять HoldCounter, давайте временно подумаем, что это не вероятность блокировки, а эквивалент счетчика. Операция общей блокировки эквивалентна операции со счетчиком. Если общая блокировка получена, счетчик равен +1, а если общая блокировка снята, счетчик равен -1. Общая блокировка может быть освобождена и повторно введена только после того, как поток получит общую блокировку, поэтому функция HoldCounter — это количество общих блокировок, удерживаемых текущим потоком.Это число должно быть привязано к потоку, иначе блокировки других потоков будут быть выброшено исключение.
/**
HoldConter定义比较简单,就是一个计数器count和线程id两个变量。
*/
static final class HoldCounter {
            int count = 0;
            
            final long tid = getThreadId(Thread.currentThread());
}
/**
通过ThreadLocal,HoldCounter就可以与线程绑定了,故而,HoldCounter应该就是绑定线程上的一个计数器,而ThreadLocalHoldCounter则是线程绑定的ThreadLocal。
*/
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}
  • Интервьюер: Можете ли вы сказать, что существуют другие методы синхронизации между потоками помимо ожидания/уведомления объекта?
  • Я: И состояние. Блокировка предоставляет условное условие, которое является более подробным и гибким для операций ожидания и пробуждения потока. Условие — это обобщенная условная очередь. Он обеспечивает более гибкий шаблон ожидания/уведомления для потоков, Поток выполняет операцию приостановки после вызова метода await и не будет разбужен до тех пор, пока какое-либо условие, которого ожидает поток, не станет истинным. Условие должно использоваться с блокировками, поскольку доступ к общим переменным состояния происходит в многопоточной среде. Экземпляр Condition должен быть привязан к Lock, поэтому Condition обычно реализуется как внутренняя реализация Lock. Изображение, сравнивающее методы монитора Condition и Object.

  • Интервьюер: Какие методы предоставляет Condition для блокировки и пробуждения потоков?

  • Я: Условие предоставляет ряд методов для блокировки и пробуждения потоков:
    1. await(): заставляет текущий поток находиться в состоянии ожидания, пока он не получит сигнал или не будет прерван.
    2. awiat (долгое время, единица измерения TimeUnit): заставляет текущий поток находиться в состоянии ожидания между получением сигнала, прерыванием или достижением времени ожидания выполнения.
    3. awaitNanos(long nanosTimeout): заставляет текущий поток находиться в состоянии ожидания, пока он не получит сигнал, не будет прерван или не достигнет указанного времени ожидания. Возвращаемое значение указывает оставшееся время, если в Просыпаться раньше nanosTimeout, тогда возвращаемое значение = nanosTimeout - время потребления, если возвращаемое значение 4. awaitUninterruptably(): переводит текущий поток в состояние ожидания до тех пор, пока он не получит сигнал. (Этот метод не чувствителен к прерываниям).
    5. awaitUntil(дата крайнего срока): переводит текущий поток в состояние ожидания до тех пор, пока он не получит сигнал, не будет прерван или не достигнет указанного крайнего срока. Возвращает true, если уведомление не было получено в течение указанного времени. В противном случае он указывает, что указанное время пришло, и возвращает false.
    6, сигнал (): разбудить ожидающий поток. Поток должен получить блокировку, связанную с условием, прежде чем вернуться из метода ожидания.
    7, signalAll (): разбудить все ожидающие потоки. Поток, способный вернуться из метода ожидания, должен получить блокировку, связанную с условием.

  • Интервьюер: Как Condition обеспечивает блокировку и пробуждение потока? (Принцип)

  • Я: Посмотрите, источник должен получить метод Condition newCondition Lock, метод, определенный в блокировке интерфейса, возвращаемый результат привязан к этому новому экземпляру блокировки экземпляра Condition. Условие представляет собой интерфейс, только один класс реализации ConditionObject. ConditionObject — это внутренний класс AQS. В ReentrantLock

    public Condition newCondition() {
        return sync.newCondition();
}

В синхронизации

final ConditionObject newCondition() {
        return new ConditionObject();
}

В AbstractQueuedSynchronizer:

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        
        private transient Node firstWaiter;
        
        private transient Node lastWaiter;

        
        public ConditionObject() { }
  • Я продолжил: Очередь ожидания AQS и очередь условий — это две независимые очереди.
    1. await() освобождает ресурс блокировки на основе блокировки, удерживаемой текущим потоком, и добавляет новый узел условия в конец очереди условий, чтобы заблокировать текущий поток.
    2. signal() должен переместить головной узел условия в хвост ожидающего узла AQS и позволить ему подождать, чтобы снова получить блокировку.
    Ниже приведена схема узлов входа и выхода очередей AQS и Condition.Вы можете увидеть отношения входа и выхода и условия узлов потоков в двух очередях на этих изображениях.
    1. Состояние инициализации: очередь ожидания AQS имеет 3 узла, а очередь условий имеет 1 узел (или ни один из них).

2. Узел 1 выполняет Condition.await()
(1) Откиньте голову назад.
(2) Снимите блокировку узла 1 и удалите его из очереди ожидания AQS.
(3) Добавьте узел 1 в очередь ожидания условия.
(4) Обновите lastWaiter до узла 1.

3. Узел 2 выполняет операцию Condition.signal().
(5) Переместите первого официанта назад.
(6) Удалить узел 4 из очереди условий.
(7) Добавьте узел 4 в очередь ожидания AQS.
(8) Обновите конец очереди ожидания AQS.

  • Интервьюер: Легче сказать, чем сделать, но вам нужно попрактиковаться.Можете ли вы использовать Condition для реализации производитель-потребитель?
  • Я: Взял ручку и пять минут писал демо:
/**
 * Condition实现简单的生产者消费者
 */
public class ConditionDemo {
    
    private LinkedList<String> buffer; // 容器
    
    private int maxSize; //容量
    
    private Lock lock;
    
    private Condition fullCondition;
    
    private Condition notFullCondition;
    
    ConditionDemo(int maxSize) {
        this.maxSize = maxSize;
        buffer = new LinkedList<>();
        lock = new ReentrantLock();
        fullCondition = lock.newCondition();
        notFullCondition = lock.newCondition();
    }

    /**
     * 生产者
     * @param produceStr
     * @throws InterruptedException
     */
    public void set(String produceStr) throws InterruptedException {
        //获得锁
        lock.lock();
        try {
            while (maxSize == buffer.size()) {
                notFullCondition.await();
            }
            
            buffer.add(produceStr);
            fullCondition.signal();
        } finally {
            //释放锁
            lock.unlock();
        }
    }

    /**
     * 消费者
     * @return
     * @throws InterruptedException
     */
    public String get() throws InterruptedException {
        String consumeStr;
        lock.lock();
        try {
            while (buffer.size() == 0) {
                fullCondition.await();
            }
            consumeStr = buffer.pollFirst();
            notFullCondition.signal();
        } finally {
            lock.unlock();
        }
        return consumeStr;
    }
}  

Замечательный отзыв о прошлом

Подпишитесь на официальный аккаунт, чтобы читать больше интересных статей

Вы сегодня брали интервью?
Серия параллельных программ Java:
nuggets.capable/post/684490…
редис:
nuggets.capable/post/684490…
весна:
nuggets.capable/post/684490…
MyBatis:
nuggets.capable/post/684490…

Серия баз данных
индекс mysql:
nuggets.capable/post/684490…
Блокировка базы данных:
nuggets.capable/post/684490…
Подтаблица подбиблиотеки:
nuggets.capable/post/684490…
Транзакции базы данных:
nuggets.capable/post/684490…

Серия онлайн-вопросов
nuggets.capable/post/684490…

основы Java
nuggets.capable/post/684490…
nuggets.capable/post/684490…