Параллелизм в Java 8: потоки и исполнители

Java задняя часть Операционная система API

Оригинальный адрес:Java 8 Concurrency Tutorial: Threads and Executors

Java 5впервые представилConcurrency API, и будет постоянно оптимизироваться и улучшаться в последующих выпусках. Большинство концепций, описанных в этой статье, применимы и к более ранним версиям. Мои образцы кода в основном сосредоточены наJava 8, и применяются в большом количествеlambdaвыражения и некоторые новые функции. Если вы не знакомы сlambdaвыражение, рекомендуется сначала прочитатьJava 8 Tutorial.

ThreadsиRunnables

Все современные операционные системы进程и线程для поддержки параллелизма.进程Обычно экземпляры программы запускаются независимо друг от друга. Например, вы начинаетеJavaпрограммы, операционная система создаст новую进程работать параллельно с другими программами. В этих进程доступно в线程Выполняйте код одновременно. чтобы мы могли в полной мере воспользоватьсяCPU.

JavaотJDK 1.0поддержка с самого начала线程. начиная новый线程Прежде необходимо указать код для запуска, который обычно называетсяTask. Следующее достигается за счетRunnableПример интерфейса для запуска нового потока:

Runnable task = () -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
};

task.run();

Thread thread = new Thread(task);
thread.start();

System.out.println("Done!");

так какRunnableЯвляется函数式интерфейс, мы можем использоватьlambdaвыражение для вывода имени потока на консоль. Выполняем прямо в основном потокеRunnable, затем запустите новый поток. В консоли вы увидите такой результат:

Hello main
Hello Thread-0
Done!

или:

Hello main
Done!
Hello Thread-0

Так как это并发исполнение, мы не можем предсказатьRunnableпечатаетDoneВызывается до или после, порядок не является неопределенным, поэтому并发编程стать сложной задачей при разработке больших приложений.

Потоки также могут находиться в спящем режиме в течение определенного периода времени, как в следующем примере:

Runnable runnable = () -> {
    try {
        String name = Thread.currentThread().getName();
        System.out.println("Foo " + name);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Bar " + name);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Thread thread = new Thread(runnable);
thread.start();

Выполнение приведенного выше кода сделает паузу на 1 секунду между двумя операторами печати.TimeUnitпредставляет собой перечисление единиц времени, или к нему можно получить доступ, вызвавThread.sleep(1000)выполнить.

использоватьThreadЗанятия могут быть очень утомительными и подверженными ошибкам. По этой причине в 2004 г.Java 5версия введенаConcurrency API.APIродыjava.util.concurrentПакет содержит множество полезных классов для параллельного программирования. С тех пор каждый новый выпускJavaВерсии имеют повышенный параллелизмAPI,Java 8Также предусмотрены новые классы и методы для обработки параллелизма.

Теперь давайте посмотрим поближеConcurrency APIСамая важная часть в -executor services.

Executors

Concurrency APIпредставилExecutorServiceВместо этого используется концепция , как высокоуровневый способ работы с потоками.Threads.ExecutorsМожет выполнять задачи асинхронно и обычно управляет пулом потоков. Таким образом, нам не нужно вручную создавать потоки, все потоки в пуле потоков будут использоваться повторно. так что вexecutor serviceЗапускайте как можно больше одновременных задач на протяжении всего жизненного цикла приложения.

Ниже приведен простойexecutorsпример:

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
});

// => Hello pool-1-thread-1

ExecutorsКлассы предоставляют удобные фабричные методы для создания различных типовexecutor services. В этом примере используется только один поток выполненияexecutor.

Результат выполнения похож на пример выше, но вы заметите важное отличие:JavaПроцесс никогда не останавливается, исполнитель должен остановить его явно, иначе он будет продолжать принимать новые задачи.

ExecutorServiceДля этого предусмотрено два метода:shutdown()Дождитесь завершения текущей задачи иshutdownNow()Все запущенные задачи прерываются, а исполнитель немедленно закрывается. существуетshudownПосле этого никакие задачи больше не могут быть отправлены в пул потоков.

Вот мой предпочтительный способ закрыть программу:

try {
    System.out.println("attempt to shutdown executor");
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
    System.err.println("tasks interrupted");
}
finally {
    if (!executor.isTerminated()) {
        System.err.println("cancel non-finished tasks");
    }
    executor.shutdownNow();
    System.out.println("shutdown finished");
}

вызов исполнителяshutdownзакрытиеexecutor, после ожидания в течение 5 секунд она будет вызвана независимо от того, завершилась задача или нет.shutdownNowЗавершите работу, прервав текущую задачу.

Callables и фьючерсы

КромеRunnableза пределами,executorsтакже поддерживаетCallableзадачи иRunnableТо же самое — функциональный интерфейс, но у него есть возвращаемое значение.

Ниже приведено использованиеlambdaвыражение определеноCallable, который возвращает целочисленное значение после ожидания в течение 1 секунды.

Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
};

иRunnableТакой же,Callableможно также представитьexecutor services, но каков результат выполнения? так какsubmit()Не дожидайтесь завершения выполнения задачи,executor serviceРезультат вызова не может быть возвращен напрямую. Соответственно, он возвращаетFutureвведите результат, используйтеFutureФактический результат выполнения может быть получен.

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("future done? " + future.isDone());

Integer result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);

в волеCallableподчинятьсяexecutor, сначала поisDone()ПроверятьfutureЗавершено ли выполнение. Я уверен, что это не так, поскольку приведенный выше вызов спит в течение 1 секунды, прежде чем вернуть целое число.

метод вызоваget()заблокирует текущий поток до тех пор, покаcallableВыполнение завершено и возвращает результат, теперьfutureВыполнение завершается и в консоль выводятся следующие результаты:

future done? false
future done? true
result: 123

Futureиexecutor serviceПлотно связан, если закрытexecutor service, каждыйFutureвызовет исключение.

executor.shutdownNow();
future.get();

Создано здесьexecutorСпособ отличается от предыдущего примера, здесь используйтеnewFixedThreadPool(1)для создания пула потоков с количеством потоков 1 для поддержкиexecutor, Это эквивалентноnewSingleThreadExecutor(), позже мы увеличим размер пула потоков, передав значение больше 1.

Timeouts

любая параfuture.get()звонки будут блокироваться и ждатьCallableбыл прекращен. В худшем случае вызываемый объект будет работать вечно, делая приложение невосприимчивым. Этим ситуациям можно противодействовать, просто отключив время:

ExecutorService executor = Executors.newFixedThreadPool(1);

Future<Integer> future = executor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
});

future.get(1, TimeUnit.SECONDS);

Выполнение приведенного выше кода вызоветTimeoutException

Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)

Указано максимальное время ожидания 1 секунда, но вызываемый объект фактически занимает 2 секунды, прежде чем вернуть результат.

InvokeAll

Executorsподдержка черезinvokeAll()Отправить несколько пакетовCallable. Этот метод принимаетCallableпараметр коллекции типов и возвращаетFutureТипList.

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "task1",
        () -> "task2",
        () -> "task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);

В этом примере мы используемJava 8поток для обработкиinvokeAllВсе вернулись по звонкуFuture. Сначала мы сопоставляем каждыйFuture, затем выведите каждое значение на консоль. Если вы еще не знакомы с потоками, прочтитеJava 8 Stream Tutorial.

InvokeAny

Другой способ пакетной отправки вызываемого объекта:invokeAny(), что то же самое, чтоinvokeAll()Немного отличается. Этот метод не возвращает всеFutureобъект, который возвращает только результат первой выполненной задачи.

Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}

Мы используем этот метод для созданияCallable. пройти черезinvokeAny()Отправьте эти вызовы наexecutor, возвращает самый быстрый результат выполнения, в данном случае task2:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
    callable("task1", 2),
    callable("task2", 1),
    callable("task3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task2

В приведенном выше примере используетсяnewWorkStealingPool()создал еще один видexecutor. Этот заводской методJava 8часть и возвращает типForkJoinPoolизexecutor, это то же самое, что и обычноеexecutorНемного отличается. Он не использует пул потоков фиксированного размера, по умолчанию это количество ядер, доступных центральному процессору.

Scheduled Executors

Мы научились, какExecutorsОтправляйте и запускайте задачи на . Чтобы периодически запускать задачу несколько раз, мы можем использоватьscheduled thread pools.

ScheduledExecutorServiceВозможность планировать выполнение задач периодически или один раз через определенный период времени.

Следующий код представляет собой пример задачи, которая запускается через три секунды:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(1337);

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);

Планирование задачи производит типScheduledFutureстоимость, кромеFutureКроме того, он также обеспечиваетgetDelay()метод для получения оставшегося времени для выполнения задачи.

Для запланированных задачexecutorПредусмотрено два методаscheduleAtFixedRate()иscheduleWithFixedDelay(). Первый метод способен выполнять задачи с фиксированными интервалами времени, например, один раз в секунду:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

Кроме того, этот метод также может установить время задержки, которое описывает, как долго ждать перед выполнением задачи в первый раз.

scheduleWithFixedDelay()метод сscheduleAtFixedRate()немного отличается, разница в их времени ожидания,scheduleWithFixedDelay()Время ожидания устанавливается между окончанием предыдущей задачи и началом следующей задачи.

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Scheduling: " + System.nanoTime());
    }
    catch (InterruptedException e) {
        System.err.println("task interrupted");
    }
};

executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

В этом примере задержка составляет 1 секунду между окончанием выполнения и началом следующего выполнения. Начальная задержка равна 0, а продолжительность задачи составляет 2 секунды. Таким образом, мы получаем интервал выполнения 0 с, 3 с, 6 с, 9 с и т. д.