Future
Future — это добавленный в Java5 класс, который используется для описания результата асинхронного вычисления. Вы можете использовать метод isDone, чтобы проверить, завершено ли вычисление, или использовать метод get, чтобы заблокировать вызывающий поток, пока вычисление не будет завершено и не будет возвращен результат. Вы также можете использовать метод отмены, чтобы остановить выполнение задачи. Вот каштан:
public class FutureDemo {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(10);
Future<Integer> f = es.submit(() ->{
Thread.sleep(10000);
// 结果
return 100;
});
// do something
Integer result = f.get();
System.out.println(result);
// while (f.isDone()) {
// System.out.println(result);
// }
}
}
В этом примере мы отправляем задачу в пул потоков и немедленно возвращаем объект Future, затем мы можем выполнить некоторые другие операции и, наконец, использовать его метод get для блокировки ожидания результата или метод isDone для опроса результата (объект принцип будущего Вы можете обратиться к предыдущим статьям:[Параллельное программирование] Будущий режим и реализация в JDK)
Хотя эти методы и обеспечивают возможность выполнения задач асинхронно, все равно получать результаты очень неудобно, а результаты задач можно получить только блокировкой или опросом.
Метод блокировки явно противоречит первоначальному замыслу нашего асинхронного программирования.Метод опроса будет потреблять ненужные ресурсы ЦП, и результат вычисления не может быть получен вовремя.Почему мы не можем использовать шаблон проектирования наблюдателя, чтобы вовремя уведомить слушателя когда результат расчета завершен Шерстяная ткань?
Многие языки, такие как Node.js, используют обратный вызов для реализации асинхронного программирования. Некоторые платформы Java, такие как Netty, расширяют интерфейс Java Future и предоставляют несколько методов расширения, таких как addListener. Гуава Google также предоставляет общее расширение Future: ListenableFuture, SettableFuture и вспомогательные классы Futures и т. д. для облегчения асинхронного программирования. С этой целью в Java наконец-то добавлен более функциональный класс Future в версии JDK1.8: CompletableFuture. Он предоставляет очень мощную функцию расширения Future, которая может помочь нам упростить сложность асинхронного программирования, предоставляет возможность функционального программирования и может обрабатывать результаты вычислений с помощью обратных вызовов. Давайте рассмотрим эти способы.
Netty-Future
Введите зависимости Maven:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.29.Final</version>
</dependency>
public class NettyFutureDemo {
public static void main(String[] args) throws InterruptedException {
EventExecutorGroup group = new DefaultEventExecutorGroup(4);
System.out.println("开始:" + DateUtils.getNow());
Future<Integer> f = group.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("开始耗时计算:" + DateUtils.getNow());
Thread.sleep(10000);
System.out.println("结束耗时计算:" + DateUtils.getNow());
return 100;
}
});
f.addListener(new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> objectFuture) throws Exception {
System.out.println("计算结果:" + objectFuture.get());
}
});
System.out.println("结束:" + DateUtils.getNow());
// 不让守护线程退出
new CountDownLatch(1).await();
}
}
Выходной результат:
开始:2019-05-16 08:25:40:779
结束:2019-05-16 08:25:40:788
开始耗时计算:2019-05-16 08:25:40:788
结束耗时计算:2019-05-16 08:25:50:789
计算结果:100
Из результатов видно, что метод завершения Listener автоматически запускается после завершения трудоемких вычислений, избегая ненужной блокировки и ожидания основного потока, так как же он это делает? См. исходный код ниже
DefaultEventExecutorGroup реализует интерфейс EventExecutorGroup, а EventExecutorGroup — это интерфейс группы потоков, реализующий интерфейс JDK ScheduledExecutorService, поэтому он имеет все методы пула потоков. Однако он переписывает все методы, которые возвращают java.util.concurrent.Future, чтобы они возвращали io.netty.util.concurrent.Future, и все методы, которые возвращают java.util.concurrent.ScheduledFuture, чтобы возвращать io.netty.util .concurrent.ScheduledFuture.
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
/**
* 返回一个EventExecutor
*/
EventExecutor next();
Iterator<EventExecutor> iterator();
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
<T> Future<T> submit(Callable<T> task);
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
Метод submit группы EventExecutorGroup возвращает класс реализации Netty Future из-за перезаписи newTaskFor, и этот класс реализации — PromiseTask.
@Override
public <T> Future<T> submit(Callable<T> task) {
return (Future<T>) super.submit(task);
}
@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new PromiseTask<T>(this, callable);
}
Реализация PromiseTask очень проста, она кэширует вызываемую задачу для выполнения и завершает вызов задачи и уведомление прослушивателя в методе запуска.
@Override
public void run() {
try {
if (setUncancellableInternal()) {
V result = task.call();
setSuccessInternal(result);
}
} catch (Throwable e) {
setFailureInternal(e);
}
}
@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
@Override
public Promise<V> setFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this, cause);
}
Когда задача будет вызвана успешно или неудачно, будет вызвана функция notifyListeners для уведомления прослушивателя, поэтому вам необходимо вызвать метод isSuccess в функции обратного вызова, чтобы проверить статус.
Здесь есть сомнение, вызовет ли Future метод addListener, когда задача будет выполнена, так что уведомление не будет выполнено?
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
synchronized (this) {
addListener0(listener);
}
if (isDone()) {
notifyListeners();
}
return this;
}
Можно обнаружить, что после успешного добавления Listener статус будет проверен немедленно, и если задача была выполнена, обратный вызов будет сделан немедленно, так что не беспокойтесь. Хорошо, давайте посмотрим на реализацию Guava-Future.
Guava-Future
Сначала введите зависимости Maven от guava:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
public class GuavaFutureDemo {
public static void main(String[] args) throws InterruptedException {
System.out.println("开始:" + DateUtils.getNow());
ExecutorService executorService = Executors.newFixedThreadPool(10);
ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);
ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("开始耗时计算:" + DateUtils.getNow());
Thread.sleep(10000);
System.out.println("结束耗时计算:" + DateUtils.getNow());
return 100;
}
});
future.addListener(new Runnable() {
@Override
public void run() {
System.out.println("调用成功");
}
}, executorService);
System.out.println("结束:" + DateUtils.getNow());
new CountDownLatch(1).await();
}
}
ListenableFuture может добавить функцию обратного вызова с помощью метода addListener, который обычно используется там, где не важен результат выполнения. Если вам нужно получить результат при успешном выполнении или получить информацию об исключении при сбое выполнения, вам нужно использовать метод addCallback класса инструментов Futures:
Futures.addCallback(future, new FutureCallback<Integer>() {
@Override
public void onSuccess(@Nullable Integer result) {
System.out.println("成功,计算结果:" + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println("失败");
}
}, executorService);
Как упоминалось ранее, в дополнение к ListenableFuture существует также класс SettableFuture, который также поддерживает возможности обратного вызова. Он реализован из ListenableFuture, поэтому обладает всеми возможностями ListenableFuture.
public class GuavaFutureDemo {
public static void main(String[] args) throws InterruptedException {
System.out.println("开始:" + DateUtils.getNow());
ExecutorService executorService = Executors.newFixedThreadPool(10);
ListenableFuture<Integer> future = submit(executorService);
Futures.addCallback(future, new FutureCallback<Integer>() {
@Override
public void onSuccess(@Nullable Integer result) {
System.out.println("成功,计算结果:" + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println("失败:" + t.getMessage());
}
}, executorService);
Thread.sleep(1000);
System.out.println("结束:" + DateUtils.getNow());
new CountDownLatch(1).await();
}
private static ListenableFuture<Integer> submit(Executor executor) {
SettableFuture<Integer> future = SettableFuture.create();
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("开始耗时计算:" + DateUtils.getNow());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束耗时计算:" + DateUtils.getNow());
// 返回值
future.set(100);
// 设置异常信息
// future.setException(new RuntimeException("custom error!"));
}
});
return future;
}
}
Кажется, что нет большой разницы в использовании, но есть важная проблема, которую легко упустить из виду. Когда метод отмены, наконец, вызывается таким образом SettableFuture, задачи в пуле потоков будут продолжать выполняться, а метод ListenableFuture, возвращаемый методом submit, немедленно отменяет выполнение, что особенно важно. Взгляните на исходный код ниже:
Как и Future Netty, Guava также переписывает метод отправки, реализуя собственный класс реализации ExecutorService ListeningExecutorService.
public interface ListeningExecutorService extends ExecutorService {
<T> ListenableFuture<T> submit(Callable<T> task);
ListenableFuture<?> submit(Runnable task);
<T> ListenableFuture<T> submit(Runnable task, T result);
}
Точно так же метод newTaskFor также был переписан для возврата пользовательского класса Future: TrustedListenableFutureTask.
@Override
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return TrustedListenableFutureTask.create(runnable, value);
}
@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return TrustedListenableFutureTask.create(callable);
}
Вызов задачи перейдет в метод запуска TrustedFutureInterruptibleTask:
@Override
public void run() {
TrustedFutureInterruptibleTask localTask = task;
if (localTask != null) {
localTask.run();
}
}
@Override
public final void run() {
if (!ATOMIC_HELPER.compareAndSetRunner(this, null, Thread.currentThread())) {
return; // someone else has run or is running.
}
try {
// 抽象方法,子类进行重写
runInterruptibly();
} finally {
if (wasInterrupted()) {
while (!doneInterrupting) {
Thread.yield();
}
}
}
}
Наконец, вызывается метод runInterruptably задачи TrustedFutureInterruptibleTask, а после завершения задачи вызывается метод set.
@Override
void runInterruptibly() {
if (!isDone()) {
try {
set(callable.call());
} catch (Throwable t) {
setException(t);
}
}
}
protected boolean set(@Nullable V value) {
Object valueToSet = value == null ? NULL : value;
// CAS设置值
if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
complete(this);
return true;
}
return false;
}
В конце полного метода будет получен Listener для обратного вызова.
Метод отмены SettableFuture и ListenableFuture, упомянутый выше, имеет разные эффекты, поскольку один из них переопределяет метод afterDone, а другой — нет.
Вот метод afterDone для ListenableFuture:
@Override
protected void afterDone() {
super.afterDone();
if (wasInterrupted()) {
TrustedFutureInterruptibleTask localTask = task;
if (localTask != null) {
localTask.interruptTask();
}
}
this.task = null;
}
wasInterrupted используется для определения того, вызывается ли отмена (метод отмены установит объект отмены Cancellation в значение)
protected final boolean wasInterrupted() {
final Object localValue = value;
return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted;
}
Метод interruptTask фактически отменяет выполнение задачи потока с помощью метода прерывания потока:
final void interruptTask() {
Thread currentRunner = runner;
if (currentRunner != null) {
currentRunner.interrupt();
}
doneInterrupting = true;
}
Шаблон обещания, представленный Callback Hell
Если вы знакомы с ES6, вы не будете незнакомы с паттерном Promise.Если вы не знакомы с внешним интерфейсом, это не имеет значения.Давайте сначала рассмотрим концепцию Callback Hell.
Обратные вызовы — это способ асинхронного вызова, который мы уважаем, но есть и проблемы, то есть вложенность обратных вызовов. Когда необходимо написать несколько асинхронных обратных вызовов вместе, появится следующий код (в качестве примера возьмем js):
asyncFunc1(opt, (...args1) => {
asyncFunc2(opt, (...args2) => {
asyncFunc3(opt, (...args3) => {
asyncFunc4(opt, (...args4) => {
// some operation
});
});
});
});
Хотя многоуровневая вложенность обратных вызовов редко встречается в бизнес-коде JAVA, она всегда является проблемой: такой код нелегко читать, и его трудно модифицировать, если вложенность слишком глубокая. Итак, ES6 предложил шаблон Promise для решения проблемы ада обратных вызовов. Некоторые люди могут захотеть спросить: есть ли шаблон промиса в java? Ответ положительный.
Как упоминалось выше, расширения Netty и Guava предоставляют интерфейсы, такие как addListener, для обработки вызовов обратного вызова, но на самом деле jdk1.8 предоставляет более продвинутый метод обратного вызова: CompletableFuture. Сначала попробуйте использовать CompletableFuture, чтобы переписать описанную выше проблему обратного вызова.
public class CompletableFutureTest {
public static void main(String[] args) throws InterruptedException {
System.out.println("开始:" + DateUtils.getNow());
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("开始耗时计算:" + DateUtils.getNow());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束耗时计算:" + DateUtils.getNow());
return 100;
});
completableFuture.whenComplete((result, e) -> {
System.out.println("回调结果:" + result);
});
System.out.println("结束:" + DateUtils.getNow());
new CountDownLatch(1).await();
}
}
Использование трудоемких операций CompletableFuture не занимает квант времени основного потока и позволяет достичь эффекта асинхронных вызовов. Нам также не нужно вводить какие-либо сторонние зависимости, которые все зависят от внешнего вида java.util.concurrent.CompletableFuture. CompletableFuture предоставляет почти 50 методов, что значительно облегчает выполнение многопоточных операций Java и метод написания асинхронных вызовов.
Используйте CompletableFuture для решения проблемы ада обратного вызова:
public class CompletableFutureDemo {
public static void main(String[] args) throws InterruptedException {
long l = System.currentTimeMillis();
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("在回调中执行耗时操作...");
Thread.sleep(10000);
return 100;
});
completableFuture = completableFuture.thenCompose(i -> {
return CompletableFuture.supplyAsync(() -> {
System.out.println("在回调的回调中执行耗时操作...");
Thread.sleep(10000);
return i + 100;
});
});
completableFuture.whenComplete((result, e) -> {
System.out.println("计算结果:" + result);
});
System.out.println("主线程运算耗时:" + (System.currentTimeMillis() - l) + " ms");
new CountDownLatch(1).await();
}
}
вывод:
在回调中执行耗时操作...主线程运算耗时:58 ms在回调的回调中执行耗时操作...计算结果:200
Использование таких методов, как thenCompose или thenComposeAsync, может реализовать обратный вызов обратного вызова, а написанный метод прост в обслуживании.
Как правило, добавление функции обратного вызова в режим Future не требует блокировки ожидания возврата результата и не требует ненужных ресурсов ЦП для опроса состояния обработки.До JDK8 использовались классы инструментов, предоставляемые Netty или Guava. , а после JDK8 можно использовать автоматический класс CompletableFuture с . Future имеет два режима: future и callback. Тип обратного вызова будет иметь проблему ада обратного вызова, которая получена из режима Promise для решения этой проблемы. Это корреляция между паттерном Future и паттерном Promise.