Параллелизм Java (5) — ReentrantLock и AQS

Node.js Java задняя часть Безопасность

введение

существуетsynchronizedПеред оптимизацией класс инструмента синхронизации, который мы чаще всего используем в кодировании, должен бытьReentrantLockсвоего рода,ReentrantLockПосле оптимизацииsynchronizedПроизводительность ключевого слова, в свою очередь, обеспечивает большую гибкость. по сравнению сsynchronized, он более мощный по функциям, с ожиданием прерывания, справедливой блокировкой и связыванием нескольких условий и т. д.synchronizedФункция, которая не имеет, является ключевым классом параллелизма, который мы должны освоить в процессе разработки.

ReentrantLockОн играет ключевую роль в параллельных пакетах JDK не только из-за собственной частоты использования, но также обеспечивает базовую поддержку большого количества параллельных классов в параллельных пакетах JDK, включаяCopyOnWriteArrayLit,CyclicBarrierиLinkedBlockingDequeи Т. Д. теперь, когдаReentrantLockЭто настолько важно, что понимание его основного принципа реализации полезно для нас, чтобы мы могли гибко использовать его в различных сценариях.ReentrantLockПомимо обнаружения различных проблем с параллелизмом, это имеет решающее значение. Эта статья поможет вам шаг за шагом проанализироватьReentrantLockБазовая логика реализации, как ее лучше использовать после понимания логики реализацииReentrantLock.

Связь между ReentrantLock и AbstractQueuedSynchronizer

в настоящее время используетReentrantLockкласс, первый шаг — создать его экземпляр, то есть использоватьnew ReentrantLock(), давайте посмотрим на исходный код его создания:

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

Как вы можете видеть в коде,ReentrantLockПредоставляет 2 метода создания экземпляров, метод создания без параметров по умолчаниюNonfairSync()инициализированsyncПоля, экземпляры методов с параметрами используются через область параметровNonfairSync()илиFairSync()инициализацияsyncполе.

Из названия видно, что мы обычно используем реализацию недобросовестных блокировок и справедливых блокировок.Честные блокировки должны получать блокировки путем постановки в очередь FIFO.Несправедливые блокировки означают, что они могут быть вставлены в очередь.По умолчаниюReentrantLockРеализации, использующие несправедливые блокировки. тогда даsyncКакова логика реализации поля? посмотриsyncкод:

private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {......}

static final class NonfairSync extends Sync {......}

static final class FairSync extends Sync {......}

Нашел здесьAbstractQueuedSynchronizerКлассы, честные блокировки и нечестные блокировки на самом деле вAbstractQueuedSynchronizerОн реализован на базе AQS. AQS предоставляетReentrantLockоснова реализации.

метод lock() ReentrantLock

АнализыReentrantLockПосле создания экземпляра давайте посмотрим, как он реализует функцию блокировки:

//ReentrantLock的lock方法
public void lock() {
    sync.lock();
}

//调用了Sync中的lock抽象方法
abstract static class Sync extends AbstractQueuedSynchronizer {
    ......
    /**
        * Performs {@link Lock#lock}. The main reason for subclassing
        * is to allow fast path for nonfair version.
        */
    abstract void lock();
    ......
}

называетсяsyncизlock()метод,SyncКатегорияlock()метод является абстрактным методом,NonfairSync()иFairSync()соответственноlock()метод реализован.

//非公平锁的lock实现
static final class NonfairSync extends Sync {
    ......
    /**
        * Performs lock.  Try immediate barge, backing up to normal
        * acquire on failure.
        */
    final void lock() {
        if (compareAndSetState(0, 1)) //插队操作,首先尝试CAS获取锁,0为锁空闲
            setExclusiveOwnerThread(Thread.currentThread()); //获取锁成功后设置当前线程为占有锁线程
        else
            acquire(1);
    }
    ......
}

//公平锁的lock实现
static final class FairSync extends Sync {
    ......
    final void lock() {
        acquire(1);
    }
    ......
}

Обратите внимание на разницу между ними,NonfairSync()Операция CAS будет выполнена первой, а состояние состояния будет установлено от 0 до 1. Это операция «вырезания очереди» упомянутой выше несправедливой блокировки.Как упоминалось ранее, операция CAS является атомарной по умолчанию, что гарантирует что установлен потокобезопасность. Это первое различие между недобросовестными замками и честными замками.

Так для чего это состояние? Что значит установить от 0 до 1? Давайте посмотрим на исходный код, связанный с состоянием:

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

/**
    * The synchronization state.
    */
private volatile int state;

protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}

Сначала переменная состояния представляет собойvolatileдекоративныйintТип variable, который обеспечивает видимость этой переменной в многопоточной среде. Из аннотации «Состояние синхронизации» переменной видно, что состояние представляет собой состояние синхронизации. вернуться к вершинеlock()метод, после успешной настройки вызовитеsetExclusiveOwnerThreadМетод устанавливает текущий поток в закрытую переменную, которая представляет поток, который в настоящее время получает блокировку и помещается в родительский класс AQS.AbstractOwnableSynchronizerреализованы в классе.

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    ......

    /**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

Если состояние настройки успешное,lock()После выполнения метода блокировка получена. Можно видеть, что состояние состояния — это состояние синхронизации, используемое для управления получением блокировки, 0 означает, что блокировка простаивает, а 1 означает, что блокировка получена. Так что, если установка состояния состояния не удалась? буду звонить дальшеacquire(1)метод, справедливая блокировка вызывается напрямуюacquire(1)метод, не будет использовать операцию CAS для сокращения очереди.acquire(1)Метод реализован в AQS, посмотрите его исходный код:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

Этот метод очень важен и прост для понимания.Есть несколько шагов для работы, первый вызовtryAcquireПопытка получить блокировку, в случае успеха выполнение завершено, если получение не удалось, вызовитеaddWaiterМетод добавляет текущий поток в очередь ожидания и выполняет его после добавления.acquireQueuedметод приостанавливает поток. Выполнить, если требуется прерывание во время ожиданияselfInterruptПрервите нить. Давайте посмотрим на детали выполнения этого процесса, в первую очередьtryAcquireметод:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

//NonfairSync
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) { //锁空闲
        if (compareAndSetState(0, acquires)) { //再次cas操作获取锁
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { //当前线程重复获取锁,也就是锁重入
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

//FairSync
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() && //判断队列中是否已经存在等待线程,如果存在则获取锁失败,需要排队
            compareAndSetState(0, acquires)) { //不存在等待线程,再次cas操作获取锁
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { //当前线程重复获取锁,也就是锁重入
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

//AQS中实现,判断队列中是否已经存在等待线程
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

AQS не предоставляет конкретную реализацию,ReentrantLockСправедливые блокировки и нечестные блокировки имеют свои собственные реализации соответственно. Несправедливые блокировки пытаются снова получить блокировку с помощью операции CAS, когда блокировка простаивает, чтобы обеспечить потокобезопасность. Если текущая блокировка не простаивает, т.е.state状态不为0,则判断是否是重入锁,也就是同一个线程多次获取锁,是重入锁则将state状态+1,这也是ReentrantLock` поддерживает логику повторного входа в блокировку.

Существует второе различие между справедливыми и нечестными блокировками: честные блокировки будут вызываться первыми, когда блокировка свободна.hasQueuedPredecessorsМетод определяет, есть ли ожидающий поток в очереди ожидания блокировки, и если есть, он не будет пытаться получить блокировку, а перейдет к следующему процессу очереди. На этом этапе разница между несправедливыми блокировками и честными блокировками должна быть понятна всем. Если во время интервью вы спросите, в чем разница между честным и нечестным замком, думаю, каждый без труда на него ответит.

пройти черезtryAcquireПосле неудачного получения блокировки он вызоветacquireQueued(addWaiter), давайте сначала посмотримaddWaiterметод:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);   //用EXCLUSIVE模式初始化一个Node节点,代表是一个独占锁节点
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) { //如果尾节点不为空,代表等待队列中已经有线程节点在等待
        node.prev = pred; //将当前节点的前置节点指向尾节点
        if (compareAndSetTail(pred, node)) { //cas设置尾节点为当前节点,将当前线程加入到队列末尾,避免多线程设置导致数据丢失
            pred.next = node;
            return node;
        }
    }
    enq(node); //如果队列中无等待线程,或者设置尾节点不成功,则循环设置尾节点
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node())) //空队列,初始化头尾节点都为一个空节点
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) { //重复addWaiter中的设置尾节点,也是cas的经典操作--自旋,避免使用Synchronized关键字导致的线程挂起
                t.next = node;
                return t;
            }
        }
    }
}

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node(); //共享模式
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;  //独占模式

    ......
}

addWaiterМетод сначала инициализирует узел Node в режиме EXCLUSIVE. Узлы должны быть знакомы всем Многие из цепочек, представленных в статьях серии сборников, которые я написал, реализованы таким образом. Node в AQS не является исключением, его структура очереди также реализована путем реализации внутреннего класса Node, который представляет собой двустороннюю очередь. Узлы узла разделены на два режима: режим общей блокировки SHARED, режим эксклюзивной блокировки EXCLUSIVE,ReentrantLockИспользуется режим монопольной блокировки EXCLUSIVE, а EXCLUSIVE используется для инициализации. В следующих статьях мы подробно расскажем о режиме общей блокировки.

После инициализации узла Node узел добавляется в очередь.Здесь следует отметить, что в многопоточной среде, если настройка CAS хвостового узла не удалась, необходимо прокрутить операцию CAS, чтобы установить хвостовой узел, который обеспечивает потокобезопасность. , что гарантирует успешную настройку. Это оптимистичный режим блокировки. Конечно, вы можете заблокировать этот метод через ключевое слово synchronized, но это снизит эффективность и является пессимистичным режимом блокировки.

Я опишу процесс настройки узлов через следующие картинки, чтобы вы имели более наглядное представление:

После добавления текущего потока в очередь ожидания необходимо вызватьacquireQueuedПриостановить текущий поток:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor(); //获取当前节点的前置节点
            if (p == head && tryAcquire(arg)) { //如果前置节点是头节点,说明当前节点是第一个挂起的线程节点,再次cas尝试获取锁
                setHead(node); //获取锁成功设置当前节点为头节点,当前节点占有锁
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && //非头节点或者获取锁失败,检查节点状态,查看是否需要挂起线程
                parkAndCheckInterrupt())  //挂起线程,当前线程阻塞在这里!
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Можно видеть, что этот метод представляет собой вращающийся процесс. Сначала получите передний узел текущего узла. Если передний узел является головным узлом, попробуйте снова получить блокировку. Если это не удается, он приостановит блокировку и закрутит процесс после снятия блокировки. Можно ли заблокировать черезshouldParkAfterFailedAcquireметод судить, блокируя черезparkAndCheckInterruptметод для выполнения.

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) //代表继任节点需要挂起
        /*
            * This node has already set status asking a release
            * to signal it, so it can safely park.
            */
        return true;
    if (ws > 0) { //代表前置节点已经退出(超时或中断等情况) 
        /*
            * Predecessor was cancelled. Skip over predecessors and
            * indicate retry.
            */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0); //前置节点退出,循环设置到最近的一个未退出节点
        pred.next = node;
    } else { //非可挂起状态或退出状态则尝试设置为Node.SIGNAL状态
        /*
            * waitStatus must be 0 or PROPAGATE.  Indicate that we
            * need a signal, but don't park yet.  Caller will need to
            * retry to make sure it cannot acquire before parking.
            */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);//挂起当前线程
    return Thread.interrupted();
}

Потоки могут быть приостановлены только тогда, когда узел находится в состоянии SIGNAL.У состояния ожидания узла есть 4 состояния:

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED =  1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL    = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
    * waitStatus value to indicate the next acquireShared should
    * unconditionally propagate
    */
static final int PROPAGATE = -3;

Комментарий очень ясен, и четыре состояния здесь подробно не объясняются. Мы завершили весь процесс блокировки здесь.Разницу между справедливыми и нечестными блокировками также легко найти из процесса блокировки.Несправедливые блокировки также нужно ставить в очередь, но CAS попытается получить блокировку непосредственно перед постановкой в ​​очередь. После получения блокировки давайте посмотрим на процесс снятия блокировки.

Метод unLock() ReentrantLock

unLock()Метод проще для понимания, так как ему не нужно рассматривать проблему многопоточности, еслиunLock()не раньше, чемlockТема, просто выйдите напрямую. ВзгляниunLock()Исходный код:

public class ReentrantLock implements Lock, java.io.Serializable {
    ......
    public void unlock() {
        sync.release(1);
    }
    ......
}

public abstract class AbstractQueuedSynchronizer {
    ......
    public final boolean release(int arg) {
        if (tryRelease(arg)) { //尝试释放锁
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); //释放锁成功后启动后继线程
            return true;
        }
        return false;
    }
    ......
}

abstract static class Sync extends AbstractQueuedSynchronizer {
    ......
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread()) //释放锁必须要是获取锁的线程,否则退出,保证了这个方法只能单线程访问
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) { //独占锁为0后代表锁释放,否则为重入锁,不释放
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    ......
}

abstract static class Sync extends AbstractQueuedSynchronizer {
    ......
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); //挂起当前线程
    }
    ......
}

такой жеlock()Точно так же будет называться AQSreleaseметод, первый вызовtryReleaseПытаться освободить, в первую очередь, должен быть поток, который в данный момент получает блокировку, а затем судить, является ли это блокировкой с повторным входом, и если это блокировка без повторного входа, освобождать блокировку текущего потока. Вызывается после снятия блокировкиunparkSuccessorметод запускает следующий поток.

Суммировать

ReentrantLockНа этом получение и снятие блокировок завершено. В целом это относительно четкий процесс. Состояние получения и снятия блокировки контролируется состоянием состояния AQS. AQS внутренне использует двусвязный список для поддержания приостановленных потоков. AQS и ReentrantLock разделены по состоянию и поведению. AQS управляет различными состояниями и внутренне управляет очередями потоков через связанные списки. ReentrantLock обеспечивает внешние функции получения и освобождения блокировки. Конкретная реализация находится в AQS. Ниже я суммирую процесс справедливой блокировки и несправедливой блокировки с помощью двух блок-схем.

Несправедливая блокировка:

Честный замок: