Принцип реализации ReentrantLock

Node.js интервью Java задняя часть

использоватьsynchronizeПри обработке синхронизации получение и освобождение блокировок неявно, и принцип реализации заключается в добавлении различных машинных инструкций после компиляции.

иReentrantLockобычный класс, основанный наAQS(AbstractQueuedSynchronizer)быть реализованным.

ЯвляетсяЗадний замок: После того, как поток получит блокировку, он все еще можетнеоднократноЗамок не будет блокировать сам себя.

AQSдаJavaВажная базовая структура для реализации блокировок и синхронизации параллельных пакетов.

тип замка

ReentrantLock делится начестный замокинесправедливый замок, вы можете указать конкретный тип через конструктор:

    //默认非公平锁
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    //公平锁
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

Общее использование по умолчаниюнесправедливый замок, его эффективность и пропускная способность намного выше, чем у честных блокировок (конкретные причины будут проанализированы позже).

получить замок

Обычное использование выглядит следующим образом:

    private ReentrantLock lock = new ReentrantLock();
    public void run() {
        lock.lock();
        try {
            //do bussiness
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

честная блокировка получить блокировку

Первый взгляд на процесс получения замка:

    public void lock() {
        sync.lock();
    }

Видно, что использованиеsyncметод, и этот метод является абстрактным методом, который определяется его подклассом (FairSync) для достижения следующей реализации справедливой блокировки:

        final void lock() {
            acquire(1);
        }
        
        //AbstractQueuedSynchronizer 中的 acquire()
        public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    	}

Первый шаг — попытаться получить блокировку (tryAcquire(arg)), который также реализуется его подклассами:

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

первый судьяAQSсерединаstateЕсли он равен 0, 0 означает, что ни один другой поток в настоящее время не получает блокировку, и текущий поток может попытаться получить блокировку.

Уведомление: Воспользуюсь перед попыткойhasQueuedPredecessors()метод, чтобы определить, есть ли другие потоки в очереди AQS, и если да, то он не будет пытаться получить блокировку (Это случай, характерный для честных замков).

Если в очереди нет потока, используйте CAS, чтобы изменить состояние в AQS на 1, то есть получить блокировку, и, если получение прошло успешно, установить текущий поток в качестве эксклюзивного потока, который получает блокировку (setExclusiveOwnerThread(current)).

еслиstateКогда он больше 0, это означает, что блокировка была получена, и необходимо определить, является ли поток, получивший блокировку, текущим потоком (ReentrantLockподдерживать повторный вход), если да, вам нужноstate + 1и обновите значение.

очередь записи

еслиtryAcquire(arg)Если получение блокировки не удается, вам нужно использоватьaddWaiter(Node.EXCLUSIVE)Записать текущий поток в очередь.

Текущий поток должен быть обернут какNodeобъект (addWaiter(Node.EXCLUSIVE)).

Очередь в AQS реализуется двусвязным списком, состоящим из узлов Node.

Код упаковки:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

Сначала определите, пуста ли очередь, если нет, то она будет упакованаNodeиспользоватьCASНапишите в конец очереди, если есть одновременная ошибка записи, вам нужно позвонитьenq(node);написать.

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

Эта логика обработки эквивалентна自旋плюсCASГарантированная возможность записи в очередь.

приостановить ожидающий поток

После записи в очередь текущий поток необходимо приостановить (используяacquireQueued(addWaiter(Node.EXCLUSIVE), arg)):

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

сначала согласноnode.predecessor()Получите, является ли предыдущий узел головным узлом, если да, попробуйте получить блокировку один раз, и если получение пройдет успешно, все будет хорошо.

Если это не головной узел или получить блокировку не удается, узел в соответствии сwaitStatusсостояние для обработки (shouldParkAfterFailedAcquire(p, node)).

waitStatusИспользуется для записи состояния текущего узла, такого как отмена узла, ожидание узла и т. д.

shouldParkAfterFailedAcquire(p, node)Возвращает, нужно ли приостановить текущий поток, и вызывает при необходимостиparkAndCheckInterrupt():

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

он используетLockSupportизpartметод для приостановки текущего потока до его пробуждения.

Недобросовестная блокировка получения блокировки

Разница между честными блокировками и нечестными блокировками в основном заключается в приобретении блокировок:

Честный замок эквивалентен покупке билетов, позже люди должны стоять в очереди до конца очереди, чтобы купить билеты по очереди.Не могу прыгнуть в очередь.

У недобросовестных замков нет этих правил, даприоритетный режим, и каждый человек не будет заботиться об очереди и попытается получить блокировку напрямую.

Несправедливая блокировка:

        final void lock() {
            //直接尝试获取锁
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

Честный замок:

        final void lock() {
            acquire(1);
        }

Еще одно важное отличие заключается в том, что при попытке получить блокировкуtryAcquire(arg), несправедливая блокировка не должна оценивать, есть ли в очереди другие потоки, но также напрямую пытается получить блокировку:

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //没有 !hasQueuedPredecessors() 判断
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

разблокировать замок

Процесс освобождения честных и нечестных блокировок одинаков:

    public void unlock() {
        sync.release(1);
    }
    
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
            	   //唤醒被挂起的线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    //尝试释放锁
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }        

Во-первых, он определяет, является ли текущий поток тем потоком, который получил блокировку.stateУменьшите значение до 0, чтобы полностью снять блокировку.

Нужно позвонить после освобожденияunparkSuccessor(h)чтобы разбудить приостановленную нить.

Суммировать

Так как справедливые блокировки должны заботиться о ситуации в очереди, блокировки должны быть получены в порядке очереди (что приведет к большому количеству переключений контекста потока), в то время как нечестные блокировки не имеют этого ограничения.

Следовательно, можно объяснить, что эффективность недобросовестных блокировок будет выше, чем у честных блокировок.

Дополнительный

Недавно я обобщил некоторые знания, связанные с Java, и заинтересованные друзья могут поддерживать их вместе.

адрес:GitHub.com/crossover J я…