Написав столько всего, я, наконец, добрался до будущего. Эта тема действительно актуальна для него. Мы уже много раз видели этот объект в пуле потоков раньше. В этой статье мы сосредоточимся на нем.
1. Что будет в будущем
1.3 Краткое введение в будущую задачу
эффект: future может использоваться для асинхронного получения результатов многопоточной задачи, Callable используется для генерации результатов, а Future используется для получения результатовобработать: Процесс похож на аплодисменты и ожидание еды Ожидание еды процесс трудоемкий, но это не мешает нам звонить
- Когда Future отправляется, бизнес-обработка запущена в нескольких потоках, и Get получает данные из нескольких потоков.
- Когда дело не обрабатывается Get, текущий поток будет заблокирован до завершения обработки дела, поэтому необходимо обратить внимание на постановку задач в будущем
Использование будущего имеет следующие эффекты:
- 1 Запустите многопоточную задачу
- 2 Займитесь другими делами
- 3 Соберите результаты многопоточных задач
Соответствующий метод Future:
- cancel(boolean) : Отменить операцию
- get() : получить результаты
- get(long,TimeUtil) : Получить в указанное время
- isCancelled() : была ли задача отменена до завершения
- готово() :определить есть ли результат
Роль интерфейса Future состоит в том, чтобы сначала сгенерировать объект Future, поместить конкретную операцию в объект future и, наконец, получить окончательный результат с помощью метода get объекта future.
1.2 Future Task
FutureTask представляет собой асинхронную операцию, которую можно отменить, обеспечивая полный процесс Future.
- Он имеет методы для запуска и отмены операций, запроса завершения операции и получения результатов операции.
- Результат можно получить только после завершения операции, если операция не завершена, метод get будет заблокирован.
Обратим внимание на следующие детали исходного кода:
// Node 1 : 实现了 RunnableFuture
// ------------------------------------
// Node 2 : 提供了7种状态
private static final int NEW = 0; // 新建
private static final int COMPLETING = 1; // 完成
private static final int NORMAL = 2; // 正常
private static final int EXCEPTIONAL = 3; // 异常
private static final int CANCELLED = 4; // 取消
private static final int INTERRUPTING = 5; // 中断(中)
private static final int INTERRUPTED = 6; // 打断
// 过程的流转 :
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
// ------------------------------------
// Node 3 : 内部属性
// 底层Callable 对象
private Callable<V> callable;
// 输出对象
private Object outcome;
// 运行线程
private volatile Thread runner;
// 等待线程的Treiber堆栈
private volatile WaitNode waiters;
// ------------------------------------
// Node 4 : 内部方法
// 方法一 : 获取参数
V report(int s) // 为已完成的任务返回结果或抛出异常
1. Object x = outcome;
2. return (V)x;
// 注意 : 状态为 CANCELLED 时会抛出异常 CancellationException
// 方法二 : 取消
public boolean cancel(boolean mayInterruptIfRunning) // 取消
1. UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
// 这里神奇的做了一个CAS操作, 判断当前的状态
2. Thread t = runner; + t.interrupt();
// 打断线程
3. UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
// CAS 方式修改状态
// 方法三 : get 类型
public V get() throws
public V get(long timeout, TimeUnit unit)
1. if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
// 核心代码就是关注是否完成
// 方法四 : run
Step 1 : CAS 操作状态
Step 2 : 准备一个 Callable<V> c = callable;
Step 3 : state == NEW 后 , result = c.call();
// 此时阻塞等待
Step 4 : 设置结果 : set(result);
Step 5 : 当然是修改状态啦
Over !!!
// 方法五 : runAndReset
// 在不设置结果的情况下执行计算,然后将这个future重置为初始状态 (其实主要是结尾修改了状态)
核心 : return ran && s == NEW;
// 方法五 : finishCompletion
// 删除并通知所有等待的线程,调用done(),并使callable为空
// 2个for 循环保证执行
for (WaitNode q; (q = waiters) != null;) {
// CAS 保证操作的准确性
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 提供许可
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
2. Будущее использование
Future на самом деле относительно прост в использовании, просто инициируйте ожидание, но учтите, что Future заблокирует основной поток.
public class FutureService extends AbstractService implements ApplicationRunner, Callable<String> {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private static Long startTime;
private static Long endTime;
private String salt;
private Integer sleepNum;
public FutureService() {
}
public FutureService(String salt, Integer sleepNum) {
this.salt = salt;
this.sleepNum = sleepNum;
}
@Override
public String call() throws Exception {
logger.info("------> 业务逻辑开始执行 :{} <-------", salt);
StringBuffer sb = new StringBuffer();
for (int i = 0; i < sleepNum; i++) {
sb.append(salt);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
endTime = System.currentTimeMillis();
logger.info("------> {} - 业务执行完成 :{} <-------", salt, sb.toString());
getTime(startTime, endTime);
return sb.toString();
}
@Override
public void run(ApplicationArguments args) throws Exception {
logger.info("------> 创建一个初始连接池 <-------");
ExecutorService executor = Executors.newFixedThreadPool(3);
logger.info("------> 开始业务一 future - a : <-------");
FutureTask<String> future = new FutureTask<String>(new FutureService("a", 10));
startTime = System.currentTimeMillis();
executor.submit(future);
logger.info("------> 业务一请求完毕!主线程执行 <-------");
logger.info("------> 开始业务二 future - b : <-------");
FutureTask<String> future2 = new FutureTask<String>(new FutureService("b", 5));
startTime = System.currentTimeMillis();
executor.submit(future2);
logger.info("------> 业务三请求完毕!主线程执行 <-------");
logger.info("------> 开始业务三 future - c : <-------");
FutureTask<String> future3 = new FutureTask<String>(new FutureService("c", 3));
startTime = System.currentTimeMillis();
executor.submit(future3);
logger.info("------> 业务三请求完毕!主线程执行 <-------");
logger.info("------> future2 数据处理完成:{} <-------", future2.get());
logger.info("------> 2-1 测试主线程是否阻塞 <-------");
logger.info("------> future1 数据处理完成:{} <-------", future.get());
logger.info("------> 1-3 测试主线程是否阻塞 <-------");
logger.info("------> future3 数据处理完成:{} <-------", future3.get());
}
}
19.839 [ main] this is run <-------
19.839 [ main] 开始业务一 future - a : <-------
19.839 [ main] 业务一请求完毕!主线程执行 <-------
19.839 [ main] 开始业务二 future - b : <-------
19.840 [ main] 业务三请求完毕!主线程执行 <-------
19.840 [ main] 开始业务三 future - c : <-------
19.840 [ main] 业务三请求完毕!主线程执行 <-------
19.840 [pool-4-thread-2] 业务逻辑开始执行 :b <-------
19.840 [pool-4-thread-1] 业务逻辑开始执行 :a <-------
19.840 [pool-4-thread-3] 业务逻辑开始执行 :c <-------
22.843 [pool-4-thread-3] c - 业务执行完成 :ccc <-------
22.843 [pool-4-thread-3] time is :3.0 <-------
24.844 [pool-4-thread-2] b - 业务执行完成 :bbbbb <-------
24.844 [pool-4-thread-2] time is :5.0 <-------
24.844 [ main] future2 数据处理完成:bbbbb <-------
24.844 [ main] 2-1 测试主线程是否阻塞 <-------
29.847 [pool-4-thread-1] a - 业务执行完成 :aaaaaaaaaa <-------
29.847 [pool-4-thread-1] time is :10.0 <-------
29.847 [ main] future1 数据处理完成:aaaaaaaaaa <-------
29.847 [ main] 1-3 测试主线程是否阻塞 <-------
29.847 [ main] future3 数据处理完成:ccc <-------
// 流程 :
// Main 线程中 , 哭有看到 abc 时顺序执行的 , 从 submit 开始 , 开始多线程执行 (所以顺序不再固定, 变成了 bac)
// 从多线程里面看 , c > b > a 执行完成
// 当 b 业务完成后 , 因为main 一直阻塞到 futurb.get 的阶段 , 所以B future 获取值 , main 线程遇到 a future 继续阻塞
// 当 a future get 完成后 , c 才能get
// 总结 :
> future submit 多线程执行
> future get 会阻塞主线程等待 , 当 get 时 , 多线程才会把数据提供出来
3. Будущие вопросы и ответы
Согласно наиболее распространенному использованию пула потоков, мы вернем объект Future при передаче executor.submit, а затем получим его через объект Future.
Во-первых: мы рассуждаем таким образом ExecutorService executor = Executors.newFixedThreadPool(1); Future future = executor.submit(A Callable Object);
Вопрос 1: На чем основан фундамент Future?
- Шаг 1: При вызове через submit нижний слой вызовет:
return new FutureTask<T>(runnable, value);
- Шаг 2: он будет повышен до родительского класса RunnableFuture во внешнем слое и будет повышен до Future, когда вернется.
RunnableFuture<T> ftask = newTaskFor(task, result);
Резюме: Таким образом, базовый класс реализации в основном можно рассматривать как FutureTask, а задачу можно фактически рассматривать как класс реализации Runnable.
Вопрос 2: Как работает Future?
Будущая операция в основном основана на run()
- Выполняется вызовом callable.call()
- Если вызов выполнен успешно, сохраните результат с помощью метода set и сохраните результат в результате;
- Если есть исключение при выполнении вызова, сохраните исключение через setException;
Вопрос 3: На чем основан будущий обратный вызов -- get(long,TimeUtil)?
Завершите вызов, позвонив в отчет(ы)
Вопрос 4: Как блокирует будущее?
Когда суждение не завершено, awaitDone будет вызываться для ожидания, а конкретная логика будет проанализирована позже.
if (s <= COMPLETING){
s = awaitDone(false, 0L);
}
awaitDone в основном выполняет следующие действия:
- Если основной поток прерывается, генерируется исключение прерывания;
- Оцените текущее состояние FutureTask, если оно больше, чем COMPLETING, это означает, что задача была выполнена, а затем вернуться напрямую;
- Если текущее состояние равно COMPLETING, это означает, что задача была выполнена, в это время основному потоку нужно только отказаться от ресурсов процессора через метод yield и дождаться, пока состояние станет NORMAL;
- Инкапсулировать текущий поток через класс WaitNode и добавить его в список ожидающих через UNSAFE;
- Наконец, приостановите поток через парк LockSupport или parkNanos;
Вопрос 5: Как отменить будущее?
- отменить (логическое значение mayInterruptIfRunning) в t.interrupt()
- И UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); изменяет состояние
Вопрос 6: Как будущее оценивает отмену и результат isCancelled -- isDone ?
return state >= CANCELLED;
4. Будущее и вызываемое
Это нужно видеть из ExecutorService.submit(), ExecutorService имеет два основных метода отправки, будь то Callable или Runnable, это не сложно найти через возвращаемое значение, которое в итоге становится Future
<T> Future<T> submit(Callable<T> task);
- RunnableFuture<T> ftask = newTaskFor(task);
- return new ForkJoinTask.AdaptedCallable<T>(callable);
- execute(ftask);
<T> Future<T> submit(Runnable task, T result);
- RunnableFuture<T> ftask = newTaskFor(task, result);
- execute(ftask);
// 例如这里可以直接将 Callable 作为参数传进去 :
Future<String> future = executor.submit(createCallable());
public Callable createCallable() {
Callable<Module> call = new Callable<Module>() {
public Module call() throws Exception {
// .....
}
};
return call;
}
5. Производное использование ScheduledFutureTask
// ScheduledFutureTask 简介
• time:任务执行时间;
• period:任务周期执行间隔;
• sequenceNumber:自增的任务序号。
// 执行顺序 : 在等待队列里调度不再按照FIFO,而是按照执行时间,谁即将执行,谁就排在前面。
M- getDelay(TimeUnit unit)
M- int compareTo(Delayed other)
// ScheduledFutureTask 主要在 ScheduledThreadPoolExecutor中
// Node 1 : ScheduledThreadPoolExecutor内部类 ScheduledFutureTask
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V>
ссылка на основной кодПул потоковэтот документ