🌹Рекомендация проекта с открытым исходным кодом🌹
Pepper Metricsэто инструмент с открытым исходным кодом, разработанный мной и моими коллегами (Github.com/Live-ActionCool/pep...), который собирает текущую статистику производительности jedis/mybatis/httpservlet/dubbo/motan и предоставляет ее для основных данных, совместимых с базой данных временных рядов, таких как prometheus, и отображает тенденции через grafana. Его подключаемая архитектура также очень удобна для пользователей, поскольку позволяет расширять и интегрировать другие компоненты с открытым исходным кодом.
Пожалуйста, дайте звезду и приветствуйте всех, кто станет разработчиком, чтобы отправить Prote для улучшения проекта.
К точке...
ReentrantLock, реентерабельная блокировка, — это высокопроизводительный инструмент, добавленный в пакет concurrent в JDK5. Как следует из названия, ReentrantLock поддерживает многократное получение блокировки одним и тем же потоком без снятия блокировки.
Каждый внешний вид должен иметь значение. Поскольку уже есть уже синхронизированный старый уровень, и синхронизирован также поддерживает ReentRancy, почему Дуг Леа написал ReentrantLock?
0 Сравнение ReentrantLock и синхронизировано
0.1 Сравнение производительности
Во-первых, производительность ReentrantLock лучше, чем у синхронизированного. Ниже приведено сравнение двух кодов. Первый синхронизирован:
public class LockDemo2 {
private static final Object lock = new Object(); // 定义锁对象
private static int count = 0; // 累加数
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
CountDownLatch cdl = new CountDownLatch(100);
// 启动100个线程对count累加,每个线程累加1000000次
// 调用add函数累加,通过synchronized保证多线程之间的同步
for (int i=0;i<100;i++) {
new Thread(() -> {
for (int i1 = 0; i1 <1000000; i1++) {
add();
}
cdl.countDown();
}).start();
}
cdl.await();
System.out.println("Time cost: " + (System.currentTimeMillis() - start) + ", count = " + count);
}
private static void add() {
synchronized (lock) {
count++;
}
}
}
Тогда это Reentrantlock:
public class LockDemo3 {
private static Lock lock = new ReentrantLock(); // 重入锁
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
CountDownLatch cdl = new CountDownLatch(100);
for (int i=0;i<100;i++) {
new Thread(() -> {
for (int i1 = 0; i1 <1000000; i1++) {
add();
}
cdl.countDown();
}).start();
}
cdl.await();
System.out.println("Time cost: " + (System.currentTimeMillis() - start) + ", count = " + count);
}
// 通过ReentrantLock保证线程之间的同步
private static void add() {
lock.lock();
count++;
lock.unlock();
}
}
Ниже приводится сравнение результатов многократного запуска:
synchronized | ReentrantLock | |
---|---|---|
первый раз | 4620 ms | 3360 ms |
второй раз | 4086 ms | 3138 ms |
в третий раз | 4650 ms | 3408 ms |
В целом, средняя производительность ReentrantLock примерно на 20% выше, чем у синхронизированной.
PS: спасибо @Wild Seven Uncle за исправление. Более строгое описание этого сравнения производительности: при большом количестве потоков, конкурирующих за блокировки, ReentrantLock в большинстве случаев работает лучше, чем синхронизированный.
Поскольку синхронизация оптимизирована в JDK6, когда конкуренция блокировки не является свирепом, в большинстве случаев замок останется на блокировке смещения и легкими этапами блокировки, а производительность этих двух этапов очень хорошая. Когда есть много соревнований, он может расширяться в тяжелые замки, и производительность будет деградирована. В это время ReentrantLock должен быть лучше синхронизирован.
0.2 Сравнение справедливости с приобретением блокировки
Что такое понятие справедливости? Если блокировка получена справедливо, это означает, что когда несколько потоков получают блокировку, они должны встать в очередь и получить блокировку по очереди; если блокировка получена несправедливо, это означает, что когда несколько потоков получают блокировку, они устремятся вперед. и кто схватится за замок.
Поскольку синхронизация реализована на основе механизма монитора, она поддерживает только несправедливые блокировки, а ReentrantLock поддерживает как честные, так и несправедливые блокировки.
0.3 Обзор
В дополнение к вышеперечисленному, у ReentrantLock есть еще несколько Synchronized, которые здесь суммированы.
synchronized | ReentrantLock | |
---|---|---|
представление | Относительно плохо | Лучше синхронизированного примерно на 20% |
справедливость | Только блокировка недоступной блокировки | Поддерживает как справедливые замки и недобросовестные замки |
Попытка получить поддержку блокировки | Не поддерживается, как только блок синхронизации достигнут, а блокировка не получена, заблокируйте здесь | Поддержка, с помощью метода tryLock вы можете судить об успешном получении блокировки по возвращаемому значению, поэтому даже если получение блокировки не удастся, здесь она не заблокируется. |
Тайм-аут получения блокировки | Не поддерживается, если блокировка не может быть получена все время, она будет ждать вечно | Поддержка достижения с помощью метода tryLock (время, TimeUnit), если не получить тайм-аут блокировки, отказаться от получения блокировки блокировка не пойдет |
Следует ли реагировать на прерывания | Не поддерживается, не может реагировать на сигнал прерывания потока | Поддерживается методом lockInterruptably.После получения блокировки с помощью этого метода поток может ответить на сигнал прерывания и выдать InterruptedException. |
Поддержка состояния ожидания | Поддержка достигается путем ожидания, уведомления, уведомленияВсе | Поддержка, реализованная через интерфейс Conditon, поддержка нескольких условий, более гибкая, чем синхронизированная |
1 Принцип реализации реентерабельной функции
Реализация ReentrantLock основана на Синхронизаторе Очередей (AbstractQueuedSynchronizer, далее AQS) Принцип реализации AQS можно посмотреть в другой статье автора:Что такое синхронизатор очереди Java (AQS)
Функциональность повторного входа ReentrantLock основана на синхронном состоянии AQS: состояние.
Принцип примерно таков: когда поток получает блокировку, значение состояния равно +1, и поток, удерживающий блокировку в данный момент, записывается, а когда другой поток получает блокировку, оценивается, являются ли поток и поток, удерживающий блокировку, то же самое Поток, если он есть, увеличивает значение состояния на 1, если нет, то блокирует поток. Когда поток освобождает блокировку, значение состояния равно -1.Когда значение состояния уменьшается до 0, это означает, что текущий поток полностью освободил блокировку, а затем поле, которое записывает поток, в настоящее время удерживающий блокировку, устанавливается на null, и другие потоки пробуждаются, чтобы повторно конкурировать за блокировку.
// acquires的值是1
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取state的值
int c = getState();
// 如果state的值等于0,表示当前没有线程持有锁
// 尝试将state的值改为1,如果修改成功,则成功获取锁,并设置当前线程为持有锁的线程,返回true
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// state的值不等于0,表示已经有其他线程持有锁
// 判断当前线程是否等于持有锁的线程,如果等于,将state的值+1,并设置到state上,获取锁成功,返回true
// 如果不是当前线程,获取锁失败,返回false
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
2 Принцип реализации несправедливой блокировки
ReentrantLock имеет два конструктора:
// 无参构造,默认使用非公平锁(NonfairSync)
public ReentrantLock() {
sync = new NonfairSync();
}
// 通过fair参数指定使用公平锁(FairSync)还是非公平锁(NonfairSync)
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
sync — это переменная-член ReentrantLock, экземпляр его внутреннего класса Sync. И NonfairSync, и FairSync являются подклассами класса Sync. Вы можете обратиться к следующей диаграмме классов:
Синхрон наследует AQS, поэтому у него есть функция AQS. Точно так же NonfairSync и FairSync являются подклассами AQS.
Когда мы получаем экземпляр ReentrantLock через конструктор без аргументов, по умолчанию используется несправедливая блокировка.
Далее будет описан принцип реализации несправедливых блокировок в следующих сценариях: Предположим, что один поток (t1) получает блокировку, а множество других потоков (others_t), которые не получили блокировку, присоединяются к очереди синхронизации AQS и ждут. завершает выполнение, он освобождается.После блокировки другие потоки повторно конкурируют за блокировку.
Сначала опишу способ получения блокировки:
final void lock() {
// 线程t1成功的将state的值从0改为1,表示获取锁成功
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// others_t线程们没有获取到锁
acquire(1);
}
В случае сбоя блокировки будет вызван метод AQUIRE AQS.
public final void acquire(int arg) {
// tryAcquire是个模板方法,在NonfairSync中实现,如果在tryAcquire方法中依然获取锁失败,会将当前线程加入同步队列中等待(addWaiter)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
Реализация tryAcquire выглядит следующим образом, которая фактически вызывает вышеуказанный метод nonfairTryAcquire.
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
Хорошо, в это время t1 получил блокировку, и все потоки other_t запускаются в очередь синхронизации и ждут.
В определенный момент выполнение собственной задачи t1 завершается, и вызывается метод снятия блокировки (разблокировки).
public void unlock() {
// 调用AQS的release方法释放资源
sync.release(1);
}
public final boolean release(int arg) {
// tryRelease也是模板方法,在Sync中实现
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 成功释放锁后,唤醒同步队列中的下一个节点,使之可以重新竞争锁
// 注意此时不会唤醒队列第一个节点之后的节点,这些节点此时还是无法竞争锁
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 将state的值-1,如果-1之后等于0,释放锁成功
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
В этот момент блокировка снимается, и пробужденный поток повторно конкурирует за блокировку с новым потоком (исключая те потоки, которые находятся за очередью синхронизации).
Вернемся к методу блокировки, так как в это время все потоки могут получить блокировку через CAS, не гарантируется, что пробужденный поток сможет конкурировать с новым потоком, поэтому это несправедливо. Это реализация несправедливой блокировки.
Этот процесс можно описать на фиг примерно так:
3 Принцип реализации справедливой блокировки
Логика снятия блокировок для справедливых и нечестных блокировок одинакова. Оба вызывают описанный выше метод разблокировки. Самая большая разница заключается в том, когда блокировка получена.
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 获取锁,与非公平锁的不同的地方在于,这里直接调用的AQS的acquire方法,没有先尝试获取锁
// acquire又调用了下面的tryAcquire方法,核心在于这个方法
final void lock() {
acquire(1);
}
/**
* 这个方法和nonfairTryAcquire方法只有一点不同,在标注为#1的地方
* 多了一个判断hasQueuedPredecessors,这个方法是判断当前AQS的同步队列中是否还有等待的线程
* 如果有,返回true,否则返回false。
* 由此可知,当队列中没有等待的线程时,当前线程才能尝试通过CAS的方式获取锁。
* 否则就让这个线程去队列后面排队。
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// #1
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
Из комментариев видно, что при механизме честных блокировок любой поток, желающий получить блокировку, должен встать в очередь, и сократить очередь невозможно. Вот как работают честные замки.
Этот процесс можно примерно описать следующим образом:
4 принцип блокировки
Trylock делает очень просто: пусть нынешняя тема попытается получить блокировку, вернуться к true, в противном случае false.
Реализация фактически называет NonfairtraCquire Method для получения блокировки.
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
Что касается сбоя получения, он не будет добавлять себя в очередь синхронизации для ожидания, а напрямую вернет false, чтобы код бизнес-вызова мог справиться с этим самостоятельно.
5 Прерываемая блокировка получения
Прерывание, то есть прерывание потока с помощью метода прерывания Thread, прерывание потока в заблокированном состоянии вызовет исключение InterruptedException.
Если получение блокировки прерываемо, когда поток не может получить блокировку в течение длительного времени, мы можем активно прерывать его, чтобы избежать взаимоблокировки.
Он реализован следующим образом:
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
AquireiNebraphe Method, который называет AQS
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 判断当前线程是否已经中断,如果已中断,抛出InterruptedException异常
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
В это время сначала попытаются приобрести замок через Tryacquire. Если приобретение не удается, он добавит себя в очередь, чтобы дождаться, и может реагировать на прерывание в любое время.
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
// 将自己添加到队列中等待
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 自旋的获取锁
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 获取锁失败,在parkAndCheckInterrupt方法中,通过LockSupport.park()阻塞当前线程,
// 并调用Thread.interrupted()判断当前线程是否已经被中断
// 如果被中断,直接抛出InterruptedException异常,退出锁的竞争队列
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// #1
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
PS: В непрерывном режиме исключение InterruptedException не будет выброшено в позицию кода #1, а просто запишет, что текущий поток прерван.
6 Суммированного замка
Это реализовано следующими методами: тайм-аут — это время ожидания, а единица представляет единицу времени (миллисекунды, секунды...)
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
Можно обнаружить, что это также метод, который может реагировать на прерывания. Затем вызовите метод tryAcquireNanos AQS:
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
Метод DOACKQUIRENANOS аналогичен методу в прерывании. Различия объясняются в комментариях ниже:
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 计算超时截止时间
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 计算到截止时间的剩余时间
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) // 超时了,获取失败
return false;
// 超时时间大于1000纳秒时,才阻塞
// 因为如果小于1000纳秒,基本可以认为超时了(系统调用的时间可能都比这个长)
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 响应中断
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
7 резюме
Эта статья сначала сравнивает разницу между ветеранским блокировкой синхронизации и ReentrantLock. ReentrantLock имеет следующие преимущества:
- Поддерживает как честные, так и нечестные блокировки.
- Поддержка: попробуйте неблокирующее получение одноразовой блокировки
- Время ожидания поддержки для получения блокировки
- Поддержка прерываемых блокировок получения
- Поддержка дополнительных условий ожидания (условие)
Затем вводит реализацию принципа нескольких ключевых функций, которые основаны на AQS.
Суть ReentrantLock заключается в синхронизации состояния блокировки путем изменения значения состояния в AQS. Таким образом достигается реентерабельность.
ReentrantLock имеет справедливые блокировки и нечестные блокировки, а нечестные блокировки используются по умолчанию. Принцип его реализации во многом зависит от очереди синхронизации в AQS.
Наконец, внутренний механизм может быть прерван потоком. Возможна (), чтобы определить, прервана ли текущий поток, бросить прерываниеException, если прерывают исключение для достижения.
Добро пожаловать в мой публичный аккаунт WeChat