Что такое семафор?
Семафор — счетный семафор. Semaphore управляет рядом лицензий. Каждый метод получения блокируется до тех пор, пока не будет доступна лицензия, а затем принимает лицензию; каждый метод выпуска добавляет лицензию, которая может освободить блокирующий метод получения. Однако фактического объекта лицензии нет, Semaphore просто поддерживает ряд доступных лицензий.
Сценарии применения
Семафор можно использовать для управления потоком, особенно в сценариях приложений с ограниченными общедоступными ресурсами, такими как подключения к базе данных. Если требуется прочитать данные десятков тысяч файлов, поскольку все они являются задачами с интенсивным вводом-выводом, мы можем запустить десятки потоков для одновременного чтения, но если память читается, ее необходимо хранить в базе данных. , и количество подключений к базе данных составляет всего 10. В настоящее время мы должны контролировать только десять потоков, чтобы получать подключения к базе данных и одновременно сохранять данные, в противном случае будет сообщено об ошибке, и подключение к базе данных не может быть получено. На данный момент мы можем использовать Semaphore для управления потоком, код выглядит следующим образом:
package org.java.base.thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors
.newFixedThreadPool(THREAD_COUNT);private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("сохранить данные");
s.release();
} catch (InterruptedException e) {
}
}
});
}threadPool.shutdown();
}
}
В коде, несмотря на то, что выполняется 30 потоков, разрешено только 10 одновременных исполнений. Конструктор семафора Semaphore(int Permits) принимает целое число, представляющее количество доступных разрешений. Semaphore(10) означает, что 10 потокам разрешено получать лицензии, то есть максимальное количество одновременных потоков равно 10. Использование Semaphore также очень простое.Сначала поток использует функцию accept() Semaphore для получения лицензии, а затем вызывает функцию release() для возврата лицензии после использования. Вы также можете попробовать получить лицензию с помощью метода tryAcquire().
Другие методы
Semaphore также предлагает некоторые другие методы:
- int availablePermits() : Возвращает количество лицензий, доступных в данный момент в этом семафоре.
- int getQueueLength(): возвращает количество потоков, ожидающих получения лицензии.
- boolean hasQueuedThreads() : ожидают ли какие-либо потоки получения лицензии.
- void reducePermits(int reduce) : Уменьшить количество разрешений. является защищенным методом.
- Коллекция getQueuedThreads() : возвращает коллекцию всех потоков, ожидающих получения лицензии. является защищенным методом.
Анализ исходного кода
Семафор имеет два режима: честный и нечестный. Справедливый режим заключается в том, что порядок вызова получения является порядком получения лицензии, следующим за FIFO; нечестный режим является вытесняющим, то есть возможно, что новый поток получения получает лицензию сразу после освобождения лицензии. и предыдущий Есть также ожидающие потоки.
Метод строительства
Semaphore имеет два конструктора, а именно:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Как видно из вышеизложенного, два метода построения должны предоставлять количество лицензий.Второй метод построения может указывать, является ли режим честным или несправедливым, и по умолчанию используется несправедливый режим.
Семафор внутренне основан на общем режиме AQS, поэтому реализация делегирована классу Sync.
Вот посмотрите на метод построения NonfairSync:
NonfairSync(int permits) {
super(permits);
}
Вы можете видеть, что конструктор родительского класса вызывается напрямую.Конструктор Sync выглядит следующим образом:
Sync(int permits) {
setState(permits);
}
Вы видите, что вызывается метод setState, а это значит, что ресурс в AQS — это количество лицензий.
Получать разрешение
Начнем с приобретения лицензии, и посмотрим на реализацию в недобросовестном режиме. Сначала взгляните на метод Acquisition, у него есть несколько перегруженных версий, но основная из них — это следующий метод.
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
Как видно из вышеизложенного, вызывается методAcquireSharedInterruptably из Sync, который находится в родительском классе AQS, следующим образом:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果线程被中断了,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//获取许可失败,将线程加入到等待队列中
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
Если подкласс AQS хочет использовать общий режим, ему необходимо реализовать метод tryAcquireShared Давайте посмотрим на реализацию этого метода NonfairSync:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
Этот метод вызывает метод nonfairTyAcquireShared в родительском классе следующим образом:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//获取剩余许可数量
int available = getState();
//计算给完这次许可数量后的个数
int remaining = available - acquires;
//如果许可不够或者可以将许可数量重置的话,返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
Как видно из вышеизложенного, возвращаемое значение будет меньше 0 только тогда, когда лицензии не хватит, а остальные вернут количество оставшихся лицензий, что объясняет, как только лицензии не хватит, последующие потоки будут заблокирован. После прочтения недобросовестного приобретения, давайте посмотрим на честное приобретение.Код выглядит следующим образом:
protected int tryAcquireShared(int acquires) {
for (;;) {
//如果前面有线程再等待,直接返回-1
if (hasQueuedPredecessors())
return -1;
//后面与非公平一样
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
Как видно из вышеизложенного, разница между FairSync и NonFairSync в том, что он сначала определит, есть ли потоки, ожидающие в текущей очереди, и если есть, то честно войдет в очередь ожидания, вместо того, чтобы пытаться сначала, как NonfairSync, может быть, он только что получил разрешение, чтобы он мог перейти в очередь.
После прочтения Получение разрешений посмотрите на Освобождение разрешений.
лицензия на выпуск
Также есть несколько перегруженных методов для освобождения разрешения, но все они вызывают следующий метод с параметрами:
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
Метод releaseShared в AQS выглядит следующим образом:
public final boolean releaseShared(int arg) {
//如果改变许可数量成功
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
Подклассы AQS, реализующие общий режим, должны реализовать класс tryReleaseShared, чтобы определить, был ли выпуск успешным.Реализация выглядит следующим образом:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//获取当前许可数量
int current = getState();
//计算回收后的数量
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS改变许可数量成功,返回true
if (compareAndSetState(current, next))
return true;
}
}
Как видно из вышеприведенного, как только CAS успешно изменит количество лицензий, будет вызван метод doReleaseShared() для освобождения заблокированного потока.
Уменьшить количество лицензий
Semaphore также имеет метод сокращения количества лицензий, который можно использовать для уменьшения количества лицензий, когда ресурсы исчерпаны и больше не могут использоваться. код показывает, как показано ниже:
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
Как видите, делегированный в Sync метод reducePermits для Sync выглядит следующим образом:
final void reducePermits(int reductions) {
for (;;) {
//得到当前剩余许可数量
int current = getState();
//得到减完之后的许可数量
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
//如果CAS改变成功
if (compareAndSetState(current, next))
return;
}
}
Как видно из вышеизложенного, именно CAS изменяет переменную состояния в AQS, потому что эта переменная представляет количество лицензий.
Получить оставшееся количество лицензий
Semaphore также может забрать все оставшиеся лицензии за один раз.Этот метод является методом слива, следующим образом:
public int drainPermits() {
return sync.drainPermits();
}
Синхронизация реализована следующим образом:
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
Как видите, CAS устанавливает количество лицензий равным 0.
Суммировать
Семафор — это семафор, используемый для управления набором ресурсов. Его внутренний режим совместного использования основан на AQS. Состояние AQS указывает на количество лицензий. Когда количество лицензий недостаточно, поток будет приостановлен, и как только поток освобождает ресурс, можно повторно разбудить ожидающий поток продолжает выполняться.