Оригинальный адрес:Java 8 Concurrency Tutorial: Synchronization and Locks
Для простоты в примере кода для этого руководства используетсяздесьОпределены два вспомогательных метода,sleep(seconds)
иstop(executor)
Synchronized
Особую осторожность необходимо соблюдать, когда мы пишем многопоточный код для доступа к совместно используемым переменным. Ниже приведен пример многопоточности для изменения целого числа.
определить переменнуюcount
, определить методincrement()
сделатьcount
Добавьте 1.
int count = 0;
void increment() {
count = count + 1;
}
Когда несколько потоков вызываются одновременноincrement()
Проблема возникает, когда:
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 10000)
.forEach(i -> executor.submit(this::increment));
stop(executor);
System.out.println(count); // 9965
Результат выполнения приведенного выше кода не10000
, причина в том, что мы разделяем переменную в разных потоках, не устанавливая условие гонки для доступа к этой переменной.
Чтобы увеличить число, необходимо выполнить три шага: (i) прочитать текущее значение; (ii) увеличить значение на 1; (iii) записать новое значение в переменную; если два потока выполняют эти шаги параллельно, два потоки Шаг 1 может выполняться одновременно, таким образом читая одно и то же текущее значение. Это приводит к потере записи, поэтому фактический результат ниже. В приведенном выше примере 35 приращений теряются из-за одновременного подсчета асинхронного доступа, но вы можете увидеть другие результаты при самостоятельном выполнении кода.
К счастью,Java
ранний проходsynchronized
Ключевое слово поддерживает синхронизацию потоков. Мы можем воспользоваться синхронизацией, чтобы решить вышеуказанное состояние гонки при увеличении счетчика:
synchronized void incrementSync() {
count = count + 1;
}
когда мы используемincrementSync()
метод, мы получаем желаемый результат, и результат каждого выполнения выглядит так.
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 10000)
.forEach(i -> executor.submit(this::incrementSync));
stop(executor);
System.out.println(count); // 10000
synchronized
Ключи-значения также можно использовать в блоке операторов.
void incrementSync() {
synchronized (this) {
count = count + 1;
}
}
существуетJVM
Внутренне использует монитор, также известный как блокировка монитора и внутренняя блокировка, для управления синхронизацией. Этот монитор привязан к объекту, и при использовании синхронизированных методов каждый метод совместно использует монитор соответствующего объекта.
Все неявные мониторы реализуют повторный вход. Повторный вход означает, что блокировка привязана к текущему потоку, и поток может безопасно получить одну и ту же блокировку несколько раз без взаимоблокировки (например, синхронизированный метод вызывает другой синхронизированный метод для того же объекта).
Locks
Помимо использования ключевых словsynchronized
Помимо поддерживаемых неявных блокировок (встроенных блокировок объекта),Concurrency API поддерживаетсяLock
Различные блокировки дисплея, указанные интерфейсом. Блокировки отображения могут управлять более тонкой детализацией, поэтому они также имеют лучшую производительность и логически более четкие.
стандартныйJDK
Различные реализации блокировки дисплея представлены в , которые описаны в следующих разделах.
ReentrantLock
ReentrantLock
класс является мьютексом, он иsynchronized
Неявная блокировка для доступа по ключевому слову имеет ту же функциональность, но имеет расширенную функциональность. Он также реализует реентерабельную функциональность.
Давайте посмотрим, как использоватьReentrantLock
ReentrantLock lock = new ReentrantLock();
int count = 0;
void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
блокировать черезlock()
пройти черезunlock()
релиз, инкапсулировать код вtry/finally
Блокировка очень важна для обеспечения того, чтобы блокировка также снималась в случае исключения. Этот метод и использование ключевых словsynchronized
Декорированный метод такой же, как потокобезопасный. Если поток получил блокировку, последующие потоки вызываютlock()
Поток приостанавливается до тех пор, пока блокировка не будет снята, и только один поток может получить блокировку.
lock
Поддерживает более детальное управление синхронизацией метода, как в следующем коде:
ExecutorService executor = Executors.newFixedThreadPool(2);
ReentrantLock lock = new ReentrantLock();
executor.submit(() -> {
lock.lock();
try {
sleep(1000);
} finally {
lock.unlock();
}
});
executor.submit(() -> {
System.out.println("Locked: " + lock.isLocked());
System.out.println("Held by me: " + lock.isHeldByCurrentThread());
boolean locked = lock.tryLock();
System.out.println("Lock acquired: " + locked);
});
stop(executor);
Когда первая задача получает блокировку, вторая задача получает информацию о состоянии блокировки:
Locked: true
Held by me: false
Lock acquired: false
в видеlock()
Альтернатива методуtryLock()
Чтобы попытаться получить блокировку без приостановки текущего потока, вы должны использоватьbool
В результате определяется, действительно ли блокировка получена.
ReadWriteLock
ReadWriteLock
Указан другой тип блокировки, блокировка чтения-записи. Логика, реализованная блокировкой чтения-записи, заключается в том, что, когда ни один поток не записывает переменную, другие потоки могут прочитать эту переменную, поэтому, когда ни один поток не удерживает блокировку записи, блокировку чтения могут удерживать все потоки. Если чтение выполняется чаще, чем запись, это повысит производительность и пропускную способность системы.
ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();
executor.submit(() -> {
lock.writeLock().lock();
try {
sleep(1000);
map.put("foo", "bar");
} finally {
lock.writeLock().unlock();
}
});
В приведенном выше примере сначала устанавливается блокировка записи, затемsleep
через 1 секунду вmap
записать значение в , до того, как эта задача завершится, отправляются еще две задачи, пытающиеся получить отmap
прочитанное значение:
Runnable readTask = () -> {
lock.readLock().lock();
try {
System.out.println(map.get("foo"));
sleep(1000);
} finally {
lock.readLock().unlock();
}
};
executor.submit(readTask);
executor.submit(readTask);
stop(executor);
При выполнении приведенного выше кода вы заметите, что задача с двумя чтениями должна дождаться завершения записи (пока выполняется чтение, запись не может получить блокировку). После снятия блокировки записи две задачи выполняются параллельно, и им не нужно ждать завершения другой, поскольку они могут одновременно удерживать блокировку чтения, пока ни один поток не удерживает блокировку записи.
StampedLock
Java 8
обеспечивает новый тип блокировкиStampedLock
, как и в приведенном выше примере, он также поддерживает блокировки чтения-записи сReadWriteLock
разница в том,StampedLock
Метод блокировки возвращаетlong
значение, вы можете использовать это значение, чтобы проверить, снята ли блокировка и действительна ли блокировка. Кроме тогоStampedLock
Поддерживается еще один режим, называемый оптимистической блокировкой.
использовать нижеStampedLock
заменитьReadWriteLock
ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
StampedLock lock = new StampedLock();
executor.submit(() -> {
long stamp = lock.writeLock();
try {
sleep(1000);
map.put("foo", "bar");
} finally {
lock.unlockWrite(stamp);
}
});
Runnable readTask = () -> {
long stamp = lock.readLock();
try {
System.out.println(map.get("foo"));
sleep(1000);
} finally {
lock.unlockRead(stamp);
}
};
executor.submit(readTask);
executor.submit(readTask);
stop(executor);
пройти черезreadLock()
иwriteLock()
метод для получения блокировки чтения-записи возвращаетfinally
Значение в блоке для снятия блокировки. Обратите внимание, что блокировка здесь не является повторной. Новое значение возвращается каждый раз, когда оно заблокировано, и оно блокируется без блокировки. Будьте осторожны, чтобы не заблокировать его при использовании.
как спередиReadWriteLock
Как и в примере в , две задачи чтения должны ждать, пока задача записи снимет блокировку. Затем выводите результаты на консоль одновременно параллельно.
Следующий пример демонстрирует乐观锁
ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();
executor.submit(() -> {
long stamp = lock.tryOptimisticRead();
try {
System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
sleep(1000);
System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
sleep(2000);
System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
} finally {
lock.unlock(stamp);
}
});
executor.submit(() -> {
long stamp = lock.writeLock();
try {
System.out.println("Write Lock acquired");
sleep(2000);
} finally {
lock.unlock(stamp);
System.out.println("Write done");
}
});
stop(executor);
позвонивtryOptimisticRead()
получить乐观读写锁
,tryOptimisticRead()
Всегда возвращайте значение, не блокируя текущий поток и не зависимо от того, доступна ли блокировка. Возвращает, если активна блокировка записи0
. в состоянии пройтиlock.validate(stamp)
проверить возвращенный токен (long
значение) является действительным.
Выполнение приведенного выше кода выводит:
Optimistic Lock Valid: true
Write Lock acquired
Optimistic Lock Valid: false
Write done
Optimistic Lock Valid: false
Оптимистическая блокировка вступает в силу, как только блокировка получена. В отличие от обычных блокировок чтения, оптимистичные блокировки не препятствуют немедленному захвату другими потоками блокировок записи. После того, как первый поток заснул на одну секунду, второй поток получает блокировку записи, не дожидаясь снятия оптимистичной блокировки чтения. Оптимистичные блокировки чтения больше недействительны, даже если блокировки записи сняты, оптимистичные блокировки чтения по-прежнему недействительны.
Следовательно, при использовании оптимистической блокировки блокировку необходимо проверять после каждого доступа к любой общей переменной, чтобы гарантировать, что чтение все еще действительно.
Иногда бывает полезно преобразовать блокировку чтения в блокировку записи без необходимости разблокировки и повторной блокировки.StampedLock
предусмотрено для этогоtryConvertToWriteLock()
метод, как показано в следующем примере:
ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();
executor.submit(() -> {
long stamp = lock.readLock();
try {
if (count == 0) {
stamp = lock.tryConvertToWriteLock(stamp);
if (stamp == 0L) {
System.out.println("Could not convert to write lock");
stamp = lock.writeLock();
}
count = 23;
}
System.out.println(count);
} finally {
lock.unlock(stamp);
}
});
stop(executor);
Сначала задание получает读锁
, и выводит текущее значение счетчика переменных на консоль. Однако, если текущее значение равно 0, мы присваиваем новое значение 23. Мы должны сначала读锁
преобразовать в写锁
, чтобы не нарушать потенциальный одновременный доступ других потоков. перечислитьtryConvertToWriteLock()
Не блокирует, но может возвращать 0, указывая на то, что в настоящее время нет доступных блокировок записи. В этом случае мы вызываем writeLock(), чтобы заблокировать текущий поток, пока не станет доступной блокировка записи.
Semaphores
В дополнение к блокировкам API параллелизма также поддерживает счетные семафоры. Блокировки обычно предоставляют эксклюзивный доступ к переменным или ресурсам, тогда как семафоры поддерживают полный набор лицензий. В разных ситуациях необходимо ограничить количество одновременных доступов к определенным частям приложения.
Вот пример того, как ограничить доступ к длинным задачам:
ExecutorService executor = Executors.newFixedThreadPool(10);
Semaphore semaphore = new Semaphore(5);
Runnable longRunningTask = () -> {
boolean permit = false;
try {
permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
if (permit) {
System.out.println("Semaphore acquired");
sleep(5000);
} else {
System.out.println("Could not acquire semaphore");
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
} finally {
if (permit) {
semaphore.release();
}
}
};
IntStream.range(0, 10)
.forEach(i -> executor.submit(longRunningTask));
stop(executor);
Исполнители могут работать одновременно10
задачи, но мы используем5
семафор, тем самым ограничивая одновременный доступ к5
Кусок. использоватьtry/finally
блок, очень важно правильно освободить семафор даже в случае исключения.
Запуск приведенного выше кода выводит:
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
когда есть5
После того, как задача получает количество модели, последующие задачи не могут получить семафор. Но если перед5
задача выполнена,finally 块释放了型号量
, последующие потоки могут получить количество звездочек, и общее количество не превысит5
Кусок. позвони сюдаtryAcquire()
Для получения суммы модели устанавливается тайм-аут в 1 секунду, что означает, что когда потоку не удается получить семафор, он может заблокироваться и ждать 1 секунду, прежде чем получить его.