Многопоточность Java — подробное объяснение параллельного класса инструментов Semaphore

Java

Введение

Semaphore — это вспомогательный инструмент синхронизации, который переводится как семафор, который используется для реализации управления потоком, который может контролировать количество обращений к ресурсам одновременно.

Будь то Synchroniezd или ReentrantLock, только одному потоку разрешен доступ к одному ресурсу за раз, но Semaphore может указать несколько потоков для одновременного доступа к ресурсу.

У семафора есть конструктор, который может передать целое число n, что означает, что к определенному фрагменту кода могут получить доступ не более n потоков. Если он превышает n, подождите, пока поток завершит выполнение этого блока кода, а следующий поток Повторно введите.

На семафоре определены две операции:

  • получение (приобретение): когда поток вызывает операцию получения, он либо успешно получает семафор (семафор минус 1), либо ждет, пока поток освободит семафор, либо истечет время ожидания, семафор будет поддерживать внутреннюю очередь ожидания. подвесные нити.
  • release (выпуск) фактически увеличивает значение семафора на 1 и затем будит произвольный ожидающий поток в очереди ожидания соответствующего экземпляра Sepmaphore.

Сценарии применения

Семафоры в основном используются для двух целей:

  • Для взаимоисключающего использования нескольких общих ресурсов.
  • Контроль количества одновременных потоков.

пример

Следующий пример: 5 потоков захватывают 3 парковочных места, и не более 3 потоков могут одновременно захватывать парковочное место и могут захватывать парковочное место после того, как другие потоки освобождают семафор.

	public static void main(String[] args) {
		Semaphore semaphore = new Semaphore(3);

		for (int i = 0; i < 5; i++) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						semaphore.acquire();//申请资源
						System.out.println(Thread.currentThread().getName()+"抢到车位");
						ThreadUtil.sleep(RandomUtil.randomInt(1000,5000));
						System.out.println(Thread.currentThread().getName()+"归还车位");
					} catch (InterruptedException e) {
						e.printStackTrace();
					}finally {
					    //释放资源
						semaphore.release();
					}

				}
			},"线程"+i).start();
		}
	}

Меры предосторожности

  • Semaphore.acquire() и Semaphore.release() всегда используются парами, что должно гарантироваться самим кодом приложения.
  • Вызов Semaphore.release() должен быть помещен в блок finally, чтобы избежать возврата семафора, полученного текущим потоком, в случае возникновения исключения в коде приложения.
  • Если значение параметра разрешает в конструкторе семафора равно 1, созданный семафор эквивалентен мьютексу.В отличие от других мьютексов, этот мьютекс позволяет одному потоку снять блокировку, удерживаемую другим потоком, поскольку поток может выполнить соответствующий семафор. release() без выполнения Semaphore.acquire().
  • По умолчанию Semaphore использует политику несправедливого планирования.

принцип

 abstract static class Sync extends AbstractQueuedSynchronizer {
    //省略
 }

Semaphore использует класс Sync для внутреннего использования, а Sync наследует AbstractQueuedSynchronizer, поэтому нижний уровень Sync по-прежнему реализуется с помощью AQS. Sync имеет два класса реализации, NonfairSync и FairSync, которые используются для указания того, следует ли использовать справедливую стратегию при получении семафоров.

метод инициализации

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}


public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Sync(int permits) {
    setState(permits);
}

Как показано выше, Semaphore по умолчанию использует нечестную стратегию.Если вам нужно использовать справедливую стратегию, вы можете использовать конструктор с двумя параметрами для создания объекта Semaphore.

Параметр разрешения передается в значение состояния AQS, которое используется для указания количества удерживаемых в данный момент семафоров.

недействительный метод получения ()

Целью текущего потока, вызывающего этот метод, является получение ресурса семафора.

Если текущее количество семафоров больше 0, счетчик текущего семафора будет уменьшен на 1, а затем метод вернется напрямую. В противном случае, если текущее количество семафоров равно 0, текущий поток будет помещен в очередь блокировки AQS. Когда другие потоки вызывают метод interrupt() текущего потока, чтобы прервать текущий поток, текущий поток выдает InterruptedException и возвращается.

//Semaphore方法
public void acquire() throws InterruptedException {
    //传递参数为1,说明要获取1个信号量资源
    sync.acquireSharedInterruptibly(1);
}

//AQS的方法
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //(1)如果线程被中断,则抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //(2)否则调用Sync子类方法尝试获取,这里根据构造函数确定使用公平策略
    if (tryAcquireShared(arg) < 0)
        //如果获取失败则放入阻塞队列.然后再次尝试,如果使用则调用park方法挂起当前线程
        doAcquireSharedInterruptibly(arg);
}

Как видно из вышеприведенного кода, методAcquire() внутри вызывает методAcquireSharedInterruptably из Sync, который будет реагировать на прерывание (если текущий поток будет прерван, будет сгенерировано исключение прерывания). Метод AQS tryAcquireShared, который пытается получить ресурсы семафора, реализован подклассом Sync, поэтому здесь он будет обсуждаться с двух сторон.

Сначала обсудим метод tryAcquireShared класса NonfairSync, нечестную стратегию, код выглядит следующим образом:

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        //获取当前信号量值
        int available = getState();
        //计算当前剩余值
        int remaining = available - acquires;
        //如果当前剩余值小于0或则CAS设置成功则返回
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

Приведенный выше код сначала получает текущее значение семафора (доступно), а затем вычитает значение, которое необходимо получить (приобретает), чтобы получить оставшееся количество семафоров (осталось).Если оставшееся значение меньше 0, это означает, что текущее число семафоров не может удовлетворить спрос. Затем верните отрицательное число напрямую, и текущий поток будет помещен в очередь блокировки AQS и приостановлен. Если оставшееся значение больше 0, используйте операцию CAS, чтобы установить для текущего значения семафора оставшееся значение, а затем вернуть оставшееся значение.

Кроме того, поскольку NonFairSync получен несправедливо, то есть поток, который первым вызывает метод aquire для получения семафора, не обязательно может получить семафор раньше последнего.

Рассмотрим следующий сценарий: если поток A сначала вызывает метод aquire() для получения семафора, но текущее количество семафоров равно 0, то поток A будет помещен в очередь блокировки AQS. Через некоторое время поток C вызывает метод release() для освобождения семафора.Если никакой другой поток в настоящее время не получает семафор, то поток A будет активирован и получит семафор, но если поток C освободит семафор, поток C вызовет метод require, то поток C будет конкурировать с потоком A за этот ресурс семафора. Если принята недобросовестная стратегия, из кода nonfairTryAcquireShared видно, что поток C может получить семафор до того, как поток A будет активирован, или до того, как поток A будет активирован, то есть поток и запрошенный в данный момент поток блокируются в этом потоке. Это конкурентные отношения, а не стратегия «первым пришел – первым обслужен».

Давайте посмотрим, как класс честности FairSync обеспечивает справедливость.

protected int tryAcquireShared(int acquires) {
    for (;;) {
        //查询是否当前线程节点的前驱节点也在等待获取该资源,有的话直接返回
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

Видно, что справедливость по-прежнему гарантируется функцией hasQueuedPredecessors. Таким образом, стратегия справедливости Semaphore заключается в том, чтобы увидеть, ожидает ли узел-предшественник текущего узла потока, чтобы получить ресурс.Если это так, он откажется от разрешения на получение, а затем текущий поток будет помещен в очередь блокировки AQS, в противном случае он будет приобретен.

аннулировать метод получения (интернет-разрешения)

Этот метод отличается от методаAcquire(), в последнем нужно получить только значение семафора, в то время как первый получает разрешения.

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

Пустой метод AcquireUninterruptably()

Этот метод похож наAcquire(), за исключением того, что этот метод не реагирует на прерывания, то есть когда текущий поток вызываетAcquireUninterruptably для получения ресурсов (в том числе после блокировки), другие потоки вызывают метод interrupt() текущего потока. чтобы установить флаг прерывания текущего потока.В это время текущий поток не будет генерировать исключение IllegalArgumentException и возвращаться.

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

недействительный метод выпуска ()

Функция этого метода состоит в том, чтобы увеличить значение семафора текущего объекта Semaphore на 1. Если текущий поток помещен в очередь блокировки AQS из-за того, что вызов метода aquire заблокирован, количество семафоров будет выбрано в соответствии с политика справедливости.Активированный поток попытается получить только что увеличенный семафор.

public void release() {
    //(1)arg=1
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    //(2)尝试释放资源
    if (tryReleaseShared(arg)) {
        //(3)资源释放成功则调用park方法唤醒AQS队列里面最先挂起的线程
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        //获取当前信号量值
        int current = getState();
        //将当前信号量值增加releases,这里为增加1
        int next = current + releases;
        //移除处理
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //使用CAS保证更新信号量值的原子性
        if (compareAndSetState(current, next))
            return true;
    }
}

Из кода release()->sync.releaseShared(1) видно, что метод release каждый раз увеличивает значение семафора только на 1, а метод tryReleaseShared представляет собой бесконечный цикл Использование CAS обеспечивает атомарную работу release, увеличивая семафор на 1. Метод .tryReleaseShared выполнит код (3) после успешного увеличения значения семафора, т. е. вызов метода AQS для активации потока, заблокированного вызовом метода получения.

Метод недействительного выпуска (внутренние разрешения)

Разница между этим методом и методом освобождения без параметров заключается в том, что первый будет увеличивать количество разрешений на основе исходного значения семафора каждый раз, когда он вызывается, а второй каждый раз будет увеличивать l.

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

Кроме того, вы можете видеть, что sync.releaseShared здесь является общим методом, что означает, что семафор используется совместно потоками, и семафор не привязан к фиксированному потоку Несколько потоков могут использовать CAS для обновления значения семафора в в то же время без блокировки.