Оригинальная статья, краткое изложение опыта и жизненные перипетии на всем пути от набора в школу до фабрики А
Нажмите, чтобы узнать подробностиwww.codercc.com
1. Введение в блокировки чтения-записи
Чтобы решить проблему безопасности потоков в параллельных сценариях, мы почти с высокой частотой используем монопольные блокировки, обычно используя ключевое слово synchronized, предоставляемое java (о synchronized можносм. эту статью) или пакет concurrents, который реализует интерфейс блокировкиReentrantLock. Все они получают блокировки исключительно, то есть только один поток может получить блокировку за раз. В некоторых бизнес-сценариях большинство из них только считывают данные и записывают очень мало данных. Если читаются только данные, это не повлияет на правильность данных (происходит грязное чтение). В этом бизнес-сценарии по-прежнему используются эксклюзивные блокировки. Если да , ясно, что именно здесь возникает узкое место в производительности. В ответ на эту ситуацию больше читать и меньше писать, java также предоставляет другой ReentrantReadWriteLock (блокировка чтения-записи), который реализует интерфейс блокировки.Доступ к чтению и записи разрешен для нескольких потоков чтения одновременно, но когда поток записи обращается, все потоки чтения и другие потоки записи будут заблокированы.. Анализируя взаимное исключение между WirteLock и ReadLock, вы можете анализировать между WriteLock и WriteLock, между WriteLock и ReadLock и между ReadLock и ReadLock. Для получения дополнительной информации о введении функции блокировки чтения-записи вы можете прочитать введение в исходный код (лучший способ узнать при чтении исходного кода, я тоже учусь, и я поделюсь с вами), вот резюме:
- честный выбор: Поддерживает несправедливые (по умолчанию) и справедливые методы получения блокировки, и пропускная способность по-прежнему несправедлива, чем справедлива;
- повторный вход: поддержка повторного входа, блокировка чтения может быть получена снова после получения блокировки чтения, блокировка записи может быть получена снова после получения блокировки записи, и блокировка чтения также может быть получена одновременно;
- заблокировать понижение версии: Следуя последовательности получения блокировки записи, получения блокировки чтения и снятия блокировки записи, блокировку записи можно понизить до блокировки чтения.
Чтобы полностью понять блокировку чтения-записи, вы должны быть в состоянии понять следующие вопросы: 1. Как блокировка чтения-записи записывает статус чтения-записи отдельно? 2. Как устанавливаются и снимаются блокировки записи? 3. Как устанавливаются и снимаются блокировки чтения? Ответив на эти три вопроса, давайте узнаем о блокировках чтения-записи.
2. Напишите детали блокировки
2.1 Приобретение блокировок записи
Реализация компонента синхронизации объединяет синхронизатор (AQS) и реализует семантику синхронизации компонента синхронизации путем перезаписи метода в синхронизаторе перезаписи (AQS) (иерархия реализации компонента синхронизации может бытьсм. эту статью, базовый анализ реализации AQS можетсм. эту статью). Поэтому реализация блокировки записи пока такая. Блокировка записи не может быть получена несколькими потоками одновременно.Очевидно, что блокировка записи является монопольной блокировкой, а семантика синхронизации блокировки записи реализована путем перезаписи метода tryAcquire в AQS. Исходный код:
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
// 1. 获取写锁当前的同步状态
int c = getState();
// 2. 获取写锁获取的次数
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 3.1 当读锁已被读线程获取或者当前线程不是已经获取写锁的线程的话
// 当前线程获取写锁失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 3.2 当前线程获取写锁,支持可重复加锁
setState(c + acquires);
return true;
}
// 3.3 写锁未被任何线程获取,当前线程可获取写锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
Пожалуйста, смотрите комментарии для логики этого кода.Есть одно место, на котором следует сосредоточиться, метод exclusiveCount(c).Исходный код этого метода:
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
вEXCLUSIVE_MASKза:static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
EXCLUSIVE_MASK — это 1, сдвинутая влево на 16 бит, а затем вычтенная на 1, что равно 0x0000FFFF. Метод exclusiveCount добавляет состояние синхронизации (состояние типа int) с 0x0000FFFF, то есть берет младшие 16 бит состояния синхронизации. Так что же представляют младшие 16 бит? Согласно аннотации к методу exclusiveCount, количество эксклюзивных захватов — это количество раз, когда была получена блокировка записи, и теперь можно сделать вывод.Младшие 16 бит состояния синхронизации используются для указания количества захватов блокировки записи.. В то же время есть еще один метод, достойный нашего внимания:
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
Этот метод должен получить количество раз, когда была получена блокировка чтения, и он состоит в том, чтобы сдвинуть состояние синхронизации (int c) вправо 16 раз, то есть взять старшие 16 бит состояния синхронизации.Теперь мы можем сделать другой вывод.Старшие 16 бит состояния синхронизации используются для указания того, сколько раз была получена блокировка чтения.. Помните первый вопрос, который нам нужно было понять в начале? Каким образом блокировка чтения-записи записывает состояние блокировки чтения и блокировки записи отдельно?Теперь ответ на этот вопрос нами выяснен.Схема-схема представлена на следующем рисунке:
Теперь вернемся к методу получения блокировки записи tryAcquire, основная логика которого такова:Когда блокировка чтения была получена потоком чтения или блокировка записи была получена другими потоками записи, получение блокировки записи завершается ошибкой; в противном случае получение выполняется успешно, и поддерживается повторный вход, увеличивая состояние записи.
2.2. Снятие блокировки записи
Блокировка записи снимается путем переопределения метода tryRelease AQS. Исходный код:
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//1. 同步状态减去写状态
int nextc = getState() - releases;
//2. 当前写状态是否为0,为0则释放写锁
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
//3. 不为0则更新同步状态
setState(nextc);
return free;
}
Логику реализации исходного кода смотрите в комментариях, нетрудно понять, что это в основном то же самое, что и ReentrantLock, здесь нужно отметить, что состояние записи уменьшено.int nextc = getState() - releases;
просто используйтеПричина, по которой текущее состояние синхронизации напрямую вычитает состояние записи, заключается именно в том, что мы только что сказали о состоянии записи, представленном младшими 16 битами состояния синхронизации..
3. Прочитайте информацию о замке
3.1 Приобретение блокировок чтения
После чтения блокировки записи давайте посмотрим на блокировку чтения.Блокировка чтения не является монопольной блокировкой, то есть блокировка может быть получена несколькими потоками чтения одновременно, что является общей блокировкой. Согласно предыдущему введению в AQS, для реализации семантики синхронизации общих компонентов синхронизации необходимо переопределить методы AQS tryAcquireShared и tryReleaseShared. Метод получения блокировки чтения следующий:
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
//1. 如果写锁已经被获取并且获取写锁的线程不是当前线程的话,当前
// 线程获取读锁失败返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
//2. 当前线程获取读锁
compareAndSetState(c, c + SHARED_UNIT)) {
//3. 下面的代码主要是新增的一些功能,比如getReadHoldCount()方法
//返回当前获取读锁的次数
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//4. 处理在第二步中CAS操作失败的自旋已经实现重入性
return fullTryAcquireShared(current);
}
Пожалуйста, смотрите комментарии для логики кода.Следует отметить, чтоКогда блокировка записи получена другим потоком, получение блокировки чтения завершается ошибкой., в противном случае статус синхронизации успешно обновляется с помощью CAS. Кроме того, текущее состояние синхронизации необходимо добавить с помощью SHARED_UNIT ((1 << SHARED_SHIFT)
То есть 0x00010000) Причина в том, что старшие 16 бит состояния синхронизации, о которых мы упоминали выше, используются для указания того, сколько раз была получена блокировка чтения. Если CAS дает сбой или поток, который получил блокировку чтения, получает блокировку чтения снова, это реализуется методом fullTryAcquireShared, этот код не будет расширяться, если вам интересно, вы можете посмотреть.
3.2. Снятие блокировки чтения
Реализация снятия блокировки чтения в основном осуществляется с помощью метода tryReleaseShared.Исходный код выглядит следующим образом.Пожалуйста, смотрите комментарии для основной логики:
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 前面还是为了实现getReadHoldCount等新功能
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
// 读锁释放 将同步状态减去读状态即可
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
4. Блокировка перехода на более раннюю версию
Блокировки чтения-записи поддерживают понижение блокировки,Следуя последовательности получения блокировки записи, получения блокировки чтения и снятия блокировки записи, блокировку записи можно понизить до блокировки чтения., не поддерживает укрупнение блокировки, следующий пример кода об укрупнении блокировки взят из исходного кода ReentrantWriteReadLock:
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}