Введение в ReentrantReadWriteLock
Reentrant Lock ReentrantLock — монопольная блокировка. Одновременно к монопольной блокировке может получить доступ только один поток. Однако в большинстве сценариев большую часть времени предоставляется служба чтения, а служба записи занимает меньше времени. Однако в сервисе чтения нет проблемы гонки данных, и если один поток запрещает другим потокам чтение во время чтения, это неизбежно приведет к падению производительности. Поэтому предусмотрена блокировка чтения-записи.
Блокировки чтения-записи поддерживают пару блокировок, блокировку чтения и блокировку записи. Разделяя блокировки чтения и записи, параллелизм значительно улучшается по сравнению с обычными монопольными блокировками:
在同一时间,可以允许多个读线程同时访问。
但是,在写线程访问时,所有读线程和写线程都会被阻塞。
Основные возможности блокировки чтения-записи:
公平性:支持公平性和非公平性。
重入性:支持重入。读写锁最多支持 65535 个递归写入锁和 65535 个递归读取锁。
锁降级:遵循获取写锁,再获取读锁,最后释放写锁的次序,如此写锁能够降级成为读锁。
ReadWriteLock
java.util.concurrent.locks.ReadWriteLock , интерфейс блокировки чтения-записи. Метод определения следующий:
Lock readLock();
Lock writeLock();
Пара методов для получения блокировки чтения и блокировки записи объекта Lock соответственно.
ReentrantReadWriteLock
java.util.concurrent.locks.ReentrantReadWriteLock реализует интерфейс ReadWriteLock, класс реализации реентерабельной блокировки чтения-записи. Внутри него поддерживается пара связанных блокировок, одна для операций только для чтения, а другая для операций записи. Блокировки чтения могут одновременно удерживаться несколькими потоками чтения, если нет потоков записи.
Другими словами, блокировки записи являются эксклюзивными, а блокировки чтения — общими.
Общая структура класса ReentrantReadWriteLock выглядит следующим образом:
/** 内部类 读锁 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 内部类 写锁 */
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
/** 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock() {
this(false);
}
/** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/** 返回用于写入操作的锁 */
@Override
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
/** 返回用于读取操作的锁 */
@Override
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* 省略其余源代码
*/
}
public static class WriteLock implements Lock, java.io.Serializable {
/**
* 省略其余源代码
*/
}
public static class ReadLock implements Lock, java.io.Serializable {
/**
* 省略其余源代码
*/
}
ReentrantReadWriteLock аналогичен ReentrantLock, его тело блокировки также является Sync, а его блокировка чтения и блокировка записи реализованы через Sync. Таким образом, ReentrantReadWriteLock на самом деле имеет только одну блокировку, но способ получения блокировки чтения и блокировки записи отличается.
Его блокировки чтения-записи соответствуют двум классам: ReadLock и WriteLock. Оба класса являются подклассами реализаций Lock.
В ReentrantLock состояние типа int для Sync (фактически AQS) используется для представления состояния синхронизации, которое представляет количество повторных захватов блокировки потоком. Однако блокировка чтения-записи ReentrantReadWriteLock внутренне поддерживает пару блокировок чтения-записи. его на две части: высокие 16 для чтения, нижние 16 для записи.
Как после разделения блокировка чтения-записи быстро определяет состояние блокировок чтения и записи? побитовыми операциями. Если текущее состояние синхронизации S, то:
写状态,等于 S & 0x0000FFFF(将高 16 位全部抹去)
读状态,等于 S >>> 16 (无符号补 0 右移 16 位)。
код показывает, как показано ниже:
// Sync.java
static final int SHARED_SHIFT = 16; // 位数
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 每个锁的最大重入次数,65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
ExclusiveCount(int c) Статический метод, сколько раз была получена блокировка, удерживающая состояние записи.
sharedCount(int c) Статический метод, количество потоков, получивших блокировку, удерживающую состояние чтения. В отличие от блокировок записи, блокировки чтения могут удерживаться несколькими потоками одновременно. Блокировки чтения, поддерживаемые каждым потоком, поддерживают функцию повторного входа, поэтому количество блокировок чтения, удерживаемых каждым потоком, необходимо подсчитывать отдельно, что требует использования счетчика HoldCounter.
Метод строительства
В приведенном выше конструкторе мы видели, создавать ли объект FairSync или NonfairSync на основе параметра Fair.
getThreadId
Статический метод getThreadId(Thread thread) для получения номера потока. код показывает, как показано ниже:
/**
* Returns the thread id for the given thread. We must access
* this directly rather than via method Thread.getId() because
* getId() is not final, and has been known to be overridden in
* ways that do not preserve unique mappings.
*/
static final long getThreadId(Thread thread) {
return UNSAFE.getLongVolatile(thread, TID_OFFSET);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long TID_OFFSET;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
TID_OFFSET = UNSAFE.objectFieldOffset
(tk.getDeclaredField("tid"));
} catch (Exception e) {
throw new Error(e);
}
}
Само собой разумеется, что метод Thread#getId(), соответствующий потоку, вызывается напрямую, код выглядит следующим образом:
private long tid;
public long getId() {
return tid;
}
Но на самом деле этот метод Thread не является финальной модификацией, то есть, если у нас есть подкласс, реализующий Thread, этот метод можно полностью переопределить, что может привести к невозможности получения атрибута tid. Поэтому в приведенном выше методе используйте Unsafe для прямого получения свойства tid. Он достоин быть исходным кодом JDK, и подумать об этом очень страшно.
Кроме того, в JDK-6346938 также обсуждалась проблема «java.lang.Thread.getId() должна быть окончательной», которая в настоящее время рассматривается JDK как ОШИБКА, но не была исправлена без причины.
Другие методы реализации
public final boolean isFair() {
return sync instanceof FairSync;
}
public int getReadLockCount() {
return sync.getReadLockCount();
}
public boolean isWriteLocked() {
return sync.isWriteLocked();
}
public boolean isWriteLockedByCurrentThread() {
return sync.isHeldExclusively();
}
public int getReadHoldCount() {
return sync.getReadHoldCount();
}
protected Collection<Thread> getQueuedWriterThreads() {
return sync.getExclusiveQueuedThreads();
}
protected Collection<Thread> getQueuedReaderThreads() {
return sync.getSharedQueuedThreads();
}
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
public final int getQueueLength() {
return sync.getQueueLength();
}
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}
protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}
Чтение замков и запись замков
Выше мы также упомянули, что блокировка чтения и блокировка записи ReentrantReadWriteLock основана на его внутренней реализации Sync, поэтому конкретный метод реализации заключается в вызове внутреннего метода Sync.
ReadLock
ReadLock — это внутренний статический класс ReentrantReadWriteLock, который реализует интерфейс java.util.concurrent.locks.Lock и класс реализации блокировки чтения.
Метод строительства
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
Поле синхронизации через конструктор ReentrantReadWriteLock передается и использует свой объект синхронизации.
lock
@Override
public void lock() {
sync.acquireShared(1);
}
Вызовите метод #acquireShared(int arg) AQS, чтобы получить состояние синхронизации путем совместного использования. Следовательно, блокировка чтения может быть получена несколькими потоками одновременно.
lockInterruptibly
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
tryLock
/**
* Acquires the read lock only if the write lock is not held by
* another thread at the time of invocation.
*
* <p>Acquires the read lock if the write lock is not held by
* another thread and returns immediately with the value
* {@code true}. Even when this lock has been set to use a
* fair ordering policy, a call to {@code tryLock()}
* <em>will</em> immediately acquire the read lock if it is
* available, whether or not other threads are currently
* waiting for the read lock. This "barging" behavior
* can be useful in certain circumstances, even though it
* breaks fairness. If you want to honor the fairness setting
* for this lock, then use {@link #tryLock(long, TimeUnit)
* tryLock(0, TimeUnit.SECONDS) } which is almost equivalent
* (it also detects interruption).
*
* <p>If the write lock is held by another thread then
* this method will return immediately with the value
* {@code false}.
*
* @return {@code true} if the read lock was acquired
*/
@Override
public boolean tryLock() {
return sync.tryReadLock();
}
На самом деле причина та же, что и у tryLock ReentrantLock.
Метод реализации tryLock(), когда он реализован, надеется быстро узнать, можно ли получить блокировку, поэтому, даже если fair = true (с использованием честной блокировки), метод Sync#tryReadLock() все равно вызывается.
Если вы действительно хотите, чтобы #tryLock() была честной или нет, вы можете вызвать метод #tryLock(0, TimeUnit) для его реализации.
tryLock
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
unlock
@Override
public void unlock() {
sync.releaseShared(1);
}
Вызовите метод #releaseShared(int arg) AQS, чтобы поделиться состоянием синхронизации выпуска.
newCondition
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
Условия условий не поддерживаются.
WriteLock
Код для WriteLock аналогичен коду для ReadLock, за исключением того, что приобретается исключительно состояние синхронизации.
WriteLock — это внутренний статический класс ReentrantReadWriteLock, реализующий интерфейс java.util.concurrent.locks.Lock и реализующий класс блокировки записи.
Метод строительства
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
Поле синхронизации через конструктор ReentrantReadWriteLock передается и использует свой объект Sync.
lock
@Override
public void lock() {
sync.acquire(1);
}
Вызовите метод #.acquire(int arg) AQS, чтобы исключительно получить состояние синхронизации. Таким образом, блокировка записи может быть получена только одним потоком за раз.
lockInterruptibly
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
tryLock
/**
* Acquires the write lock only if it is not held by another thread
* at the time of invocation.
*
* <p>Acquires the write lock if neither the read nor write lock
* are held by another thread
* and returns immediately with the value {@code true},
* setting the write lock hold count to one. Even when this lock has
* been set to use a fair ordering policy, a call to
* {@code tryLock()} <em>will</em> immediately acquire the
* lock if it is available, whether or not other threads are
* currently waiting for the write lock. This "barging"
* behavior can be useful in certain circumstances, even
* though it breaks fairness. If you want to honor the
* fairness setting for this lock, then use {@link
* #tryLock(long, TimeUnit) tryLock(0, TimeUnit.SECONDS) }
* which is almost equivalent (it also detects interruption).
*
* <p>If the current thread already holds this lock then the
* hold count is incremented by one and the method returns
* {@code true}.
*
* <p>If the lock is held by another thread then this method
* will return immediately with the value {@code false}.
*
* @return {@code true} if the lock was free and was acquired
* by the current thread, or the write lock was already held
* by the current thread; and {@code false} otherwise.
*/
@Override
public boolean tryLock( ) {
return sync.tryWriteLock();
}
По сути, то же самое, что и tryLoc от ReentrantLock.
Метод реализации tryLock(), когда он реализован, надеется быстро узнать, можно ли получить блокировку, поэтому, даже если fair = true (с использованием честной блокировки), метод Sync#tryWriteLock() все равно вызывается. Если вы действительно хотите, чтобы #tryLock() была честной или нет, вы можете вызвать метод #tryLock(0, TimeUnit) для его реализации.
tryLock
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
unlock
@Override
public void unlock() {
sync.release(1);
}
Вызовите метод #release(int arg) AQS, чтобы исключительно отменить состояние синхронизации.
newCondition
@Override
public Condition newCondition() {
return sync.newCondition();
}
Вызовите метод Sync#newCondition(), чтобы создать объект Condition.
isHeldByCurrentThread
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
Вызовите метод Sync#isHeldExclusively(), чтобы определить, заблокирован ли он исключительно текущим потоком.
getHoldCount
public int getHoldCount() {
return sync.getWriteHoldCount();
}
Вызовите метод Sync#getWriteHoldCount(), чтобы вернуть количество эксклюзивных блокировок, удерживаемых текущим потоком.
Синхронизировать абстрактный класс
Синхронизация — это внутренний статический класс ReentrantReadWriteLock, который реализует абстрактный класс AbstractQueuedSynchronizer и абстрактный класс синхронизатора. Он использует поле состояния AQS для представления текущего количества удерживаемых блокировок, тем самым реализуя характеристики повторных блокировок и блокировок чтения-записи.
Метод строительства
private transient ThreadLocalHoldCounter readHolds; // 当前线程的读锁持有数量
private transient Thread firstReader = null; // 第一个获取读锁的线程
private transient int firstReaderHoldCount; // 第一个获取读锁的重入数
private transient HoldCounter cachedHoldCounter; // 最后一个获得读锁的线程的 HoldCounter 的缓存对象
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
writerShouldBlock
abstract boolean writerShouldBlock();
При получении блокировки записи, следует ли блокировать, если предыдущий узел также получает блокировку. Существуют разные реализации для NonefairSync и FairSync.
readerShouldBlock
abstract boolean readerShouldBlock();
При получении блокировки чтения, следует ли блокировать, если узел предварительного заказа также получает блокировку. Существуют разные реализации для NonefairSync и FairSync.
【Блокировка записи】попробуйтеПолучить
@Override
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
//当前锁个数
int c = getState();
//写锁
int w = exclusiveCount(c);
if (c != 0) {
//c != 0 && w == 0 表示存在读锁
//当前线程不是已经获取写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//超出最大范围
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
// 是否需要阻塞
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//设置获取锁的线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
Этот метод примерно такой же, как #tryAcquire(int arg) ReentrantLock, разница в том, что при оценке реентерабельности добавляется условие: существует ли блокировка чтения. Потому что необходимо сделать так, чтобы работа блокировки записи была видна блокировке чтения. Если разрешено получение блокировки записи при наличии блокировки чтения, другие потоки, получившие блокировку чтения, могут не знать об операциях текущего потока записи. Таким образом, блокировка записи может быть получена текущим потоком только после того, как блокировка чтения будет полностью снята.После получения блокировки записи все остальные потоки чтения и записи будут заблокированы.
Вызовите абстрактный метод #writerShouldBlock(), и если он вернет true, он не сможет получить блокировку записи.
[Блокировка чтения] tryAcquireShared
tryAcquireShared(int arg), попробуйте получить состояние синхронизации чтения, в случае успеха получите результат >= 0, в противном случае верните результат
protected final int tryAcquireShared(int unused) {
//当前线程
Thread current = Thread.currentThread();
int c = getState();
//exclusiveCount(c)计算写锁
//如果存在写锁,且锁的持有者不是当前线程,直接返回-1
//存在锁降级问题,后续阐述
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//读锁
int r = sharedCount(c);
/*
* readerShouldBlock():读锁是否需要等待(公平锁原则)
* r < MAX_COUNT:持有线程小于最大数(65535)
* compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态
*/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { //修改高16位的状态,所以要加上2^16
/*
* holdCount部分后面讲解
*/
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
Процесс получения блокировки чтения немного сложнее, чем эксклюзивной блокировки, весь процесс выглядит следующим образом:
- Поскольку существует ситуация ухудшения блокировки, если есть блокировка записи, а держатель блокировки не является текущим потоком, он сразу вернется к сбою, в противном случае продолжится.
- В соответствии с принципом справедливости вызовите метод readerShouldBlock(), чтобы определить, не нужно ли блокировать блокировку чтения, количество потоков, удерживающих блокировку чтения, меньше максимального значения (65535) и CAS устанавливает блокировку статус до
fullTryAcquireShared
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
// 锁降级
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
}
// 读锁需要阻塞,判断是否当前线程已经获取到读锁
else if (readerShouldBlock()) {
//列头为当前线程
if (firstReader == current) {
}
//HoldCounter后面讲解
else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0) // 计数为 0 ,说明没得到读锁,清空线程变量
readHolds.remove();
}
}
if (rh.count == 0) // 说明没得到读锁
return -1;
}
}
//读锁超出最大范围
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//CAS设置读锁成功
if (compareAndSetState(c, c + SHARED_UNIT)) { //修改高16位的状态,所以要加上2^16
//如果是第1次获取“读取锁”,则更新firstReader和firstReaderHoldCount
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
}
//如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程,则将firstReaderHoldCount+1
else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
//更新线程的获取“读取锁”的共享计数
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
Этот метод будет обрабатываться в зависимости от того, «нужно ли блокировать ожидание», «превышает ли общий счетчик блокировки чтения лимит» и так далее. Если ожидание блокировки не требуется и количество долей блокировки не превышает лимита, делается попытка получить блокировку через CAS, и возвращается 1. так,Метод fullTryAcquireShared(Thread) представляет собой логику повторной попытки вращения метода #tryAcquireShared(int unused).
[Блокировка записи] tryRelease
protected final boolean tryRelease(int releases) {
//释放的线程不为锁的持有者
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
//若写锁的新线程数为0,则将锁的持有者设置为null
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
Весь процесс снятия блокировки с помощью блокировки записи аналогичен эксклюзивной блокировке ReentrantLock.Каждое освобождение уменьшает состояние записи.Когда состояние записи равно 0, это означает, что блокировка записи была полностью снята, так что другие ожидающие потоки могут продолжайте доступ к чтению и записи. заблокируйте, чтобы получить состояние синхронизации. В то же время модификация этого потока записи видна последующим потокам.
[Блокировка чтения]
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//如果想要释放锁的线程为第一个获取锁的线程
if (firstReader == current) {
//仅获取了一次,则需要将firstReader 设置null,否则 firstReaderHoldCount - 1
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
}
//获取rh对象,并更新“当前线程获取锁的信息”
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//CAS更新同步状态
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
Метод unmatchedUnlockException() возвращает исключение IllegalMonitorStateException. код показывает, как показано ниже:
private IllegalMonitorStateException unmatchedUnlockException() {
return new IllegalMonitorStateException(
"attempt to unlock read lock, not locked by current thread");
}
Что происходит, так это то, что поток, который снимает блокировку чтения, не является потоком, который получил блокировку чтения. При обычном использовании этого не произойдет.
tryWriteLock
tryWriteLock(), попробуйте получить блокировку записи.
- Возвращает true, если получение прошло успешно.
- Если это не удается, верните false и не ждите постановки в очередь.
код показывает, как показано ниже:
final boolean tryWriteLock(){
Thread current = Thread.currentThread();
int c = getState();
if(c != 0){
int w = exclusiveCount(c); // 获得现在写锁获取的数量
if(w == 0 || current != getExclusiveOwnerThread()){ // 判断是否是其他的线程获取了写锁。若是,返回 false
return false;
}
if(w == MAX_COUNT){ // 超过写锁上限,抛出 Error 错误
throw new Error("Maximum lock count exceeded");
}
}
if(!compareAndSetState(c, c + 1)){ // CAS 设置同步状态,尝试获取写锁。若失败,返回 false
return false;
}
setExclusiveOwnerThread(current); // 设置持有写锁为当前线程
return true;
}
tryReadLock
tryReadLock(), попробуйте получить блокировку чтения.
Возвращает true, если получение прошло успешно. Если это не удается, верните false и не ждите постановки в очередь. код показывает, как показано ниже:
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
//exclusiveCount(c)计算写锁
//如果存在写锁,且锁的持有者不是当前线程,直接返回-1
//存在锁降级问题,后续阐述
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
// 读锁
int r = sharedCount(c);
/*
* HoldCount 部分后面讲解
*/
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
isHeldExclusively
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
newCondition
final ConditionObject newCondition() {
return new ConditionObject();
}
Другие методы реализации
final Thread getOwner() {
// Must read state before owner to ensure memory consistency
return ((exclusiveCount(getState()) == 0) ?
null :
getExclusiveOwnerThread());
}
final int getReadLockCount() {
return sharedCount(getState());
}
final boolean isWriteLocked() {
return exclusiveCount(getState()) != 0;
}
final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}
final int getReadHoldCount() {
if (getReadLockCount() == 0)
return 0;
Thread current = Thread.currentThread();
if (firstReader == current)
return firstReaderHoldCount;
HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == getThreadId(current))
return rh.count;
int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
readHolds = new ThreadLocalHoldCounter();
setState(0); // reset to unlocked state
}
final int getCount() { return getState();
}
Класс реализации синхронизации
NonfairSync
NonfairSync — это внутренний статический класс ReentrantReadWriteLock, который реализует абстрактный класс Sync и класс реализации недобросовестной блокировки. код показывает, как показано ниже:
static final class NonfairSync extends Sync {
@Override
final boolean writerShouldBlock() {
return false; // writers can always barge
}
@Override
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
Поскольку блокировка записи является эксклюзивной блокировкой, в случае несправедливой блокировки необходимо вызвать метод AQS по-видимомуFirstQueuedIsExclusive(), чтобы определить, была ли получена текущая блокировка записи. код показывает, как показано ниже:
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() && // 非共享,即独占
s.thread != null;
}
FairSync
FairSync — это внутренний статический класс ReentrantReadWriteLock, который реализует абстрактный класс Sync и класс реализации справедливой блокировки. код показывает, как показано ниже:
static final class FairSync extends Sync {
@Override
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
@Override
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
Вызовите метод #hasQueuedPredecessors() AQS, чтобы узнать, существует ли узел предварительного заказа, т. е. не является ли он первым узлом, ожидающим получения состояния синхронизации.
HoldCounter
В процессе получения и снятия блокировки чтения мы всегда можем видеть переменную rh (HoldCounter), которая играет очень важную роль в блокировке чтения.
Мы понимаем, что внутренний механизм разблокировки на самом деле является общей блокировкой.Чтобы лучше понять HoldCounter, мы временно считаем, что это не вероятность блокировки, а эквивалент счетчика. Операция общей блокировки эквивалентна операции со счетчиком. Для получения общей блокировки счетчик +1, для снятия общей блокировки счетчик -1. Общая блокировка может быть снята и повторно введена только после того, как поток получит общую блокировку. Следовательно, функцией HoldCounter является количество общих блокировок, удерживаемых текущим потоком.Это число должно быть привязано к потоку, иначе при работе с другими блокировками потока будет выдано исключение.
HoldCounter — это внутренний статический класс Sync.
static final class HoldCounter {
int count = 0; // 计数器
final long tid = getThreadId(Thread.currentThread()); // 线程编号
}
ThreadLocalHoldCounter — это внутренний статический класс Sync.
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
@Override
public HoldCounter initialValue() {
return new HoldCounter();
}
}
С помощью класса ThreadLocalholdCounter HoldCounter можно привязать к потоку. Следовательно, Holdcounter должен быть счетчиком связанного потока, а ThreadLocalholdCounter — Threadlocal, который является многопоточным.
Из приведенного выше видно, что ThreadLocal связывает HoldCounter с текущим потоком, а HoldCounter также содержит номер потока, так что при снятии блокировки мы можем узнать, является ли предыдущий поток чтения (cachedHoldCounter), кэшированный в ReadWriteLock, текущим потоком. . Преимущество этого заключается в том, что это может уменьшить количество вызовов метода ThreadLocal.get(), поскольку это также трудоемкая операция.
Следует отметить, что причина, по которой HoldCounter привязывает номер потока, а не объект потока, заключается в том, чтобы избежать привязки HoldCounter и ThreadLocal друг к другу, что затрудняет их освобождение сборщиком мусора (хотя сборщик мусора может интеллектуально находить такие ссылки). и перерабатывать их, но это требует определенной цены), так что по сути это просто помощь GC быстро перерабатывать объекты.
Видя это, мы понимаем роль HoldCounter и смотрим на сегмент кода, который получает блокировку чтения:
//如果获取读锁的线程为第一次获取读锁的线程,则firstReaderHoldCount重入数 + 1
else if (firstReader == current) {
firstReaderHoldCount++;
} else {
//非firstReader计数
if (rh == null)
rh = cachedHoldCounter;
//rh == null 或者 rh.tid != current.getId(),需要获取rh
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
//加入到readHolds中
else if (rh.count == 0)
readHolds.set(rh);
//计数+1
rh.count++;
cachedHoldCounter = rh; // cache for release
}
Вот объяснение того, почему вводятся переменные firstReader и firstReaderHoldCount. Это связано с проблемой эффективности, firstReader не будет помещаться в readHolds, если есть только одна блокировка чтения, он не будет искать readHolds.
заблокировать понижение версии
Особенностью блокировок чтения-записи является деградация блокировки. Понижение уровня блокировки означает, что уровень блокировки записи может быть понижен до блокировки чтения, но необходимо соблюдать порядок получения блокировки записи, получения блокировки чтения и снятия блокировки записи. Обратите внимание, что если текущий поток сначала получает блокировку записи, затем освобождает блокировку записи, а затем получает блокировку чтения, этот процесс нельзя назвать понижением уровня блокировки, и понижение уровня блокировки должно следовать этому порядку.
В методе получения блокировки чтения #tryAcquireShared(int unused) есть фрагмент кода для интерпретации деградации блокировки:
int c = getState();
//exclusiveCount(c)计算写锁
//如果存在写锁,且锁的持有者不是当前线程,直接返回-1
//存在锁降级问题,后续阐述
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//读锁
int r = sharedCount(c);
Требуется ли приобретение и снятие блокировок чтения при понижении уровня блокировки? Определенно необходимо. Только представьте, если текущий поток A не получает блокировку чтения, а сразу снимает блокировку записи, в это время другой поток B получает блокировку записи, то модификация данных этим потоком B не будет видна текущему потоку. А. Если блокировка чтения получена, поток B определяет, что, если блокировка чтения не была снята, она будет заблокирована в процессе получения блокировки записи.Только после того, как текущий поток A освободит блокировку чтения, поток B получит блокировку записи. заблокировать успешно.