Параллелизм в Java 8: синхронизация и блокировки

Java задняя часть Безопасность API

Оригинальный адрес:Java 8 Concurrency Tutorial: Synchronization and Locks

Для простоты в примере кода для этого руководства используетсяздесьОпределены два вспомогательных метода,sleep(seconds)иstop(executor)

Synchronized

Особую осторожность необходимо соблюдать, когда мы пишем многопоточный код для доступа к совместно используемым переменным. Ниже приведен пример многопоточности для изменения целого числа.

определить переменнуюcount, определить методincrement()сделатьcountДобавьте 1.

int count = 0;

void increment() {
    count = count + 1;
}

Когда несколько потоков вызываются одновременноincrement()Проблема возникает, когда:

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10000)
    .forEach(i -> executor.submit(this::increment));

stop(executor);

System.out.println(count);  // 9965

Результат выполнения приведенного выше кода не10000, причина в том, что мы разделяем переменную в разных потоках, не устанавливая условие гонки для доступа к этой переменной.

Чтобы увеличить число, необходимо выполнить три шага: (i) прочитать текущее значение; (ii) увеличить значение на 1; (iii) записать новое значение в переменную; если два потока выполняют эти шаги параллельно, два потоки Шаг 1 может выполняться одновременно, таким образом читая одно и то же текущее значение. Это приводит к потере записи, поэтому фактический результат ниже. В приведенном выше примере 35 приращений теряются из-за одновременного подсчета асинхронного доступа, но вы можете увидеть другие результаты при самостоятельном выполнении кода.

К счастью,Javaранний проходsynchronizedКлючевое слово поддерживает синхронизацию потоков. Мы можем воспользоваться синхронизацией, чтобы решить вышеуказанное состояние гонки при увеличении счетчика:

synchronized void incrementSync() {
    count = count + 1;
}

когда мы используемincrementSync()метод, мы получаем желаемый результат, и результат каждого выполнения выглядит так.

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10000)
    .forEach(i -> executor.submit(this::incrementSync));

stop(executor);

System.out.println(count);  // 10000

synchronizedКлючи-значения также можно использовать в блоке операторов.

void incrementSync() {
    synchronized (this) {
        count = count + 1;
    }
}

существуетJVMВнутренне использует монитор, также известный как блокировка монитора и внутренняя блокировка, для управления синхронизацией. Этот монитор привязан к объекту, и при использовании синхронизированных методов каждый метод совместно использует монитор соответствующего объекта.

Все неявные мониторы реализуют повторный вход. Повторный вход означает, что блокировка привязана к текущему потоку, и поток может безопасно получить одну и ту же блокировку несколько раз без взаимоблокировки (например, синхронизированный метод вызывает другой синхронизированный метод для того же объекта).

Locks

Помимо использования ключевых словsynchronizedПомимо поддерживаемых неявных блокировок (встроенных блокировок объекта),Concurrency API поддерживаетсяLockРазличные блокировки дисплея, указанные интерфейсом. Блокировки отображения могут управлять более тонкой детализацией, поэтому они также имеют лучшую производительность и логически более четкие.

стандартныйJDKРазличные реализации блокировки дисплея представлены в , которые описаны в следующих разделах.

ReentrantLock

ReentrantLockкласс является мьютексом, он иsynchronizedНеявная блокировка для доступа по ключевому слову имеет ту же функциональность, но имеет расширенную функциональность. Он также реализует реентерабельную функциональность.

Давайте посмотрим, как использоватьReentrantLock

ReentrantLock lock = new ReentrantLock();
int count = 0;

void increment() {
    lock.lock();
    try {
        count++;
    } finally {
        lock.unlock();
    }
}

блокировать черезlock()пройти черезunlock()релиз, инкапсулировать код вtry/finallyБлокировка очень важна для обеспечения того, чтобы блокировка также снималась в случае исключения. Этот метод и использование ключевых словsynchronizedДекорированный метод такой же, как потокобезопасный. Если поток получил блокировку, последующие потоки вызываютlock()Поток приостанавливается до тех пор, пока блокировка не будет снята, и только один поток может получить блокировку.

lockПоддерживает более детальное управление синхронизацией метода, как в следующем коде:

ExecutorService executor = Executors.newFixedThreadPool(2);
ReentrantLock lock = new ReentrantLock();

executor.submit(() -> {
    lock.lock();
    try {
        sleep(1000);
    } finally {
        lock.unlock();
    }
});

executor.submit(() -> {
    System.out.println("Locked: " + lock.isLocked());
    System.out.println("Held by me: " + lock.isHeldByCurrentThread());
    boolean locked = lock.tryLock();
    System.out.println("Lock acquired: " + locked);
});

stop(executor);

Когда первая задача получает блокировку, вторая задача получает информацию о состоянии блокировки:

Locked: true
Held by me: false
Lock acquired: false

в видеlock()Альтернатива методуtryLock()Чтобы попытаться получить блокировку без приостановки текущего потока, вы должны использоватьboolВ результате определяется, действительно ли блокировка получена.

ReadWriteLock

ReadWriteLockУказан другой тип блокировки, блокировка чтения-записи. Логика, реализованная блокировкой чтения-записи, заключается в том, что, когда ни один поток не записывает переменную, другие потоки могут прочитать эту переменную, поэтому, когда ни один поток не удерживает блокировку записи, блокировку чтения могут удерживать все потоки. Если чтение выполняется чаще, чем запись, это повысит производительность и пропускную способность системы.

ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();

executor.submit(() -> {
    lock.writeLock().lock();
    try {
        sleep(1000);
        map.put("foo", "bar");
    } finally {
        lock.writeLock().unlock();
    }
});

В приведенном выше примере сначала устанавливается блокировка записи, затемsleepчерез 1 секунду вmapзаписать значение в , до того, как эта задача завершится, отправляются еще две задачи, пытающиеся получить отmapпрочитанное значение:

Runnable readTask = () -> {
    lock.readLock().lock();
    try {
        System.out.println(map.get("foo"));
        sleep(1000);
    } finally {
        lock.readLock().unlock();
    }
};

executor.submit(readTask);
executor.submit(readTask);

stop(executor);

При выполнении приведенного выше кода вы заметите, что задача с двумя чтениями должна дождаться завершения записи (пока выполняется чтение, запись не может получить блокировку). После снятия блокировки записи две задачи выполняются параллельно, и им не нужно ждать завершения другой, поскольку они могут одновременно удерживать блокировку чтения, пока ни один поток не удерживает блокировку записи.

StampedLock

Java 8обеспечивает новый тип блокировкиStampedLock, как и в приведенном выше примере, он также поддерживает блокировки чтения-записи сReadWriteLockразница в том,StampedLockМетод блокировки возвращаетlongзначение, вы можете использовать это значение, чтобы проверить, снята ли блокировка и действительна ли блокировка. Кроме тогоStampedLockПоддерживается еще один режим, называемый оптимистической блокировкой.

использовать нижеStampedLockзаменитьReadWriteLock

ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.writeLock();
    try {
        sleep(1000);
        map.put("foo", "bar");
    } finally {
        lock.unlockWrite(stamp);
    }
});

Runnable readTask = () -> {
    long stamp = lock.readLock();
    try {
        System.out.println(map.get("foo"));
        sleep(1000);
    } finally {
        lock.unlockRead(stamp);
    }
};

executor.submit(readTask);
executor.submit(readTask);

stop(executor);

пройти черезreadLock()иwriteLock()метод для получения блокировки чтения-записи возвращаетfinallyЗначение в блоке для снятия блокировки. Обратите внимание, что блокировка здесь не является повторной. Новое значение возвращается каждый раз, когда оно заблокировано, и оно блокируется без блокировки. Будьте осторожны, чтобы не заблокировать его при использовании.

как спередиReadWriteLockКак и в примере в , две задачи чтения должны ждать, пока задача записи снимет блокировку. Затем выводите результаты на консоль одновременно параллельно.

Следующий пример демонстрирует乐观锁

ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.tryOptimisticRead();
    try {
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
        sleep(1000);
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
        sleep(2000);
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
    } finally {
        lock.unlock(stamp);
    }
});

executor.submit(() -> {
    long stamp = lock.writeLock();
    try {
        System.out.println("Write Lock acquired");
        sleep(2000);
    } finally {
        lock.unlock(stamp);
        System.out.println("Write done");
    }
});

stop(executor);

позвонивtryOptimisticRead()получить乐观读写锁,tryOptimisticRead()Всегда возвращайте значение, не блокируя текущий поток и не зависимо от того, доступна ли блокировка. Возвращает, если активна блокировка записи0. в состоянии пройтиlock.validate(stamp)проверить возвращенный токен (longзначение) является действительным.

Выполнение приведенного выше кода выводит:

Optimistic Lock Valid: true
Write Lock acquired
Optimistic Lock Valid: false
Write done
Optimistic Lock Valid: false

Оптимистическая блокировка вступает в силу, как только блокировка получена. В отличие от обычных блокировок чтения, оптимистичные блокировки не препятствуют немедленному захвату другими потоками блокировок записи. После того, как первый поток заснул на одну секунду, второй поток получает блокировку записи, не дожидаясь снятия оптимистичной блокировки чтения. Оптимистичные блокировки чтения больше недействительны, даже если блокировки записи сняты, оптимистичные блокировки чтения по-прежнему недействительны.

Следовательно, при использовании оптимистической блокировки блокировку необходимо проверять после каждого доступа к любой общей переменной, чтобы гарантировать, что чтение все еще действительно.

Иногда бывает полезно преобразовать блокировку чтения в блокировку записи без необходимости разблокировки и повторной блокировки.StampedLockпредусмотрено для этогоtryConvertToWriteLock()метод, как показано в следующем примере:

ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.readLock();
    try {
        if (count == 0) {
            stamp = lock.tryConvertToWriteLock(stamp);
            if (stamp == 0L) {
                System.out.println("Could not convert to write lock");
                stamp = lock.writeLock();
            }
            count = 23;
        }
        System.out.println(count);
    } finally {
        lock.unlock(stamp);
    }
});

stop(executor);

Сначала задание получает读锁, и выводит текущее значение счетчика переменных на консоль. Однако, если текущее значение равно 0, мы присваиваем новое значение 23. Мы должны сначала读锁преобразовать в写锁, чтобы не нарушать потенциальный одновременный доступ других потоков. перечислитьtryConvertToWriteLock()Не блокирует, но может возвращать 0, указывая на то, что в настоящее время нет доступных блокировок записи. В этом случае мы вызываем writeLock(), чтобы заблокировать текущий поток, пока не станет доступной блокировка записи.

Semaphores

В дополнение к блокировкам API параллелизма также поддерживает счетные семафоры. Блокировки обычно предоставляют эксклюзивный доступ к переменным или ресурсам, тогда как семафоры поддерживают полный набор лицензий. В разных ситуациях необходимо ограничить количество одновременных доступов к определенным частям приложения.

Вот пример того, как ограничить доступ к длинным задачам:

ExecutorService executor = Executors.newFixedThreadPool(10);

Semaphore semaphore = new Semaphore(5);

Runnable longRunningTask = () -> {
    boolean permit = false;
    try {
        permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
        if (permit) {
            System.out.println("Semaphore acquired");
            sleep(5000);
        } else {
            System.out.println("Could not acquire semaphore");
        }
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    } finally {
        if (permit) {
            semaphore.release();
        }
    }
};

IntStream.range(0, 10)
    .forEach(i -> executor.submit(longRunningTask));

stop(executor);

Исполнители могут работать одновременно10задачи, но мы используем5семафор, тем самым ограничивая одновременный доступ к5Кусок. использоватьtry/finallyблок, очень важно правильно освободить семафор даже в случае исключения.

Запуск приведенного выше кода выводит:

Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore

когда есть5После того, как задача получает количество модели, последующие задачи не могут получить семафор. Но если перед5задача выполнена,finally 块释放了型号量, последующие потоки могут получить количество звездочек, и общее количество не превысит5Кусок. позвони сюдаtryAcquire()Для получения суммы модели устанавливается тайм-аут в 1 секунду, что означает, что когда потоку не удается получить семафор, он может заблокироваться и ждать 1 секунду, прежде чем получить его.