Углубленный анализ принципа реализации реентерабельной блокировки Java ReentrantLock

Java
Углубленный анализ принципа реализации реентерабельной блокировки Java ReentrantLock

🌹Рекомендация проекта с открытым исходным кодом🌹

Pepper Metricsэто инструмент с открытым исходным кодом, разработанный мной и моими коллегами (Github.com/Live-ActionCool/pep...), который собирает текущую статистику производительности jedis/mybatis/httpservlet/dubbo/motan и предоставляет ее для основных данных, совместимых с базой данных временных рядов, таких как prometheus, и отображает тенденции через grafana. Его подключаемая архитектура также очень удобна для пользователей, поскольку позволяет расширять и интегрировать другие компоненты с открытым исходным кодом.
Пожалуйста, дайте звезду и приветствуйте всех, кто станет разработчиком, чтобы отправить Prote для улучшения проекта.


К точке...

ReentrantLock, реентерабельная блокировка, — это высокопроизводительный инструмент, добавленный в пакет concurrent в JDK5. Как следует из названия, ReentrantLock поддерживает многократное получение блокировки одним и тем же потоком без снятия блокировки.

Каждый внешний вид должен иметь значение. Поскольку уже есть уже синхронизированный старый уровень, и синхронизирован также поддерживает ReentRancy, почему Дуг Леа написал ReentrantLock?

0 Сравнение ReentrantLock и синхронизировано

0.1 Сравнение производительности

Во-первых, производительность ReentrantLock лучше, чем у синхронизированного. Ниже приведено сравнение двух кодов. Первый синхронизирован:

public class LockDemo2 {
    private static final Object lock = new Object(); // 定义锁对象
    private static int count = 0; // 累加数
    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        CountDownLatch cdl = new CountDownLatch(100);
        // 启动100个线程对count累加,每个线程累加1000000次
        // 调用add函数累加,通过synchronized保证多线程之间的同步
        for (int i=0;i<100;i++) {
            new Thread(() -> {
                for (int i1 = 0; i1 <1000000; i1++) {
                    add();
                }
                cdl.countDown();
            }).start();
        }
        cdl.await();
        System.out.println("Time cost: " + (System.currentTimeMillis() - start) + ", count = " + count);
    }

    private static void add() {
        synchronized (lock) {
            count++;
        }
    }
}

Тогда это Reentrantlock:

public class LockDemo3 {
    private static Lock lock = new ReentrantLock(); // 重入锁
    private static int count = 0;
    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        CountDownLatch cdl = new CountDownLatch(100);
        for (int i=0;i<100;i++) {
            new Thread(() -> {
                for (int i1 = 0; i1 <1000000; i1++) {
                    add();
                }
                cdl.countDown();
            }).start();
        }
        cdl.await();
        System.out.println("Time cost: " + (System.currentTimeMillis() - start) + ", count = " + count);
    }
    // 通过ReentrantLock保证线程之间的同步
    private static void add() {
        lock.lock();
        count++;
        lock.unlock();
    }
}

Ниже приводится сравнение результатов многократного запуска:

synchronized ReentrantLock
первый раз 4620 ms 3360 ms
второй раз 4086 ms 3138 ms
в третий раз 4650 ms 3408 ms

В целом, средняя производительность ReentrantLock примерно на 20% выше, чем у синхронизированной.

PS: спасибо @Wild Seven Uncle за исправление. Более строгое описание этого сравнения производительности: при большом количестве потоков, конкурирующих за блокировки, ReentrantLock в большинстве случаев работает лучше, чем синхронизированный.

Поскольку синхронизация оптимизирована в JDK6, когда конкуренция блокировки не является свирепом, в большинстве случаев замок останется на блокировке смещения и легкими этапами блокировки, а производительность этих двух этапов очень хорошая. Когда есть много соревнований, он может расширяться в тяжелые замки, и производительность будет деградирована. В это время ReentrantLock должен быть лучше синхронизирован.

0.2 Сравнение справедливости с приобретением блокировки

Что такое понятие справедливости? Если блокировка получена справедливо, это означает, что когда несколько потоков получают блокировку, они должны встать в очередь и получить блокировку по очереди; если блокировка получена несправедливо, это означает, что когда несколько потоков получают блокировку, они устремятся вперед. и кто схватится за замок.

Поскольку синхронизация реализована на основе механизма монитора, она поддерживает только несправедливые блокировки, а ReentrantLock поддерживает как честные, так и несправедливые блокировки.

0.3 Обзор

В дополнение к вышеперечисленному, у ReentrantLock есть еще несколько Synchronized, которые здесь суммированы.

synchronized ReentrantLock
представление Относительно плохо Лучше синхронизированного примерно на 20%
справедливость Только блокировка недоступной блокировки Поддерживает как справедливые замки и недобросовестные замки
Попытка получить поддержку блокировки Не поддерживается, как только блок синхронизации достигнут, а блокировка не получена, заблокируйте здесь Поддержка, с помощью метода tryLock вы можете судить об успешном получении блокировки по возвращаемому значению, поэтому даже если получение блокировки не удастся, здесь она не заблокируется.
Тайм-аут получения блокировки Не поддерживается, если блокировка не может быть получена все время, она будет ждать вечно Поддержка достижения с помощью метода tryLock (время, TimeUnit), если не получить тайм-аут блокировки, отказаться от получения блокировки блокировка не пойдет
Следует ли реагировать на прерывания Не поддерживается, не может реагировать на сигнал прерывания потока Поддерживается методом lockInterruptably.После получения блокировки с помощью этого метода поток может ответить на сигнал прерывания и выдать InterruptedException.
Поддержка состояния ожидания Поддержка достигается путем ожидания, уведомления, уведомленияВсе Поддержка, реализованная через интерфейс Conditon, поддержка нескольких условий, более гибкая, чем синхронизированная

1 Принцип реализации реентерабельной функции

Реализация ReentrantLock основана на Синхронизаторе Очередей (AbstractQueuedSynchronizer, далее AQS) Принцип реализации AQS можно посмотреть в другой статье автора:Что такое синхронизатор очереди Java (AQS)

Функциональность повторного входа ReentrantLock основана на синхронном состоянии AQS: состояние.

Принцип примерно таков: когда поток получает блокировку, значение состояния равно +1, и поток, удерживающий блокировку в данный момент, записывается, а когда другой поток получает блокировку, оценивается, являются ли поток и поток, удерживающий блокировку, то же самое Поток, если он есть, увеличивает значение состояния на 1, если нет, то блокирует поток. Когда поток освобождает блокировку, значение состояния равно -1.Когда значение состояния уменьшается до 0, это означает, что текущий поток полностью освободил блокировку, а затем поле, которое записывает поток, в настоящее время удерживающий блокировку, устанавливается на null, и другие потоки пробуждаются, чтобы повторно конкурировать за блокировку.

// acquires的值是1
final boolean nonfairTryAcquire(int acquires) {
	// 获取当前线程
	final Thread current = Thread.currentThread();
	// 获取state的值
	int c = getState();
	// 如果state的值等于0,表示当前没有线程持有锁
	// 尝试将state的值改为1,如果修改成功,则成功获取锁,并设置当前线程为持有锁的线程,返回true
	if (c == 0) {
		if (compareAndSetState(0, acquires)) {
			setExclusiveOwnerThread(current);
			return true;
		}
	}
	// state的值不等于0,表示已经有其他线程持有锁
	// 判断当前线程是否等于持有锁的线程,如果等于,将state的值+1,并设置到state上,获取锁成功,返回true
	// 如果不是当前线程,获取锁失败,返回false
	else if (current == getExclusiveOwnerThread()) {
		int nextc = c + acquires;
		if (nextc < 0) // overflow
			throw new Error("Maximum lock count exceeded");
		setState(nextc);
		return true;
	}
	return false;
}

2 Принцип реализации несправедливой блокировки

ReentrantLock имеет два конструктора:

// 无参构造,默认使用非公平锁(NonfairSync)
public ReentrantLock() {
	sync = new NonfairSync();
}

// 通过fair参数指定使用公平锁(FairSync)还是非公平锁(NonfairSync)
public ReentrantLock(boolean fair) {
	sync = fair ? new FairSync() : new NonfairSync();
}

sync — это переменная-член ReentrantLock, экземпляр его внутреннего класса Sync. И NonfairSync, и FairSync являются подклассами класса Sync. Вы можете обратиться к следующей диаграмме классов:

ReentrantLock类关系图

Синхрон наследует AQS, поэтому у него есть функция AQS. Точно так же NonfairSync и FairSync являются подклассами AQS.

Когда мы получаем экземпляр ReentrantLock через конструктор без аргументов, по умолчанию используется несправедливая блокировка.

Далее будет описан принцип реализации несправедливых блокировок в следующих сценариях: Предположим, что один поток (t1) получает блокировку, а множество других потоков (others_t), которые не получили блокировку, присоединяются к очереди синхронизации AQS и ждут. завершает выполнение, он освобождается.После блокировки другие потоки повторно конкурируют за блокировку.

Сначала опишу способ получения блокировки:

final void lock() {
	// 线程t1成功的将state的值从0改为1,表示获取锁成功
	if (compareAndSetState(0, 1))
		setExclusiveOwnerThread(Thread.currentThread());
	else
	    // others_t线程们没有获取到锁
		acquire(1);
}

В случае сбоя блокировки будет вызван метод AQUIRE AQS.

public final void acquire(int arg) {
    // tryAcquire是个模板方法,在NonfairSync中实现,如果在tryAcquire方法中依然获取锁失败,会将当前线程加入同步队列中等待(addWaiter)
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

Реализация tryAcquire выглядит следующим образом, которая фактически вызывает вышеуказанный метод nonfairTryAcquire.

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

Хорошо, в это время t1 получил блокировку, и все потоки other_t запускаются в очередь синхронизации и ждут.

В определенный момент выполнение собственной задачи t1 завершается, и вызывается метод снятия блокировки (разблокировки).

public void unlock() {
    // 调用AQS的release方法释放资源
    sync.release(1);
}
public final boolean release(int arg) {
    // tryRelease也是模板方法,在Sync中实现
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 成功释放锁后,唤醒同步队列中的下一个节点,使之可以重新竞争锁
            // 注意此时不会唤醒队列第一个节点之后的节点,这些节点此时还是无法竞争锁
            unparkSuccessor(h);
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {
    // 将state的值-1,如果-1之后等于0,释放锁成功
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

В этот момент блокировка снимается, и пробужденный поток повторно конкурирует за блокировку с новым потоком (исключая те потоки, которые находятся за очередью синхронизации).

Вернемся к методу блокировки, так как в это время все потоки могут получить блокировку через CAS, не гарантируется, что пробужденный поток сможет конкурировать с новым потоком, поэтому это несправедливо. Это реализация несправедливой блокировки.

Этот процесс можно описать на фиг примерно так:

非公平锁的竞争

3 Принцип реализации справедливой блокировки

Логика снятия блокировок для справедливых и нечестных блокировок одинакова. Оба вызывают описанный выше метод разблокировки. Самая большая разница заключается в том, когда блокировка получена.

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    // 获取锁,与非公平锁的不同的地方在于,这里直接调用的AQS的acquire方法,没有先尝试获取锁
    // acquire又调用了下面的tryAcquire方法,核心在于这个方法
    final void lock() {
        acquire(1);
    }

    /**
     * 这个方法和nonfairTryAcquire方法只有一点不同,在标注为#1的地方
     * 多了一个判断hasQueuedPredecessors,这个方法是判断当前AQS的同步队列中是否还有等待的线程
     * 如果有,返回true,否则返回false。
     * 由此可知,当队列中没有等待的线程时,当前线程才能尝试通过CAS的方式获取锁。
     * 否则就让这个线程去队列后面排队。
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // #1
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

Из комментариев видно, что при механизме честных блокировок любой поток, желающий получить блокировку, должен встать в очередь, и сократить очередь невозможно. Вот как работают честные замки.

Этот процесс можно примерно описать следующим образом:

公平锁的竞争

4 принцип блокировки

Trylock делает очень просто: пусть нынешняя тема попытается получить блокировку, вернуться к true, в противном случае false.

Реализация фактически называет NonfairtraCquire Method для получения блокировки.

public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

Что касается сбоя получения, он не будет добавлять себя в очередь синхронизации для ожидания, а напрямую вернет false, чтобы код бизнес-вызова мог справиться с этим самостоятельно.

5 Прерываемая блокировка получения

Прерывание, то есть прерывание потока с помощью метода прерывания Thread, прерывание потока в заблокированном состоянии вызовет исключение InterruptedException.

Если получение блокировки прерываемо, когда поток не может получить блокировку в течение длительного времени, мы можем активно прерывать его, чтобы избежать взаимоблокировки.

Он реализован следующим образом:

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

AquireiNebraphe Method, который называет AQS

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    // 判断当前线程是否已经中断,如果已中断,抛出InterruptedException异常
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

В это время сначала попытаются приобрести замок через Tryacquire. Если приобретение не удается, он добавит себя в очередь, чтобы дождаться, и может реагировать на прерывание в любое время.

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    // 将自己添加到队列中等待
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // 自旋的获取锁
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            // 获取锁失败,在parkAndCheckInterrupt方法中,通过LockSupport.park()阻塞当前线程,
            // 并调用Thread.interrupted()判断当前线程是否已经被中断
            // 如果被中断,直接抛出InterruptedException异常,退出锁的竞争队列
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // #1
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

PS: В непрерывном режиме исключение InterruptedException не будет выброшено в позицию кода #1, а просто запишет, что текущий поток прерван.

6 Суммированного замка

Это реализовано следующими методами: тайм-аут — это время ожидания, а единица представляет единицу времени (миллисекунды, секунды...)

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

Можно обнаружить, что это также метод, который может реагировать на прерывания. Затем вызовите метод tryAcquireNanos AQS:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

Метод DOACKQUIRENANOS аналогичен методу в прерывании. Различия объясняются в комментариях ниже:

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // 计算超时截止时间
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 计算到截止时间的剩余时间
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) // 超时了,获取失败
                return false;
            // 超时时间大于1000纳秒时,才阻塞
            // 因为如果小于1000纳秒,基本可以认为超时了(系统调用的时间可能都比这个长)
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // 响应中断
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

7 резюме

Эта статья сначала сравнивает разницу между ветеранским блокировкой синхронизации и ReentrantLock. ReentrantLock имеет следующие преимущества:

  • Поддерживает как честные, так и нечестные блокировки.
  • Поддержка: попробуйте неблокирующее получение одноразовой блокировки
  • Время ожидания поддержки для получения блокировки
  • Поддержка прерываемых блокировок получения
  • Поддержка дополнительных условий ожидания (условие)

Затем вводит реализацию принципа нескольких ключевых функций, которые основаны на AQS.

Суть ReentrantLock заключается в синхронизации состояния блокировки путем изменения значения состояния в AQS. Таким образом достигается реентерабельность.

ReentrantLock имеет справедливые блокировки и нечестные блокировки, а нечестные блокировки используются по умолчанию. Принцип его реализации во многом зависит от очереди синхронизации в AQS.

Наконец, внутренний механизм может быть прерван потоком. Возможна (), чтобы определить, прервана ли текущий поток, бросить прерываниеException, если прерывают исключение для достижения.


Добро пожаловать в мой публичный аккаунт WeChat

公众号