Серия параллельных программ на Java: Awesome AQS (часть 1)

Java

Метка: статья в публичном аккаунте "Мы все маленькие лягушки"

Дяди, которые разработали java, предоставили нам большой киллер, чтобы мы могли настроить различные инструменты синхронизации для нашего удобства.AbstractQueuedSynchronizerкласс, это абстрактный класс, мы будем ссылаться на него ниже для краткостиAQS, в переводе на китайский抽象队列同步器. Этот парень очень полезен и инкапсулирует различные основные детали синхронизации.Когда наши программисты хотят настроить свои собственные инструменты синхронизации, нам нужно толькоПросто определите подкласс этого класса и переопределите некоторые из предоставляемых им методов.. Явная блокировка, которую мы использовали ранееReentrantLockпросто с помощьюAQSВолшебная сила , теперь давайте посмотрим на принцип реализации этого класса и на то, как его использовать для настройки инструмента синхронизации.

состояние синхронизации

существуетAQSподдерживаетstateполе, определяетсяvolatileМодифицированный, он называется同步状态:

private volatile int state;

И предоставляет несколько методов доступа к этому полю:

имя метода описывать
protected final int getState() Получатьstateзначение
protected final void setState(int newState) настраиватьstateзначение
protected final boolean compareAndSetState(int expect, int update) использоватьCASспособ обновленияstateзначение

Видно, что эти методыfinalУкрашены, указывая на то, что они не могут быть переопределены в подклассах. Кроме того, ониprotectedРазукрасили, заявив, что эти методы можно использовать только в подклассах.

В некоторых сценариях координации потоков, когда поток выполняет определенные операции, другие потоки не могут выполнять операции, например операции при удержании блокировки.В то же время только один поток может удерживать блокировку.Мы называем это Сценарий называется独占模式; В других сценариях координации потоков нескольким потокам может быть разрешено выполнять определенные операции одновременно, мы называем этот сценарий共享模式.

Мы можем изменитьstateполе представляет同步状态реализовать многопоточность独占模式или共享模式.

например, в独占模式вниз, мы можем положитьstateНачальное значение установлено равным0, всякий раз, когда поток хочет что-то сделать独占Перед операцией необходимо оценитьstateЯвляется ли значение0,если не0Если это означает, что другой поток вступил в операцию, поток должен заблокироваться и ждать; если это0тогда поставьstateустановлено значение1, введите операцию самостоятельно. В этом процессе сначала суждения, а затем установки мы можем пройтиCASОперация гарантированно будет атомарной, и мы называем этот процесс尝试获取同步状态. если нить获取同步状态удалось, тогда в другой теме尝试获取同步状态найдено, когдаstateЗначение уже1заблокировать и дождаться获取同步状态После того, как успешный поток завершил операцию, которую необходимо синхронизировать释放同步状态, то есть поставитьstateЗначение установлено на0и уведомлять последующие ожидающие потоки.

существует共享模式Рассуждения ниже аналогичны, например, мы разрешаем некую операцию10Потоки выполняются одновременно, большее количество потоков необходимо заблокировать и ждать. Тогда мы можем поставитьstateНачальное значение установлено равным10, нить尝试获取同步状态значит сначала судитьstateЯвляется ли значение больше, чем0, если не больше0Если это означает, что в настоящее время 10 потоков выполняют операцию одновременно, поток должен быть заблокирован и ждать; еслиstateзначение больше, чем0, то можно поставитьstateзначение минус1Затем введите операцию, которая требуется каждый раз, когда поток завершает операцию.释放同步状态, то есть положитьstateзначение плюс1и уведомлять последующие ожидающие потоки.

Итак, для нашего пользовательского инструмента синхронизацииВам необходимо настроить способ получения состояния синхронизации и сброса состояния синхронизации.AQSДля этого используются несколько методов:

имя метода описывать
protected boolean tryAcquire(int arg) Эксклюзивно получает состояние синхронизации, возвращает true, если получение прошло успешно, в противном случае — false.
protected boolean tryRelease(int arg) Эксклюзивное состояние синхронизации релиза, возвращает true, если релиз успешен, иначе false
protected int tryAcquireShared(int arg) Общее состояние синхронизации сбора данных, возвращает true, если сбор данных прошел успешно, в противном случае — false.
protected boolean tryReleaseShared(int arg) Общее состояние синхронизации релиза, возвращает true, если релиз успешен, иначе false
protected boolean isHeldExclusively() В монопольном режиме возвращает true, если текущий поток получил состояние синхронизации; в противном случае возвращает false

мы говоримAQSявляется абстрактным классом, мы начинаем сtryAcquireНапример, см. вAQSРеализация в:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

О 😯 Боже мой, это просто исключение, это ненаучно. да, вAQSне реализует этот метод вРазличные инструменты синхронизации предназначены для разных сценариев параллелизма, поэтому способы получения состояния синхронизации и сброса состояния синхронизации необходимо настраивать.AQSреализован в подклассах, если наш пользовательский инструмент синхронизации должен быть в独占模式работаем, потом переписываемtryAcquire,tryReleaseиisHeldExclusivelyметод, если в共享模式работаем, потом переписываемtryAcquireSharedиtryReleaseSharedметод. Например, в эксклюзивном режиме мы можем определитьAQSПодкласс:

public class Sync extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int arg) {
        return compareAndSetState(0, 1);
    }

    @Override
    protected boolean tryRelease(int arg) {
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() == 1;
    }
}

tryAcquireвыражать尝试获取同步状态, мы определяем чрезвычайно простой способ получить его здесь, то есть использоватьCASспособ поставитьstateЗначение устанавливается равным 1, в случае успеха возвращаетсяtrue, возврат в случае неудачиfalse,tryReleaseвыражать尝试释放同步状态, здесь также используется предельно простой алгоритм выпуска, непосредственноstateустановлено значение0Достаточно.isHeldExclusivelyЭто означает, получил ли какой-либо поток состояние синхронизации.Если у вас есть более сложные сценарии, вы можете переопределить эти методы с помощью более сложных алгоритмов получения и выпуска..

Через вышеприведенное нытье мы просто понимаем, что такое同步状态, научились передавать наследованиеAQSЧтобы настроить различные методы получения и сброса состояния синхронизации в монопольном и совместно используемом режимах,Но вы будете удивлены, узнав, что они все еще бесполезны. Мы ожидаем, что поток вернется сразу же после получения состояния синхронизации.true, и продолжить выполнение некоторых операций, которые необходимо синхронизировать, отменить состояние синхронизации после завершения операции и немедленно вернуться, если получение состояния синхронизации не удается.false, и войти в состояние ожидания блокировки, как поток входит в состояние ожидания? Не уходите, следующая сессия будет более захватывающей.

очередь синхронизации

AQSтакже поддерживает так называемый同步队列, очередь节点类Он определен как статический внутренний класс, и его основные поля следующие:

static final class Node {
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
    
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
}

AQSопределить头节点引用,Один尾节点引用:

private transient volatile Node head;
private transient volatile Node tail;

Через эти два узла можно управлять очередью, что означает, что в очереди могут выполняться такие операции, как вставка и удаление. можно увидетьNodeкласс имеетThreadполе типа, которое указываетКаждый узел представляет поток. Наш желаемый эффектКогда потоку не удается получить состояние синхронизации, поток блокируется и заворачивается какNodeвставьте узел в это同步队列, когда поток, который успешно получает состояние синхронизации, освобождает состояние синхронизации, он уведомляет следующий узел в очереди, который не получил состояние синхронизации в то же время, так что поток этого узла может снова получить состояние синхронизации..

это节点类Значение других полей будет подробно рассмотрено позже, давайте посмотрим独占模式и共享模式При каких обстоятельствах он пойдет на это同步队列В него добавляются узлы, когда из него удаляются узлы, и детали реализации блокировки и восстановления потоков.

Эксклюзивное получение и освобождение состояния синхронизации

существует独占模式В то же время только один поток может получить состояние синхронизации, а другие потоки, которые одновременно приобретут состояние синхронизации, будут упакованы вNodeузел на同步队列, пока поток, получивший состояние синхронизации, не освободит состояние синхронизации, он не может продолжать выполнение. в исходном состоянии同步队列Это пустая очередь, в ней нет узла, выглядит это так:

image_1c3cqmj2g1ve01vd36i71ui91jo645.png-5.5kB

Далее мы подробно рассмотрим, как поток, которому не удалось получить состояние синхронизации, упаковывается какNodeУзлы вставляются в очередь при блокировке ожидания.

Я сказал раньше,Способ получения и сброса состояния синхронизации настраивается нами.,существует独占模式нам нужно определитьAQSподкласс и переопределить следующие методы:

protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected boolean isHeldExclusively()

После определения этих методов, кто их вызывает?AQSЕсть несколько методов, которые вызывают их, определенные вpublic finalдекоративный:

имя метода описывать
void acquire(int arg) Эксклюзивно получает состояние синхронизации, возвращается, если получение прошло успешно, и заключает текущий поток вNodeУзлы вставляются в очередь синхронизации.
void acquireInterruptibly(int arg) Он имеет то же значение, что и предыдущий метод, за исключением того, что поток прерывается другими потоками во время выполнения этого метода и выбрасываетInterruptedExceptionаномальный.
boolean tryAcquireNanos(int arg, long nanos) На основе предыдущего метода добавлено ограничение по тайм-ауту, если состояние синхронизации не будет получено в течение заданного времени, оно вернетсяfalse, иначе возвратtrue.
boolean release(int arg) Эксклюзивное состояние синхронизации выпуска.

Может быть немного неловко внезапно добавить столько методов, давайте сначала посмотримacquireИсходный код метода:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

дисплей кодаacquireметод фактически выполняетсяtryAcquireспособ получить статус синхронизации, еслиtryAcquireметод возвращаетtrueконец, если вернутьсяfalseпродолжать выполнять. этоtryAcquireПуть этоТо, как мы определяем себя, чтобы получить состояние синхронизации. Предположим теперь, что поток получил состояние синхронизации, и потокt1звонить в то же времяtryAcquireМетод пытается получить состояние синхронизации, и в результате происходит сбой получения, он будет выполнен первымaddWaiterметод, давайте посмотрим на этот метод:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);  //构造一个新节点
    Node pred = tail;
    if (pred != null) { //尾节点不为空,插入到队列最后
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {       //更新tail,并且把新节点插入到列表最后
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {    //tail节点为空,初始化队列
            if (compareAndSetHead(new Node()))  //设置head节点
                tail = head;
        } else {    //tail节点不为空,开始真正插入节点
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

Как видите, этоaddWaiterМетод — это метод вставки узла в очередь. сначала построитNodeузел, предполагая, что этот узел节点1,этоthreadполе - текущий потокt2, этот узел просто создается следующим образом:

image_1c3dakn72sg8n8s1ib71jeu19pp3t.png-19.6kB

Затем мы анализируем конкретный процесс вставки. еслиtailЕсли узел не пуст, вставьте новый узел прямо в конец очереди и вернитесь, еслиtailУзел пуст, звонитеenqметод инициализацииheadиtailЗатем узел вставляет новый узел в конец очереди.enqЭти строки кода в методе, инициализирующем очередь, требуют особого внимания:

if (t == null) {    //tail节点为空,初始化队列
    if (compareAndSetHead(new Node()))  //设置head节点
        tail = head;
} else {
    //真正插入节点的过程
}

То есть, когда очередь пуста,headиtailСсылка указывает на тот же узел, а затем выполняет операцию вставки, и этот узел на самом деле прост.new Node(), действительно без всяких добавок~ Сначала вызовем этот узел0号节点吧, ни одно из полей этого узла не было присвоено, поэтому после вставки первого узла队列На самом деле это выглядит так:

image_1c3dapsu21h36kbj1rev135718d05q.png-42.3kB

один из них节点1Это узел, который мы действительно вставили, представляющий поток, которому не удалось получить состояние синхронизации.0号节点создается во время инициализации, мы увидим, что он делает позже.

addWaiterПосле вызова метода он вернет только что вставленный узел, который节点1,acquireзатем будет вызываться методacquireQueuedметод:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();  //获取前一个节点
            if (p == head && tryAcquire(arg)) { 前一个节点是头节点再次尝试获取同步状态
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Можно видеть, что если предыдущий узел вновь вставленного узла头节点Если это так, он будет вызван сноваtryAcquireПопытайтесь получить состояние синхронизации. В основном это связано с тем, что поток, получивший состояние синхронизации, вскоре освободит состояние синхронизации. Поэтому, прежде чем текущий поток будет заблокирован, попробуйте получить состояние синхронизации по счастливой случайности. Если вам повезет, вы может получить состояние синхронизации. , затем вызовитеsetHeadметод положить头节点Замените на себя:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

В то же время этоNodeузлаthreadполе установлено наnull, а это значит, что он стал0号节点.

если текущийNodeЕсли узел не является головным узлом или поток, получивший состояние синхронизации, не освобождает состояние синхронизации, то он будет выполняться послушно.shouldParkAfterFailedAcquireметод:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;   //前一个节点的状态
    if (ws == Node.SIGNAL)  //Node.SIGNAL的值是-1
        return true;
    if (ws > 0) {   //当前线程已被取消操作,把处于取消状态的节点都移除掉
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {    //设置前一个节点的状态为-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

Этот метод правильныйNodeв узлеwaitStatusразличные операции. Если текущий узел предыдущего узлаwaitStatusдаNode.SIGNAL, что равно -1, это означает, что текущий узел может быть заблокирован, если предыдущий узелwaitStatusбольше, чем0, что означает, что поток, представленный узлом, был отменен, и всеwaitStatusбольше, чем0удаляются, если предыдущий узелwaitStatusесли не-1, не более0, принять, если предыдущий узелwaitStatusустановлен вNode.SIGNAL. мы знаемNodeКласс определяет некоторых представителейwaitStatusстатических переменных, давайте посмотримwaitStatusЧто означает каждое значение:

статическая переменная ценность описывать
Node.CANCELLED 1 Поток, соответствующий узлу, был отменен (позже мы подробно расскажем, как поток отменяется)
Node.SIGNAL -1 Указывает, что поток, соответствующий следующему узлу, находится в состоянии ожидания.
Node.CONDITION -2 Указывает, что узел находится в очереди ожидания (подробнее о том, что такое очередь ожидания, см. ниже).
Node.PROPAGATE -3 Указывает, что следующее получение общего состояния синхронизации будет распространяться безоговорочно (мы подробно поговорим о получении и выпуске общего состояния синхронизации позже).
никто 0 начальное состояние

Теперь мы сосредоточимся наwaitStautsза0или-1Случай. В настоящее время наш текущий узел节点1, что соответствует текущему потоку, предыдущий узел текущего узла0号节点. В начале всеNodeузлаwaitStatusоба0, поэтому при первом вызовеshouldParkAfterFailedAcquireметод, предыдущий узел текущего узла, то есть0号节点изwaitStatusбудет установлен наNode.SIGNALвернуться сейчасfalse, это состояние означает, что0号节点Узлы позади находятся в состоянии ожидания, а текущая очередь стала такой:

image_1c3datk24q84tg05qi14di14qk67.png-59.7kB

так какacquireQueuedМетод представляет собой цикл, который выполняется во второй раз, покаshouldParkAfterFailedAcquireметод, так как0号节点изwaitStatusуже дляNode.SIGNAL, такshouldParkAfterFailedAcquireметод вернетtrue, затем продолжитьparkAndCheckInterruptметод:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

LockSupport.park(this)метод:

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);     //调用底层方法阻塞线程
    setBlocker(t, null);
}

один из нихUNSAFE.park(false, 0L)Методы, как показано ниже:

public native void park(boolean var1, long var2);

Это означает, что поток блокируется немедленно, это низкоуровневый метод, и нашим программистам не нужно заботиться о том, как операционная система блокирует поток. Фу~ Пока что мы独占模式Как поток, который не может вставить состояние синхронизации после запуска同步队列и заблокированные процессы. Этот процесс требует, чтобы каждый смотрел его несколько раз, в конце концов, это более хлопотно~

Если в это время появится новая темаt2перечислитьacquireЕсли метод требует получить состояние синхронизации, он также будет обернут какNodeвставлять同步队列, эффект подобен следующему изображению:

image_1c3dbas6bkii33r5c0g3f3ss6k.png-60.4kB

Всем обратить внимание节点1изwaitStautsстал-1не забывайwaitStautsзначение-1когда, то естьNode.SIGNALозначает, что его следующий узел находится в состоянии ожидания, потому что0号节点и节点1изwaitStautsзначения-1, что означает, что их два узла-преемника, то есть节点1и节点2находятся в состоянии ожидания.

Выше есть веткаt1иt2Вызывается, когда поток получил состояние синхронизацииacquireпоследствия метода,acquireInterruptiblyиacquireМетод в основном тот же, за исключением того, что он可中断, то есть вызов в потокеacquireInterruptiblyПосле блокировки из-за того, что состояние синхронизации не получено, если другой поток прерывает поток, тоacquireInterruptiblyметод выдастInterruptedExceptionисключение и возврат.tryAcquireNanosОн также поддерживает прерывания, но также имеет тайм-аут, если время превышеноtryAcquireNanosне вернулся, верниfalse.

Если поток находится в разныхacquireЕсли в методе не удается получить состояние синхронизации, оно будет обернуто какNodeузел на同步队列, это можно рассматривать как процесс вставки. Есть вход и выход, если поток завершается独占операции, вам нужно освободить состояние синхронизации, и в то же время поставить同步队列первый (не-0号节点) пробуждается поток, представленный узлом, который в нашем примере выше节点1, пусть он продолжает выполняться, процесс выхода из состояния синхронизации должен вызыватьсяreleaseметод:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

Видно, что этот метод будет использоваться в нашейAQSПереопределено в подклассеtryReleaseметод, в случае успешного выпуска同步状态, то продолжить выполнение, если головной узелheadне дляnullиheadизwaitStatusне для0, затем выполнитеunparkSuccessorметод:

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;   //节点的等待状态
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next; 
        if (s == null || s.waitStatus > 0) {    //如果node为最后一个节点或者node的后继节点被取消了
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)   
                if (t.waitStatus <= 0)  //找到离头节点最近的waitStatus为负数的节点
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);   //唤醒该节点对应的线程
    }

наш текущий головной узелheadуказывает на0号节点, его состояние-1, так что этоwaitStatusсначала будет установлено значение0, то его последующий узел, который节点1Делегированный поток будет вызываться такLockSupport.unpark(s.thread), этот метод означает проснуться节点1соответствующий потокt2,Пучок节点1изthreadУстановить какnullИ установите его как головной узел, измененная очередь будет выглядеть так:

image_1c3dfd25n1nc1acd1c7btcvd7u71.png-41.9kB

а сейчас等待队列только одинt2Ветка заблокирована. Это процесс выхода из состояния синхронизации.

Пример эксклюзивного инструмента синхронизации

Ознакомившись с принципом получения и освобождения состояния эксклюзивной синхронизации, мы можем попробовать настроить простой инструмент эксклюзивной синхронизации.Это эксклюзивный инструмент синхронизации. Давайте определим простую блокировку ниже:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class PlainLock {

    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private Sync sync = new Sync();


    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }
}

мы вPlainLockопределяетAQSПодклассSync, переопределить некоторые методы для настройки独占模式Следующий метод получения и сброса состояния синхронизации:статический внутренний классAQSНаиболее распространенный способ определения подклассов в наших пользовательских инструментах синхронизации.. затем вPlainLockопределено вlockМетод представляет собой блокировку,unlockКод метода разблокирован, а конкретный вызов метода нас вот-вот вырвет. Я не хочу, чтобы вас тут снова стошнило. Давайте посмотрим на применение этой блокировки:

public class Increment {

    private int i;

    private PlainLock lock = new PlainLock();

    public void increase() {
        lock.lock();
        i++;
        lock.unlock();
    }

    public int getI() {
        return i;
    }

    public static void test(int threadNum, int loopTimes) {
        Increment increment = new Increment();

        Thread[] threads = new Thread[threadNum];

        for (int i = 0; i < threads.length; i++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < loopTimes; i++) {
                        increment.increase();
                    }
                }
            });
            threads[i] = t;
            t.start();
        }

        for (Thread t : threads) {  //main线程等待其他线程都执行完成
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        System.out.println(threadNum + "个线程,循环" + loopTimes + "次结果:" + increment.getI());
    }

    public static void main(String[] args) {
        test(20, 1);
        test(20, 10);
        test(20, 100);
        test(20, 1000);
        test(20, 10000);
        test(20, 100000);
        test(20, 1000000);
    }
}

Результаты:

20个线程,循环1次结果:20
20个线程,循环10次结果:200
20个线程,循环100次结果:2000
20个线程,循环1000次结果:20000
20个线程,循环10000次结果:200000
20个线程,循环100000次结果:2000000
20个线程,循环1000000次结果:20000000

Очевидно, эта блокировка, для которой мы написали всего несколько строк кода, сработала, и этоAQSСила , нам нужно всего лишь написать несколько вещей, чтобы создать инструмент синхронизации, и нам не нужно учитывать базовое сложное управление состоянием синхронизации, очередь потоков, ожидание и пробуждение и другие механизмы.

Получение и освобождение общего состояния синхронизации

共享式Получить с独占式Самая большая разница в том, чтоМогут ли несколько потоков получить состояние синхронизации одновременно. Потоки, которые не могут получить состояние синхронизации, также должны быть обернуты какNodeПосле того, как узел заблокирован, методы, которые могут получить доступ к очереди синхронизации, следующие:

|void acquireShared(int arg)|Shared для получения состояния синхронизации, в случае сбоя заключить текущий поток вNodeУзлы вставляются в очередь синхронизации. . | |void acquireSharedInterruptibly(int arg)|То же значение, что и у предыдущего метода, за исключением того, что поток прерывается другим потоком во время выполнения этого метода, а затем выбрасываетсяInterruptedExceptionаномальный. | |boolean tryAcquireSharedNanos(int arg, long nanos)|Ограничение времени ожидания добавлено на основе предыдущего метода, если состояние синхронизации не будет получено в течение заданного времени, оно вернетсяfalse, иначе возвратtrue. | |boolean releaseShared(int arg)|Общее состояние синхронизации выпуска. |

Ха, и独占模式Приведенные ниже методы очень похожи, за исключением того, что к каждому методу добавляется по одному.Sharedслово. Их функция одинакова, т.acquireSharedМетод например:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

Этот метод вызовет наш пользовательскийAQSв подклассеtryAcquireSharedспособ получить состояние синхронизации, ноtryAcquireSharedВозвращаемое значение представляет собойintзначение, когда значение не меньше 0, это означает, что статус синхронизации получен успешно, затемacquireSharedМетод возвращает напрямую и ничего не делает; если возвращаемое значение больше 0, это означает, что получение состояния синхронизации не удалось, и поток будет завернут какNodeВставка узла同步队列, процесс вставки и独占模式Дальнейший процесс почти такой же, не будем нести чушь.

другие дваacquireМетод не так уж и бесполезен, но один прерываемый, а другой поддерживает тайм-аут~

Способ выхода из состояния синхронизации такой же, как独占模式почти:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

Этот метод вызовет наш пользовательскийAQSв подклассеtryReleaseSharedметод освобождения состояния синхронизации, если выпуск будет успешным, он будет удален同步队列Блокирующий узел в . и独占模式Отличие состоит в том, что несколько потоков могут отменить состояние синхронизации одновременно, то есть несколько потоков могут быть удалены одновременно.同步队列Блокирующий узел в , ха-ха, как обеспечить безопасность процесса удаления? Этот вопрос не смотрит на исходный код, каждый пытается написать его сам.

Пример общего инструмента синхронизации

Предполагая, что операция может выполняться только двумя потоками одновременно, а другие потоки должны находиться в состоянии ожидания, мы можем определить эту блокировку следующим образом:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class DoubleLock {


    private static class Sync extends AbstractQueuedSynchronizer {

        public Sync() {
            super();
            setState(2);    //设置同步状态的值
        }

        @Override
        protected int tryAcquireShared(int arg) {
            while (true) {
                int cur = getState();
                int next = getState() - arg;
                if (compareAndSetState(cur, next)) {
                    return next;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            while (true) {
                int cur = getState();
                int next = cur + arg;
                if (compareAndSetState(cur, next)) {
                    return true;
                }
            }
        }
    }
    
    private Sync sync = new Sync();
    
    public void lock() {
        sync.acquireShared(1);     
    }
    
    public void unlock() {
        sync.releaseShared(1);
    }
}

stateначальное значение2, всякий раз, когда поток вызываетtryAcquireSharedКогда состояние синхронизации достигнуто,stateзначение уменьшится1,когдаstateзначение0, другие потоки не могут получить состояние синхронизации и упаковываются какNodeзапись узла同步队列ждать.

AQSДругие важные методы синхронизации очередей в

Помимо рядаacquireиreleaseметод,AQSСуществует также ряд методов прямого доступа к этой очереди, все из которыхpublic finalдекоративный:

имя метода описывать
boolean hasQueuedThreads() Существуют ли какие-либо потоки, ожидающие получения состояния синхронизации.
boolean hasContended() Был ли поток когда-либо заблокирован, потому что он не мог получить состояние синхронизации
Thread getFirstQueuedThread() Возвращает первый (самый длинный ожидающий) поток в очереди или ноль, если в данный момент в очереди нет ни одного потока.
boolean isQueued(Thread thread) Возвращает true, если текущая очередь данного потока синхронизирована.
int getQueueLength() Возвращает оценку количества потоков, ожидающих получения статуса синхронизации, поскольку фактический набор потоков в многопоточной среде может значительно измениться при построении этого результата.
Collection<Thread> getQueuedThreads() Возвращает набор потоков, которые могут ожидать выборки, так как фактический набор потоков в многопоточной среде мог значительно измениться при построении результата.

При необходимости их можно использовать в нашем пользовательском инструменте синхронизации.

Не по теме

Написание статей очень утомительно, и иногда вы чувствуете, что чтение идет очень гладко, что на самом деле является результатом бесчисленных правок за ним. Если вы думаете, что это хорошо, пожалуйста, помогите переслать его.Большое спасибо~ Вот мой публичный аккаунт "Мы все маленькие лягушки".

буклет

Кроме того, автор также написал буклет MySQL:Ссылка на статью «Как работает MySQL: понимание MySQL у истоков». Содержание буклета в основном представлено с точки зрения Xiaobai, объясняя некоторые основные концепции MySQL на относительно распространенном языке, такие как записи, индексы, страницы, табличные пространства, оптимизация запросов, транзакции и блокировки и т. д. Общее количество слов составляет от 300 000 до 400 000 слов с сотнями оригинальных иллюстраций. Основная цель состоит в том, чтобы облегчить обычным программистам изучение MySQL для продвинутых пользователей и сделать кривую обучения немного более плавной. Студенты, у которых есть сомнения относительно MySQL для продвинутых пользователей, могут взглянуть на: