Принцип реализации блокировки чтения-записи ReadWriteLock

Java
Принцип реализации блокировки чтения-записи ReadWriteLock

Отсканируйте QR-код ниже или WeChat, чтобы найти официальную учетную запись.菜鸟飞呀飞, вы можете следить за публичной учетной записью WeChat, читать далееSpring源码分析иJava并发编程статья.

微信公众号

проблема

Прежде чем читать эту статью, вы можете подумать над несколькими вопросами

    1. Что такое блокировка чтения-записи?
    1. В чем смысл существования ReadWriteLock?
    1. Для каких сценариев подходят блокировки чтения-записи?
    1. Что такое понижение блокировки и обновление блокировки?

Введение

  • Блокировка, реализованная с помощью synchronized и ReentrantLock,排他锁, так называемая монопольная блокировка заключается в том, что только одному потоку разрешен доступ к общим ресурсам одновременно, но в обычных сценариях мы обычно сталкиваемся с读多写少место действия. Для сценария чтения доступ к разделяемым ресурсам одновременно разрешен только одному потоку, очевидно, что эффективность использования монопольных блокировок в этом случае относительно невысока, так как же ее оптимизировать?
  • В настоящее время读写锁Возникла блокировка чтения-записи — это общая технология, не специфичная для Java. Судя по названию, блокировка чтения-записи имеет две блокировки,读锁и写锁. Характеристики блокировок чтения-записи: нескольким потокам разрешено читать общие ресурсы одновременно; только одному потоку разрешено одновременно писать в общие ресурсы; когда выполняется операция записи, операции чтения другие потоки одновременно будут заблокированы; при выполнении операции чтения операция записи всех потоков одновременно будет заблокирована. Для блокировок чтения, поскольку несколько потоков могут получать доступ к общим ресурсам и выполнять операции чтения одновременно, они называются общими блокировками, в то время как для блокировок записи только одному потоку разрешен доступ к общим ресурсам и выполнение операций записи одновременно. это называется эксклюзивной блокировкой.
  • на Java черезReadWriteLockреализовать блокировку чтения-записи. ReadWriteLock — это интерфейс,ReentrantReadWriteLockЯвляется конкретным классом реализации интерфейса ReadWriteLock. В ReentrantReadWriteLock определены два внутренних класса.ReadLock,WriteLock, чтобы реализовать блокировки чтения и записи соответственно. Нижний уровень ReentrantReadWriteLock реализует получение и освобождение блокировки через AQS, поэтому ReentrantReadWriteLock также определяет компонент синхронизации, который наследует класс AQS.Sync, в то время как ReentrantReadWriteLock также支持公平与非公平性, поэтому он также определяет два внутренних класса внутриFairSync、NonfairSync, которые наследуют Sync.
  • Помимо снятия и получения блокировок чтения и записи, ReentrantReadWriteLock также предоставляет некоторые другие методы, связанные со статусом блокировки. Как показано в следующей таблице (таблица из книги «Искусство параллельного программирования на Java», стр. 141).
имя метода Функции
int getReadLockCount() Количество полученных блокировок чтения.В настоящее время количество блокировок чтения не обязательно равно количеству полученных блокировок, потому что блокировка может быть повторно введена, и могут быть потоки, которые повторно входят в блокировку чтения.
int getReadHoldCount() Получить количество повторных входов текущего потока в блокировку чтения
int getWriteHoldCount() Получить количество раз, когда текущий поток перезаписывает блокировку записи
int isWriteLocked() Определить, является ли состояние блокировки блокировкой записи, вернуть true, указав, что состояние блокировки является блокировкой записи.

Принцип реализации

  • В AQS черезint类型Состояние глобальной переменной представляет собой состояние синхронизации, то есть состояние используется для представления блокировки. ReentrantReadWriteLock также реализует блокировки через AQS, но ReentrantReadWriteLock имеет две блокировки: блокировку чтения и блокировку записи, которые защищают один и тот же ресурс, так как же использовать общую переменную, чтобы отличить, является ли блокировка блокировкой записи или блокировкой чтения? ответ按位拆分.
  • Поскольку состояние является переменной типа int, в памяти占用4个字节,也就是32位. Разделите его на две части: старшие 16 бит и младшие 16 бит, где高16位用来表示读锁状态,低16位用来表示写锁状态. Когда блокировка чтения успешно установлена, к старшим 16 битам добавляется 1, а когда блокировка чтения снимается, старшие 16 бит вычитаются на 1; когда блокировка записи успешно устанавливается, к младшим 16 битам добавляется 1 , а 16-й бит добавляется при снятии блокировки записи Бит минус 1. Как показано ниже.

读写锁

  • Итак, как определить, является ли текущее состояние блокировки блокировкой записи или блокировкой чтения в соответствии со значением состояния?
  • Предполагая, что текущее значение состояния блокировки равно S, S и шестнадцатеричное число0x0000FFFFпровести与运算, а именно S&0x0000FFFF, старшие 16 бит будут установлены в 0 во время операции, а результат операции будет записан как с, тогда с представляет количество блокировок записи. Если с равно 0, это означает, что ни один поток не получил блокировку, если с не равно 0, это означает, что поток получил блокировку, а если с равно несколько раз, это означает, что блокировка записи несколько раз возвращался.
  • будет С无符号右移16位(S>>>16), результат读锁的数量. Когда результат, полученный S>>>16, не равен 0, а c не равен 0, это означает, что текущий поток удерживает как блокировку записи, так и блокировку чтения.
  • Когда блокировка чтения успешно получена, как увеличить блокировку чтения на 1? Результат, полученный S + (1
  • Когда блокировка записи успешно получена, пусть S+1 представляет состояние блокировки записи +1; когда блокировка записи снимается, выполняется операция S-1.
  • Поскольку значения состояния как блокировок чтения, так и блокировок записи занимают всего 16 бит, максимальное количество блокировок чтения составляет2^{16}-1, максимальное количество повторных входов в блокировку записи равно2^{16}-1.

Анализ исходного кода

Поймите, как представить состояние блокировки через состояние, а затем проанализируйте реализацию исходного кода блокировки чтения-записи через исходный код.

  • С помощью следующего кода вы можете создавать блокировки чтения и записи. Если в конструктор ReentrantReadWriteLock не передаются никакие параметры,默认创建的是非公平的读写锁. В блокировках чтения-записи по-прежнему非公平的读写锁性能要由于公平的读写锁.
ReadWriteLock lock = new ReentrantReadWriteLock();
// 创建读锁
Lock readLock = lock.readLock();
// 创建写锁
Lock writeLock = lock.writeLock();	

блокировка записи блокировка

  • Когда вызывается метод lock() блокировки записи, поток пытается получить блокировку записи, то есть writeLock.lock(). Поскольку блокировка записи является эксклюзивной блокировкой, процесс получения блокировки записи почти такой же, как и логика получения блокировки ReentrantLock. При вызове метода lock() сначала вызывается метод Acquire() AQS, а в методе Acquire() сначала вызывается метод tryAcquire() подкласса, поэтому метод tryAcquire() внутреннего класса Здесь вызывается метод синхронизации ReentrantReadWriteLock. Исходный код этого метода выглядит следующим образом.
protected final boolean tryAcquire(int acquires) {
    
    Thread current = Thread.currentThread();
    int c = getState();
    // exclusiveCount()方法的作用是将同步变量与0xFFFF做&运算,计算结果就是写锁的数量。
    // 因此w的值的含义就是写锁的数量
    int w = exclusiveCount(c);
    // 如果c不为0就表示锁被占用了,但是占用的是写锁还是读书呢?这个时候就需要根据w的值来判断了。
    // 如果c等于0就表示此时锁还没有被任何线程占用,那就让线程直接去尝试获取锁
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        //
        /**
         * 1. 如果w为0,说明写锁的数量为0,而此时又因为c不等于0,说明锁被占用,但是不是写锁,那么此时锁的状态一定是读锁,
         * 既然是读锁状态,那么写锁此时来获取锁时,就肯定失败,因此当w等于0时,tryAcquire()方法返回false。
         * 2. 如果w不为0,说明此时锁的状态时写锁,接着进行current != getExclusiveOwnerThread()判断,判断持有锁的线程是否是当前线程
         * 如果不是当前线程,那么tryAcquire()返回false;如果是当前线程,那么就进行后面的逻辑。为什么是当前线程持有锁,就还能执行后面的逻辑呢?
         * 因为读写锁是支持重入的。
         */
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 下面一行代码是判断,写锁的重入次数或不会超过最大限制,这个最大限制是:2的16次方减1
        // 为什么是2的16次方减1呢?因为state的低16位存放的是写锁,因此写锁数量的最大值是2的16次方减1
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    /**
     * 1. writerShouldBlock()方法的作用是判断当前线程是否应该阻塞,对于公平的写锁和非公平写锁的具体实现不一样。
     * 对于非公平写锁而言,直接返回false,因为非公平锁获取锁之前不需要去判断是否排队
     * 对于公平锁写锁而言,它会判断同步队列中是否有人在排队,有人排队,就返回true,表示当前线程需要阻塞。无人排队就返回false。
     *
     * 2. 当writerShouldBlock()返回true时,表示当前线程还不能直接获取锁,因此tryAcquire()方法直接返回false。
     * 当writerShouldBlock()返回false时,表示当前线程可以尝试去获取锁,因此会执行if判断中后面的逻辑,即通过CAS方法尝试去修改同步变量的值,
     * 如果修改同步变量成功,则表示当前线程获取到了锁,最终tryAcquire()方法会返回true。如果修改失败,那么tryAcquire()会返回false,表示获取锁失败。
     *
     */
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
  • В методе tryAcquire() первый проходexclusiveCount()метод расчета количества блокировок записи, как его рассчитать? состоит в том, чтобы объединить государство и0x0000FFFFпровести与运算.
  • Затем оцените, равно ли состояние 0. Если оно равно 0, это означает, что ни блокировка чтения, ни блокировка записи не были получены, и текущий поток вызоветwriterShouldBlock()Этот метод определяет, нужно ли потоку ждать. Если ему нужно подождать, метод tryAcquire() возвращает значение false, указывая на то, что получение блокировки не удалось, после чего он возвращается к методу Acquire() AQS. такая же, как логика эксклюзивных блокировок. Если вам не нужно ждать, попробуйте изменить значение состояния.Если изменение прошло успешно, это означает, что блокировка получена успешно, в противном случае это не удается.
  • Если состояние не равно 0, это означает, что есть блокировка чтения или блокировка записи, так это блокировка чтения или блокировка записи? Об этом нужно судить по значению w.
  • Если w равно 0, это означает, что количество блокировок записи равно 0, а поскольку c не равно 0, это означает, что блокировка занята, но это не блокировка записи, то состояние блокировки в это время должна быть блокировка чтения.Поскольку это состояние блокировки чтения, тогда, когда блокировка записи получает блокировку в это время, она определенно выйдет из строя, потому что, когда блокировка чтения существует, блокировка записи не может быть получена. Поэтому, когда w равно 0, метод tryAcquire() возвращает false.
  • Если w не равно 0, это означает, что в настоящее время блокировка находится в состоянии блокировки записи, а затем переходите кcurrent != getExclusiveOwnerThread()Суждение, чтобы определить, является ли поток, удерживающий блокировку, текущим потоком. Если это не текущий поток, tryAcquire() возвращает false; если это текущий поток, то выполняется следующая логика. Почему текущий поток, удерживающий блокировку, может выполнить следующую логику? Потому что блокировки чтения-записи поддерживают повторный вход.
  • Если это блокировка записи, полученная текущим потоком, то оцените, будет ли превышено максимальное количество повторных входов блокировки записи, когда блокировка записи будет повторно введена, и если это так, будет выдано исключение. (Поскольку младшие 16 бит состояния представляют собой блокировку записи, максимальное количество повторных входов блокировки записи равно2^{16}-1).

написать снятие блокировки

  • Логика снятия блокировок записи почти такая же, как и логика снятия монопольных блокировок. При вызове writeLock.unlock() сначала вызывается метод AQS release(), а в методе release() первым вызывается метод tryRelease() подкласса. Здесь вызывается метод tryRelease() внутреннего класса Sync объекта ReentrantReadWriteLock. Логика снятия блокировки записи относительно проста, вы можете обратиться к комментариям в исходном коде ниже. Исходный код и комментарии к методу следующие.
protected final boolean tryRelease(int releases) {
    // 判断是否是当前线程持有锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 将state的值减去releases
    int nextc = getState() - releases;
    // 调用exclusiveCount()方法,计算写锁的数量。如果写锁的数量为0,表示写锁被完全释放,此时将AQS的exclusiveOwnerThread属性置为null
    // 并返回free标识,表示写锁是否被完全释放
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

блокировка чтения блокировки

  • Блокировка чтения является разделяемой блокировкой, поэтому при вызове метода readLock.lock() сначала будет вызываться метод AcquireShared() AQS, а метод tryAcquireShared() подкласса будет вызываться первым в методе AcquireShared(). метод. Здесь будет называться внутренний класс Sync класса ReentrantReadWriteLock.tryAcquireShared()метод. Исходный код этого метода выглядит следующим образом.
protected final int tryAcquireShared(int unused) {
    
    Thread current = Thread.currentThread();
    int c = getState();
    // exclusiveCount(c)返回的是写锁的数量,如果它不为0,说明写锁被占用,如果此时占用写锁的线程不是当前线程,就返回-1,表示获取锁失败
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // r表示的是读锁的数量
    int r = sharedCount(c);
    /**
     * 在下面的代码中进行了三个判断:
     * 1、读锁是否应该排队。如果没有人排队,就进行if后面的判断。有人排队,就不会进行if后面的判断,而是最终调用fullTryAcquireShared()方法
     * 2、读锁数量是否超过最大值。(最大数量为2的16次方-1)
     * 3、尝试修改同步变量的值
     */
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 读锁数量为0时,就将当前线程设置为firstReader,firstReaderHoldCount=1
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 读锁数量不为0且firstReader(第一次获取读的线程)为当前线程,就将firstReaderHoldCount累加
            firstReaderHoldCount++;
        } else {
            // 读锁数量不为0,且第一个获取到读锁的线程不是当前线程
            // 下面这一段逻辑就是保存当前线程获取读锁的次数,如何保存的呢?
            // 通过ThreadLocal来实现的,readHolds就是一个ThreadLocal的实例
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        // 返回1表示获取读锁成功
        return 1;
    }
    // 当if中的三个判断均不满足时,就会执行到这儿,调用fullTryAcquireShared()方法尝试获取锁
    return fullTryAcquireShared(current);
}
  • В методе tryAcquireShared() он сначала пройдетexclusiveCount()метод для вычисления количества блокировок записи. Если блокировка записи существует, то определить, является ли поток, удерживающий блокировку записи, текущим потоком. Если это не текущий поток, это означает, что блокировка записи занята другими потоками, и текущий поток не может получить блокировку чтения. Метод tryAcquireShared() возвращает -1, указывая на то, что получение блокировки чтения не удалось. Если блокировка записи не существует или поток, удерживающий блокировку записи, является текущим потоком, это означает, что текущий поток имеет возможность получить блокировку чтения.
  • Далее будет оцениваться, не нужно ли текущему потоку ставить в очередь для получения блокировки чтения, превысит ли количество блокировок чтения максимальное значение и будет ли состояние блокировки чтения успешно изменено через CAS (добавить 1ThreadLocal. Поскольку блокировка чтения-записи обеспечиваетgetReadLockCount()、getReadHoldCount()и другие методы, данные этих методов взяты отсюда.
  • Если одно из трех вышеперечисленных условий не выполняется, блок операторов if не будет введен, и будет вызван метод fullTryAcquireShared(). Функция этого метода состоит в том, чтобы позволить потоку продолжать получать блокировку.Исходный код выглядит следующим образом.
final int fullTryAcquireShared(Thread current) {
    /*
     * This code is in part redundant with that in
     * tryAcquireShared but is simpler overall by not
     * complicating tryAcquireShared with interactions between
     * retries and lazily reading hold counts.
     */
    HoldCounter rh = null;
    // for死循环,直到满足相应的条件才会return退出,否则一直循环
    for (;;) {
        int c = getState();
        // 锁的状态为写锁时,持有锁的线程不等于当期那线程,就说明当前线程获取锁失败,返回-1
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 尝试设置同步变量的值,只要设置成功了,就表示当前线程获取到了锁,然后就设置锁的获取次数等相关信息
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}
  • Когда блокировка чтения получена успешно, метод tryAcquireShared() вернет 1, поэтому, когда он вернется к методу AcquireShared() AQS, он завершится напрямую. Если получить блокировку не удается, метод tryAcquireShared() вернет -1, а затем в AQS будет выполнен метод doAcquireShared(). Роль метода doAcquireShared() заключается в добавлении себя в очередь синхронизации и ожидании получения блокировки до тех пор, пока блокировка не будет получена успешно. Этот метод не реагирует на прерывания.

прочитать освобождение блокировки

  • При вызове метода readLock.unlock() сначала будет вызываться метод releaseShared() AQS, а в методе releaseShared() первым будет вызываться метод tryReleaseShared() подкласса. Здесь будет называться внутренний класс Sync класса ReentrantReadWriteLock.tryReleaseShared()метод. Исходный код этого метода выглядит следующим образом.
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        // 将修改同步变量的值(读锁状态减去1<<16)
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}
  • В методе tryReleaseShared() сначала изменяются данные, относящиеся к счетчику блокировки чтения, а затем в бесконечном цикле for значение состояния вычитается на 1
  • Когда метод tryReleaseShared() возвращается, следующие шаги точно такие же, как и логика снятия разделяемой блокировки.

Меры предосторожности

  • Использование блокировок чтения-записи очень простое, но в процессе использования блокировок чтения-записи необходимо обратить внимание на следующие два момента.
  • 1.Блокировка чтения-записи不支持锁升级,支持锁降级. Расширение блокировки означает, что поток получает блокировку чтения и блокировку записи, не освобождая блокировку чтения. Деградация блокировки означает, что поток получает блокировку записи и блокировку чтения, не освобождая блокировку записи. Почему бы не поддерживать укрупнение блокировки? Вы можете обратиться к следующему примеру кода.
public void lockUpgrade(){
    ReadWriteLock lock = new ReentrantReadWriteLock();
    // 创建读锁
    Lock readLock = lock.readLock();
    // 创建写锁
    Lock writeLock = lock.writeLock();
    readLock.lock();
    try{
        // ...处理业务逻辑
        writeLock.lock();   // 代码①
    }finally {
        readLock.unlock();
    }
}
  • В приведенном выше примере кода, если поток T1 сначала получает блокировку чтения, а затем выполняет следующий код, когда выполняется предыдущая строка кода ①, поток T2 также получает блокировку чтения, поскольку блокировка чтения является общей блокировкой. , и в это время блокировка записи не была получена, поэтому поток T2 может получить блокировку чтения в это время.Когда T1 выполняет код ①, он пытается получить блокировку записи.Поскольку поток T2 занимает блокировку чтения, Поток T1 не может получить блокировку записи. Да, вы можете только ждать. Когда T2 также выполняет код ①, поскольку T1 удерживает блокировку чтения, T2 не может получить блокировку записи, поэтому два потока продолжают ждать, то есть блокировка записи не может получить, и блокировка чтения не может быть снята. Таким образом, блокировка не поддерживает расширение блокировки.
  • Блокировки чтения-записи поддерживают понижение уровня блокировки, что обеспечивает видимость. Сделайте изменения данных, сделанные потоками T1, видимыми для других потоков.
  • 2.Блокировки чтения не поддерживают условные очереди ожидания. При вызове метода newCondition() класса ReadLock будет напрямую выброшено исключение.
public Condition newCondition() {
    throw new UnsupportedOperationException();
}
  • Поскольку блокировка чтения является общей блокировкой, максимальное количество2^{16}-1), он может удерживаться несколькими потоками одновременно.Для блокировки чтения другим потокам не нужно ждать, чтобы получить блокировку чтения, а ожидание пробуждения условия бессмысленно.

Суммировать

  • В этой статье кратко представлена ​​функция блокировки чтения-записи, которая состоит из двух блокировок: блокировки чтения и блокировки записи. Затем вводятся некоторые особенности блокировок чтения-записи. Затем он анализирует, как использовать переменную state для представления состояния блокировки чтения-записи.Старшие 16 бит состояния представляют блокировку чтения, а младшие 16 бит представляют блокировку записи.0x0000FFFFпровести与运算, вы получите количество блокировок записи.
  • Наконец, процесс снятия и получения блокировки записи и процесс снятия и получения блокировки чтения анализируются с помощью исходного кода. Блокировка записи является эксклюзивной блокировкой, поэтому ее снятие и получение вызывают методы монопольной блокировки снятия и получения блокировки в AQS, а блокировка чтения является общей блокировкой, поэтому ее снятие и получение вызывают совместное освобождение в AQS Блокировки и методы получения замки.

связанное предложение

微信公众号