введение
После изучения ReentrantLock, Semaphore и CountDownLatch в предыдущих статьях есть еще один важный класс инструментов параллелизма, основанный на AQS в пакете J.U.C: блокировка чтения-записи ReentrantReadWriteLock. Блокировка чтения-записи — частая тестовая тема в процессе собеседования по Java, она требует большого количества знаний, из-за чего многие люди не понимают его до конца. Вот несколько распространенных вопросов на собеседовании:
- Разница между ReentrantReadWriteLock и ReentrantLock?
- Можете ли вы реализовать простое управление кешем с помощью ReentrantReadWriteLock?
- Можете ли вы реализовать простую блокировку чтения-записи самостоятельно?
- Произойдет ли голодание записи с ReentrantReadWriteLock? Если это произойдет, есть ли лучшее решение?
Сможете ли вы все ответить на вышеуказанные вопросы? Если вы можете ответить на него хорошо, то эта статья может вам не сильно помочь. Если нет, не волнуйтесь, давайте проанализируем и ответим на приведенные выше вопросы один за другим.
1. В чем разница между ReentrantReadWriteLock и ReentrantLock?
Каждый должен ответить на этот вопрос: ReentrantLock — монопольная блокировка, а ReentrantReadWriteLock — блокировка чтения-записи. Затем возникает следующий вопрос: являются ли блокировки чтения и записи в ReentrantReadWriteLock эксклюзивными или общими? Каковы отношения между ними? Чтобы досконально разобраться в этих двух вопросах, лучше проанализировать исходный код.
1.1 Реализация блокировки чтения и записи в ReentrantReadWriteLock
Используя ReentrantReadWriteLock для чтения и записи блокировок, будут вызваны два метода, readLock() и writeLock(). Взгляните на их исходный код:
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
Вы можете видеть, что используются два статических внутренних класса, WriteLock и ReadLock.Их реализация блокировки выглядит следующим образом:
public static class ReadLock implements Lock, java.io.Serializable {
public void lock() {
sync.acquireShared(1); //共享
}
public void unlock() {
sync.releaseShared(1); //共享
}
}
public static class WriteLock implements Lock, java.io.Serializable {
public void lock() {
sync.acquire(1); //独占
}
public void unlock() {
sync.release(1); //独占
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {}
См. здесь, чтобы найти сходство и различие между ReentrantReadWriteLock и ReentrantLock, то же самое — использование одного и того же ключа для реализации AbstractQueuedSynchronizer, разница в том, что ReentrantReadWriteLock использует две блокировки для реализации AQS, а WriteLock и ReentrantLock используют эксклюзивную блокировку. ReadLock, как и Semaphore, использует разделяемые блокировки. Содержимое ниже должно быть знакомо тем, кто читал предыдущие статьи.Эксклюзивная блокировка определяет, занимает ли поток блокировку через два состояния переменной состояния 0 и 1, а общая блокировка контролирует количество блокировок через переменную состояния. переменная состояния 0 или ненулевая доступ к потоку. В приведенном выше коде ReadLock и WriteLock используют один и тот же AQS, так как же контролировать взаимосвязь между блокировкой чтения и блокировкой записи в ReentrantReadWriteLock?
1.2 Общие переменные ReadLock и WriteLock
Блокировка чтения-записи определяется следующим образом: к ресурсу могут обращаться несколько потоков чтения или один поток записи, но одновременно не может быть потока чтения-записи.
В этом предложении легко представить, что для управления чтением и записью используются две разные переменные: когда получена блокировка чтения, переменная чтения равна +1, а когда получена блокировка чтения, переменная записи равна +1. Но AQS не добавляет дополнительных переменных для ReadLock и WriteLock и реализуется через состояние. Как разделить чтение и запись? Взгляните на следующий код: 1. Но AQS не добавляет дополнительных переменных для ReadLock и WriteLock и реализуется через состояние. Как разделить чтение и запись? Взгляните на следующий код:
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
Этот код находится в статическом внутреннем классе Sync. Есть два ключевых метода: sharedCount и ExclusiveCount. Из названия видно, что sharedCount — это количество общих блокировок, а ExclusiveCount — количество эксклюзивных блокировок. Общие блокировки получаются путем сдвига изображения c вправо на 16 бит, а эксклюзивные блокировки получаются с помощью И с 16 битами. Например, когда есть 3 потока, которые получают блокировки чтения и 1 поток, который пишет блокировки (конечно, это невозможно одновременно), состояние представляется как 0000 0000 0000 0011 0000 0000 0000 0001, а верхние 16 биты представляют блокировку чтения, сдвигая 16 бит вправо (c >>> SHARED_SHIFT), чтобы получить 3 в десятичном виде, и суммируя 0000 0000 0000 0000 1111 1111 1111 1111 и операцию (c & EXCLUSIVE_MASK), получая 1 в десятичном виде. Поняв несколько методов, я понял, почему совместное чтение и запись достигается через состояние.
В этом тоже есть проблема.Поскольку 16-битный максимум все 1 представлен как 65535, максимальное количество блокировок чтения и блокировок записи можно получить 65535.
1.3 Разница между получением блокировок WriteLock и ReentrantLock
Как упоминалось выше, WriteLock также является монопольной блокировкой, так в чем же разница между ней и ReentrantLock? Самая большая разница заключается в том, что при получении блокировки WriteLock должен учитывать не только заняты ли другие блокировки записи, но и другие блокировки чтения, в то время как ReentrantLock нужно учитывать только то, занята ли она. Давайте посмотрим на исходный код WriteLock, получающий блокировку:
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) && //尝试获取独占锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //获取失败后排队
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState(); //获取共享变量state
int w = exclusiveCount(c); //获取写锁数量
if (c != 0) { //有读锁或者写锁
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread()) //写锁为0(证明有读锁),或者持有写锁的线程不为当前线程
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires); //当前线程持有写锁,为重入锁,+acquires即可
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires)) //CAS操作失败,多线程情况下被抢占,获取锁失败。CAS成功则获取锁成功
return false;
setExclusiveOwnerThread(current);
return true;
}
Этот код звучит знакомо? Он очень похож на код получения блокировок в ReentrantLock, разница в том, что вызывается метод ExclusiveCount, чтобы получить, есть ли блокировка записи, а затем c != 0 и w == 0, чтобы определить, есть ли блокировка чтения . AcquireQueued и addWaiter подробно объясняться не будут, то, что вам нужно знать, вы можете прочитать в предыдущей статье о ReentrantLock. На данный момент каждый должен знать разницу между ReentrantReadWriteLock и ReentrantLock.
1.4 Разница между получением замков ReadLock и Semaphore
WriteLock — это эксклюзивный режим, мы сравнили разницу между ним и эксклюзивным получением блокировки ReentrantLock. Взгляните на исходный код ниже:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current) //写锁不等于0的情况下,验证是否是当前写锁尝试获取读锁
return -1;
int r = sharedCount(c); //获取读锁数量
if (!readerShouldBlock() && //读锁不需要阻塞
r < MAX_COUNT && //读锁小于最大读锁数量
compareAndSetState(c, c + SHARED_UNIT)) { //CAS操作尝试设置获取读锁 也就是高位加1
if (r == 0) { //当前线程第一个并且第一次获取读锁,
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) { //当前线程是第一次获取读锁的线程
firstReaderHoldCount++;
} else { // 当前线程不是第一个获取读锁的线程,放入线程本地变量
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}ß
В приведенном выше коде процесс попытки получить блокировку чтения аналогичен процессу получения блокировки записи, разница в том, что блокировку чтения можно попытаться получить блокировку чтения до тех пор, пока она не занята блокировкой записи. блокировка и не превышает максимальное количество сборов, в то время как блокировка записи должна учитывать не только блокировку чтения. Независимо от того, занята ли блокировка, также учитывайте, занята ли блокировка записи. В приведенном выше коде, firstReader, firstReaderHoldCount и cachedHoldCounter обслуживают readHolds (ThreadLocalHoldCounter), которые используются для записи количества захватов каждого потока получения блокировки чтения, чтобы облегчить получение информации о количестве блокировок, удерживаемых текущим потоком. нить. На базе ThreadLocal добавлена переменная Int для подсчета количества раз, что можно понять через их реализацию:
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> { //ThreadLocal变量ß
public HoldCounter initialValue() {
return new HoldCounter();
}
}
static final class HoldCounter {
int count = 0; //当前线程持有锁的次数
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread()); //当前线程ID
}
2. Можете ли вы реализовать простой кеш с помощью ReentrantReadWriteLock?
Давайте сначала проанализируем функции, которым должен соответствовать простой кеш.Здесь для простоты мы не рассматриваем сложные факторы, такие как политики истечения срока действия кеша.
- Кэш в основном выполняет две функции: чтение и запись.
- При чтении, если в кеше есть данные, данные возвращаются сразу.
- При чтении, если данных в кеше нет, нужно получать данные из других каналов и одновременно писать в кеш.
- Во время записи в кеш, чтобы предотвратить одновременное получение другими потоками данных, которых нет в кеше, другие потоки чтения должны быть заблокированы. Реализуем вышеуказанные функции через ReentrantReadWriteLock:
public static void ReentrantReadWriteLockCacheSystem() {
//这里为了实现简单,将缓存大小设置为4。
Map<String, String> cacheMap = new HashMap<>(4);
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
for (int i = 0; i < 20; i++) { //同时开启20个线程访问缓存
final String key = String.valueOf(i % 4);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
//①读取缓存时获取读锁
readWriteLock.readLock().lock();
//获取读锁后通过key获取缓存中的值
String valueStr = cacheMap.get(key);
//缓存值不存在
if (valueStr == null) {
//③释放读锁后再尝试获取写锁
readWriteLock.readLock().unlock();
try {
//④获取写锁来写入不存在的key值,
readWriteLock.writeLock().lock();
valueStr = cacheMap.get(key);
if (valueStr == null) {
valueStr = key + " --- value";
cacheMap.put(key, valueStr); //写入值
System.out.println(Thread.currentThread().getName() + " --------- put " + valueStr);
}
// ⑥锁降级,避免被其他写线程抢占后再次更新值,保证这一次操作的原子性
readWriteLock.readLock().lock();
System.out.println(Thread.currentThread().getName() + " --------- get new " + valueStr);
} finally {
readWriteLock.writeLock().unlock(); //⑤释放写锁
}
} else {
System.out.println(Thread.currentThread().getName() + " ------ get cache value");
}
} finally {
readWriteLock.readLock().unlock(); //②释放读锁
}
}
}, String.valueOf(i));
thread.start();
}
}
Сначала поток попытается получить данные, и ему необходимо получить блокировку чтения ①.Если есть значение, он будет напрямую читать и снимать блокировку чтения ②. Если значения нет, сначала снимите уже полученную блокировку чтения ③, а затем попытайтесь получить блокировку записи ④. После получения блокировки записи снова проверьте значение, поскольку могут быть другие блокировки записи, которые обновили значение в это время, тогда нужно только прочитать, а затем снять блокировку записи ⑤. Если значения по-прежнему нет, получите значение и обновите его с помощью других средств, а затем получите блокировку чтения ⑥.Этот шаг операции понижения блокировки заключается в том, чтобы напрямую вытеснить блокировку чтения, чтобы избежать вытеснения другими потоками записи, когда Блокировка чтения устанавливается снова после снятия блокировки записи, что на этот раз гарантирует атомарность чтения данных. Затем выполните ⑤, чтобы снять блокировку записи, и ②, чтобы снять блокировку чтения.
Результаты вывода после выполнения следующие, каждое выполнение может выводить по-разному:
//1 --------- put 1 --- value
//1 --------- get new 1 --- value
//0 --------- put 0 --- value
//0 --------- get new 0 --- value
//9 ------ get cache value
//4 ------ get cache value
//2 --------- put 2 --- value
//2 --------- get new 2 --- value
//11 --------- put 3 --- value
//11 --------- get new 3 --- value
//5 ------ get cache value
//13 ------ get cache value
//6 ------ get cache value
//8 ------ get cache value
//7 ------ get cache value
//3 --------- get new 3 --- value
//10 ------ get cache value
//12 ------ get cache value
//14 ------ get cache value
//15 ------ get cache value
//16 ------ get cache value
//17 ------ get cache value
//18 ------ get cache value
//19 ------ get cache value
3. Можете ли вы реализовать простую блокировку чтения-записи самостоятельно?
После предварительного анализа и использования принципа блокировки чтения-записи выше, можете ли вы реализовать простую блокировку чтения-записи самостоятельно? Вот пошаговый процесс реализации простой блокировки чтения-записи. Вы можете выполнить эти шаги, чтобы реализовать ее самостоятельно.
- 1 Определите состояние общей переменной блокировки чтения-записи
- 2 Старшие 16 бит состояния — это количество блокировок чтения, младшие 16 бит — количество блокировок записи. Попробуйте смоделировать реализацию ReentrantReadWriteLock
- 3 При получении блокировки чтения сначала определите, есть ли блокировка записи, подождите, если есть, и увеличьте количество блокировок чтения на 1
- 4 Уменьшить количество снятых блокировок чтения на 1 и уведомить все ожидающие потоки
- 5 При получении блокировки записи необходимо оценить, существуют ли и блокировка чтения, и блокировка записи, подождать, если есть, и добавить 1 к количеству блокировок записи, если нет.
- 6 Уменьшите количество снятых блокировок записи на 1 и уведомите все ожидающие потоки. Код реализации, который я дал, выглядит следующим образом:
public class MyReadWriteLock {
private int state = 0; //1. 定义一个读写锁共享变量state
//2. state高16位为读锁数量
private int GetReadCount() {
return state >>> 16;
}
//2. 低16位为写锁数量
private int GetWriteCount() {
return state & ((1 << 16) - 1);
}
//3. 获取读锁时先判断是否有写锁,有则等待,没有则将读锁数量加1
public synchronized void lockRead() throws InterruptedException{
while (GetWriteCount() > 0) {
wait();
}
System.out.println("lockRead ---" + Thread.currentThread().getName());
state = state + (1 << 16);
}
//4. 释放读锁数量减1,通知所有等待线程
public synchronized void unlockRead() {
state = state - ((1 << 16));
notifyAll();
}
//5. 获取写锁时需要判断读锁和写锁是否都存在,有则等待,没有则将写锁数量加1
public synchronized void lockWriters() throws InterruptedException{
while (GetReadCount() > 0 || GetWriteCount() > 0) {
wait();
}
System.out.println("lockWriters ---" + Thread.currentThread().getName());
state++;
}
//6. 释放写锁数量减1,通知所有等待线程
public synchronized void unlockWriters(){
state--;
notifyAll();
}
}
4. Будут ли блокировки чтения-записи страдать от нехватки записи? Если это произойдет, есть ли лучшее решение?
В процессе чтения и записи операция записи обычно имеет приоритет, и операцию записи нельзя ждать, потому что операций чтения слишком много. Теперь давайте подумаем о простой блокировке чтения-записи, которую мы реализовали выше. Можем ли мы это сделать? Ответ очевиден: в случае, когда ожидают потоки чтения и записи, notifyAll не гарантирует выполнение приоритета записи. Итак, как улучшить это в этом примере?
Здесь я достигаю этой цели, добавляя промежуточную переменную.Эта промежуточная переменная записывает запрос на запись перед получением блокировки записи, так что, как только notifyAll, он сначала проверит, есть ли запрос на запись.Если есть, операция записи будет выполнена во-первых, конкретный код выглядит следующим образом:
public class MyReadWriteLock {
private int state = 0; //1. 定义一个读写锁共享变量state
private int writeRequest = 0; //记录写请求数量
//2. state高16位为读锁数量
private int GetReadCount() {
return state >>> 16;
}
//2. 低16位为写锁数量
private int GetWriteCount() {
return state & ((1 << 16) - 1);
}
//3. 获取读锁时先判断是否有写锁,有则等待,没有则将读锁数量加1
public synchronized void lockRead() throws InterruptedException{
//写锁数量大于0或者写请求数量大于0的情况下都优先执行写
while (GetWriteCount() > 0 || writeRequest > 0) {
wait();
}
System.out.println("lockRead ---" + Thread.currentThread().getName());
state = state + (1 << 16);
}
//4. 释放读锁数量减1,通知所有等待线程
public synchronized void unlockRead() {
state = state - ((1 << 16));
notifyAll();
}
//5. 获取写锁时需要判断读锁和写锁是否都存在,有则等待,没有则将写锁数量加1
public synchronized void lockWriters() throws InterruptedException{
writeRequest++; //写请求+1
while (GetReadCount() > 0 || GetWriteCount() > 0) {
wait();
}
writeRequest--; //获取写锁后写请求-1
System.out.println("lockWriters ---" + Thread.currentThread().getName());
state++;
}
//6. 释放写锁数量减1,通知所有等待线程
public synchronized void unlockWriters(){
state--;
notifyAll();
}
}
Вы можете протестировать приведенный выше код, чтобы увидеть, выполняются ли запросы на запись в первую очередь? Теперь давайте рассмотрим эту проблему в ReentrantReadWriteLock.Очевидно, что ReentrantReadWriteLock также будет иметь голодание запросов на запись, потому что запросы на запись также будут поставлены в очередь, будь то справедливая блокировка или нечестная блокировка, в случае блокировки чтения ни то, ни другое не гарантируется. что блокировка записи может быть получена, так что, пока блокировка чтения всегда занята, будет происходить голодание записи. Так разве JDK не предлагает хороший способ решить эту проблему? Конечно, это новая улучшенная блокировка чтения-записи в JDK8 --- StampedLock, о которой будет подробно рассказано в следующей статье.