Введение
ReentrantReadWriteLock — это реентерабельная блокировка чтения-записи, реализация которой также зависит от AQS. ПредставляемReentrantLockМы знаем, что он зависит от состояния синхронизации AQS, чтобы определить, занята ли блокировка, а ReentrantReadWriteLock имеет блокировку чтения и записи.Как он полагается на одно состояние для его поддержания?
ReentrantReadWriteLock
ReentrantReadWriteLock — это блокировка чтения-записи, которая по умолчанию является несправедливой, как и ReentrantLock.Внутренне определены блокировка чтения ReadLock() и блокировка записи WriteLock().Доступ к нему разрешен для нескольких потоков чтения одновременно, но при доступе к потоку записи все потоки чтения и записи будут заблокированы.. Основные возможности блокировки чтения-записи:Справедливость, повторный вход, деградация блокировки
Получение и снятие блокировок записи
Блокировка записи — это эксклюзивная блокировка, которая поддерживает повторный вход.Основной метод ее получения:
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取ReentrantReadWriteLock锁整体同步状态
int c = getState();
// 获取写锁同步状态
int w = exclusiveCount(c);
// 存在读锁或写锁
if (c != 0) {
// c != 0 && w == 0 即若存在读锁或写锁持有线程不是当前线程,获取写锁失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 最多65535次重入,若超过报错
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 可重入,设置同步状态
setState(c + acquires);
return true;
}
// 公平与非公平,同步队列是否有节点,同时cas设置状态
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 设置获取锁的线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
Из исходного кода мы можем обнаружить, что getState() получает следующее:Общее состояние синхронизации блокировки чтения и записи, а затем получить статус синхронизации блокировки записи отдельно с помощью метода exclusiveCount()
static final int SHARED_SHIFT = 16;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int exclusiveCount(int c) {
return c & EXCLUSIVE_MASK;
}
ReentrantReadWriteLock обрезает переменную состояния побитово,Младшие 16 бит состояния синхронизации представляют количество вхождений блокировки записи, а старшие 16 бит представляют количество вхождений блокировки чтения., как показано на рисунке
Таким образом, это объясняет, почему блокировка записи срабатывает до 65535 раз.
Общая идея приобретения блокировки записи:Когда блокировка чтения была получена потоком чтения или блокировка записи была получена другими потоками записи, получение блокировки записи завершается ошибкой; в противном случае получение выполняется успешно и повторно, что увеличивает состояние синхронизации блокировки записи.
protected final boolean tryRelease(int releases) {
// 若释放的线程不为锁的持有者
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 重新设置同步状态
int nextc = getState() - releases;
// 若新的写锁持有线程数为0,则将锁的持有线程置为null
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
// 更新同步状态
setState(nextc);
return free;
}
Снятие блокировки записи в основном похоже на процесс снятия ReentrantLock.Состояние записи уменьшается каждый раз, когда оно снимается.Когда состояние записи равно 0
Когда блокировка записи снята, ожидающий поток чтения-записи может продолжить доступ к блокировке чтения-записи, и модификация предыдущего потока записи будет видна следующему потоку чтения-записи.Получение и освобождение блокировки чтения
Блокировки чтения относятся к блокировкам записи (монопольные блокировки или монопольные блокировки), а блокировки чтения являются реентерабельными.общий замок, она может быть получена несколькими потоками одновременно, когда нет доступа к другим потокам записи (или статус записи равен 0), блокировка чтения всегда будет успешно получена, и все, что она делает, это (поточно-безопасное) увеличение чтения положение дел
protected final int tryAcquireShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
int c = getState();
// 若写锁已被占有,且写锁占有线程不是当前线程,读锁获取失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 读锁状态
int r = sharedCount(c);
// 判断读锁是否需要公平,读锁持有线程数是否小于极值,CAS设置读锁状态
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 若读锁未被线程占有,则更新firstReader和firstReaderHoldCount
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 如果获取读锁的线程为第一次获取读锁的线程,则firstReaderHoldCount重入数 + 1
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
Общая идея приобретения блокировки чтения:
① Определите, занята ли блокировка записи, и является ли поток, удерживающий блокировку записи, не текущим потоком, если это так, получение блокировки чтения завершается неудачно.
②. Определите, должна ли блокировка чтения быть справедливой, число потоков, удерживаемых блокировкой чтения, меньше предельного значения, и CAS успешно устанавливает статус блокировки чтения.Если условия не выполняются, fullTryAcquireShared() метод будет вызван для вращения и повторной попытки получить блокировку чтения; если условия соблюдены, изменить значение HoldCounter текущего потока.
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
// 若写锁已被占有,且写锁占有线程不是当前线程
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// 公平性
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 读锁占有线程达到极值
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// cas设置成功
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 若读锁未被线程占有,则更新firstReader和firstReaderHoldCount
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 如果获取读锁的线程为第一次获取读锁的线程,则firstReaderHoldCount重入数 + 1
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 若当前线程为第一个获取读锁的线程
if (firstReader == current) {
// 若只有获取一次,将firstReader置为null
if (firstReaderHoldCount == 1)
firstReader = null;
// 若多次,firstReaderHoldCount-1
else
firstReaderHoldCount--;
} else {
// 更新当前线程获取锁次数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 自旋CAS更新读锁同步状态
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
static final class HoldCounter { int count = 0; final long tid = getThreadId(Thread.currentThread()); }
скопировать кодstatic final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } private transient HoldCounter cachedHoldCounter; private transient int firstReaderHoldCount; private transient Thread firstReader = null;
заблокировать понижение версии
Понижение уровня блокировки относится к понижению уровня блокировки записи до блокировки чтения, т.е.Процесс первого получения блокировки записи, получения блокировки чтения и снятия блокировки записи., цель состоит в том, чтобы обеспечить видимость данных. Предположим, что есть два потока A и B. Если поток A получает блокировку записи и не получает блокировку чтения, а сразу снимает блокировку записи, то поток B получает блокировку записи и изменяет данные, тогда поток A не может знать данные нить B возобновить. Если поток A получает блокировку чтения, то есть следуя шагам понижения уровня блокировки, поток B будет заблокирован до тех пор, пока поток A не использует данные и не освободит блокировку чтения, после чего поток B может получить блокировку записи для обновления данных.
Суммировать
Когда поток получает блокировку чтения, никакие другие потоки не могут получить блокировку записи.
Не разрешено, когда поток получает блокировку записиразноеПотоки получают блокировки чтения и записи
Блокировку записи можно понизить до блокировки чтения, но блокировку чтения нельзя повысить до блокировки записи.
благодарный
Искусство параллельного программирования на Java
http://cmsblogs.com/?p=2213