Оригинальный адрес:Java 8 Concurrency Tutorial: Threads and Executors
Java 5
впервые представилConcurrency API, и будет постоянно оптимизироваться и улучшаться в последующих выпусках. Большинство концепций, описанных в этой статье, применимы и к более ранним версиям. Мои образцы кода в основном сосредоточены наJava 8
, и применяются в большом количествеlambda
выражения и некоторые новые функции. Если вы не знакомы сlambda
выражение, рекомендуется сначала прочитатьJava 8 Tutorial.
Threads
иRunnables
Все современные операционные системы进程
и线程
для поддержки параллелизма.进程
Обычно экземпляры программы запускаются независимо друг от друга. Например, вы начинаетеJava
программы, операционная система создаст новую进程
работать параллельно с другими программами. В этих进程
доступно в线程
Выполняйте код одновременно. чтобы мы могли в полной мере воспользоватьсяCPU
.
Java
отJDK 1.0
поддержка с самого начала线程
. начиная новый线程
Прежде необходимо указать код для запуска, который обычно называетсяTask
. Следующее достигается за счетRunnable
Пример интерфейса для запуска нового потока:
Runnable task = () -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
};
task.run();
Thread thread = new Thread(task);
thread.start();
System.out.println("Done!");
так какRunnable
Является函数式
интерфейс, мы можем использоватьlambda
выражение для вывода имени потока на консоль. Выполняем прямо в основном потокеRunnable
, затем запустите новый поток. В консоли вы увидите такой результат:
Hello main
Hello Thread-0
Done!
или:
Hello main
Done!
Hello Thread-0
Так как это并发
исполнение, мы не можем предсказатьRunnable
печатаетDone
Вызывается до или после, порядок не является неопределенным, поэтому并发编程
стать сложной задачей при разработке больших приложений.
Потоки также могут находиться в спящем режиме в течение определенного периода времени, как в следующем примере:
Runnable runnable = () -> {
try {
String name = Thread.currentThread().getName();
System.out.println("Foo " + name);
TimeUnit.SECONDS.sleep(1);
System.out.println("Bar " + name);
}
catch (InterruptedException e) {
e.printStackTrace();
}
};
Thread thread = new Thread(runnable);
thread.start();
Выполнение приведенного выше кода сделает паузу на 1 секунду между двумя операторами печати.TimeUnit
представляет собой перечисление единиц времени, или к нему можно получить доступ, вызвавThread.sleep(1000)
выполнить.
использоватьThread
Занятия могут быть очень утомительными и подверженными ошибкам. По этой причине в 2004 г.Java 5
версия введенаConcurrency API
.API
родыjava.util.concurrent
Пакет содержит множество полезных классов для параллельного программирования. С тех пор каждый новый выпускJava
Версии имеют повышенный параллелизмAPI
,Java 8
Также предусмотрены новые классы и методы для обработки параллелизма.
Теперь давайте посмотрим поближеConcurrency API
Самая важная часть в -executor services
.
Executors
Concurrency API
представилExecutorService
Вместо этого используется концепция , как высокоуровневый способ работы с потоками.Threads
.Executors
Может выполнять задачи асинхронно и обычно управляет пулом потоков. Таким образом, нам не нужно вручную создавать потоки, все потоки в пуле потоков будут использоваться повторно. так что вexecutor service
Запускайте как можно больше одновременных задач на протяжении всего жизненного цикла приложения.
Ниже приведен простойexecutors
пример:
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
});
// => Hello pool-1-thread-1
Executors
Классы предоставляют удобные фабричные методы для создания различных типовexecutor services
. В этом примере используется только один поток выполненияexecutor
.
Результат выполнения похож на пример выше, но вы заметите важное отличие:Java
Процесс никогда не останавливается, исполнитель должен остановить его явно, иначе он будет продолжать принимать новые задачи.
ExecutorService
Для этого предусмотрено два метода:shutdown()
Дождитесь завершения текущей задачи иshutdownNow()
Все запущенные задачи прерываются, а исполнитель немедленно закрывается. существуетshudown
После этого никакие задачи больше не могут быть отправлены в пул потоков.
Вот мой предпочтительный способ закрыть программу:
try {
System.out.println("attempt to shutdown executor");
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
System.err.println("tasks interrupted");
}
finally {
if (!executor.isTerminated()) {
System.err.println("cancel non-finished tasks");
}
executor.shutdownNow();
System.out.println("shutdown finished");
}
вызов исполнителяshutdown
закрытиеexecutor
, после ожидания в течение 5 секунд она будет вызвана независимо от того, завершилась задача или нет.shutdownNow
Завершите работу, прервав текущую задачу.
Callables и фьючерсы
КромеRunnable
за пределами,executors
также поддерживаетCallable
задачи иRunnable
То же самое — функциональный интерфейс, но у него есть возвращаемое значение.
Ниже приведено использованиеlambda
выражение определеноCallable
, который возвращает целочисленное значение после ожидания в течение 1 секунды.
Callable<Integer> task = () -> {
try {
TimeUnit.SECONDS.sleep(1);
return 123;
}
catch (InterruptedException e) {
throw new IllegalStateException("task interrupted", e);
}
};
иRunnable
Такой же,Callable
можно также представитьexecutor services
, но каков результат выполнения? так какsubmit()
Не дожидайтесь завершения выполнения задачи,executor service
Результат вызова не может быть возвращен напрямую. Соответственно, он возвращаетFuture
введите результат, используйтеFuture
Фактический результат выполнения может быть получен.
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);
System.out.println("future done? " + future.isDone());
Integer result = future.get();
System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);
в волеCallable
подчинятьсяexecutor
, сначала поisDone()
Проверятьfuture
Завершено ли выполнение. Я уверен, что это не так, поскольку приведенный выше вызов спит в течение 1 секунды, прежде чем вернуть целое число.
метод вызоваget()
заблокирует текущий поток до тех пор, покаcallable
Выполнение завершено и возвращает результат, теперьfuture
Выполнение завершается и в консоль выводятся следующие результаты:
future done? false
future done? true
result: 123
Future
иexecutor service
Плотно связан, если закрытexecutor service
, каждыйFuture
вызовет исключение.
executor.shutdownNow();
future.get();
Создано здесьexecutor
Способ отличается от предыдущего примера, здесь используйтеnewFixedThreadPool(1)
для создания пула потоков с количеством потоков 1 для поддержкиexecutor
, Это эквивалентноnewSingleThreadExecutor()
, позже мы увеличим размер пула потоков, передав значение больше 1.
Timeouts
любая параfuture.get()
звонки будут блокироваться и ждатьCallable
был прекращен. В худшем случае вызываемый объект будет работать вечно, делая приложение невосприимчивым. Этим ситуациям можно противодействовать, просто отключив время:
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(2);
return 123;
}
catch (InterruptedException e) {
throw new IllegalStateException("task interrupted", e);
}
});
future.get(1, TimeUnit.SECONDS);
Выполнение приведенного выше кода вызоветTimeoutException
Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
Указано максимальное время ожидания 1 секунда, но вызываемый объект фактически занимает 2 секунды, прежде чем вернуть результат.
InvokeAll
Executors
поддержка черезinvokeAll()
Отправить несколько пакетовCallable
. Этот метод принимаетCallable
параметр коллекции типов и возвращаетFuture
ТипList
.
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
() -> "task1",
() -> "task2",
() -> "task3");
executor.invokeAll(callables)
.stream()
.map(future -> {
try {
return future.get();
}
catch (Exception e) {
throw new IllegalStateException(e);
}
})
.forEach(System.out::println);
В этом примере мы используемJava 8
поток для обработкиinvokeAll
Все вернулись по звонкуFuture
. Сначала мы сопоставляем каждыйFuture
, затем выведите каждое значение на консоль. Если вы еще не знакомы с потоками, прочтитеJava 8 Stream Tutorial.
InvokeAny
Другой способ пакетной отправки вызываемого объекта:invokeAny()
, что то же самое, чтоinvokeAll()
Немного отличается. Этот метод не возвращает всеFuture
объект, который возвращает только результат первой выполненной задачи.
Callable<String> callable(String result, long sleepSeconds) {
return () -> {
TimeUnit.SECONDS.sleep(sleepSeconds);
return result;
};
}
Мы используем этот метод для созданияCallable
. пройти черезinvokeAny()
Отправьте эти вызовы наexecutor
, возвращает самый быстрый результат выполнения, в данном случае task2:
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));
String result = executor.invokeAny(callables);
System.out.println(result);
// => task2
В приведенном выше примере используетсяnewWorkStealingPool()
создал еще один видexecutor
. Этот заводской методJava 8
часть и возвращает типForkJoinPool
изexecutor
, это то же самое, что и обычноеexecutor
Немного отличается. Он не использует пул потоков фиксированного размера, по умолчанию это количество ядер, доступных центральному процессору.
Scheduled Executors
Мы научились, какExecutors
Отправляйте и запускайте задачи на . Чтобы периодически запускать задачу несколько раз, мы можем использоватьscheduled thread pools
.
ScheduledExecutorService
Возможность планировать выполнение задач периодически или один раз через определенный период времени.
Следующий код представляет собой пример задачи, которая запускается через три секунды:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);
TimeUnit.MILLISECONDS.sleep(1337);
long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);
Планирование задачи производит типScheduledFuture
стоимость, кромеFuture
Кроме того, он также обеспечиваетgetDelay()
метод для получения оставшегося времени для выполнения задачи.
Для запланированных задачexecutor
Предусмотрено два методаscheduleAtFixedRate()
иscheduleWithFixedDelay()
. Первый метод способен выполнять задачи с фиксированными интервалами времени, например, один раз в секунду:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
Кроме того, этот метод также может установить время задержки, которое описывает, как долго ждать перед выполнением задачи в первый раз.
scheduleWithFixedDelay()
метод сscheduleAtFixedRate()
немного отличается, разница в их времени ожидания,scheduleWithFixedDelay()
Время ожидания устанавливается между окончанием предыдущей задачи и началом следующей задачи.
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("Scheduling: " + System.nanoTime());
}
catch (InterruptedException e) {
System.err.println("task interrupted");
}
};
executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);
В этом примере задержка составляет 1 секунду между окончанием выполнения и началом следующего выполнения. Начальная задержка равна 0, а продолжительность задачи составляет 2 секунды. Таким образом, мы получаем интервал выполнения 0 с, 3 с, 6 с, 9 с и т. д.