Проект анализа исходного кода JDK1.8 (китайская аннотация) Адрес Github:
GitHub.com/Примечания к источнику/…
1 Что такое будущее?
Например, мы обычно покупаем вещи в Интернете.После размещения заказа будет сгенерирован номер заказа, и затем продавец доставит товар в соответствии с этим номером заказа.Экспресс-покупки в Интернете для нас. В этом процессе эта серия номеров отслеживания является важным документом для нашей квитанции.
Таким образом, будущее JDK похоже на порядковый номер вещей, которые мы покупаем в Интернете.Когда мы выполняем трудоемкую задачу, мы можем запустить другой поток для асинхронного выполнения этой трудоемкой задачи, и в то же время мы можем делать другие вещи. Когда работа сделана, мы можем извлечь результат выполнения трудоемкой задачи по «единому номеру» будущего. Следовательно, Future также является режимом приложения в многопоточности.
расширять: Говоря о многопоточности, в чем разница между Future и Thread? Самое важное отличие состоит в том, что Thread не возвращает результаты, а режим Future возвращает.
2 Как использовать Будущее
Прежде чем мы разберемся, что такое Future, давайте возьмем простой пример, чтобы увидеть, как использовать Future.
Если мы хотим приготовить горячий горшок сейчас, сначала нам нужно подготовить две вещи: вскипятить воду и подготовить ингредиенты. Поскольку кипячение воды — это длительный процесс (эквивалентно трудоемкой бизнес-логике), мы можем кипятить воду (эквивалентно запуску другого потока) во время приготовления ингредиентов для горячих блюд (основной поток) и ждать, пока они оба будут готовы. горшок.
// DaHuoGuo.java
public class DaHuoGuo {
public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName() + ":" + "开始烧开水...");
// 模拟烧开水耗时
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + ":" + "开水已经烧好了...");
return "开水";
}
});
Thread thread = new Thread(futureTask);
thread.start();
// do other thing
System.out.println(Thread.currentThread().getName() + ":" + " 此时开启了一个线程执行future的逻辑(烧开水),此时我们可以干点别的事情(比如准备火锅食材)...");
// 模拟准备火锅食材耗时
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + ":" + "火锅食材准备好了");
String shicai = "火锅食材";
// 开水已经稍好,我们取得烧好的开水
String boilWater = futureTask.get();
System.out.println(Thread.currentThread().getName() + ":" + boilWater + "和" + shicai + "已经准备好,我们可以开始打火锅啦");
}
}
Результат выполнения показан на скриншоте ниже, что соответствует нашим ожиданиям:
Как видно из приведенного выше кода, мы используем Future в основном на следующих этапах:
- создать новый
Callable
Анонимные функции реализуют объекты класса, и наша бизнес-логикаCallable
изcall
Реализовано в методе, где универсальный тип Callable является типом возвращаемого результата; - тогда поставь
Callable
анонимный функциональный объект какFutureTask
Параметры конструктора передаются для созданияfutureTask
объект; - тогда поставь
futureTask
объект какThread
Параметры построения передаются, и поток запускается для выполнения бизнес-логики; - Наконец мы звоним
futureTask
объектget
Метод получает результат выполнения бизнес-логики.
Вы можете видеть, что классы JDK, связанные с использованием Future, в основномFutureTask
иCallable
Два, следующие в основном дляFutureTask
Выполните анализ исходного кода.
расширять: есть еще один способ использования
Future
путь кCallable
Здесь не будет представлен способ передачи класса реализации в пул потоков для выполнения, а только Baidu.
3 Анализ структуры класса FutureTask
Давайте сначала посмотримFutureTask
Структура класса:
можно увидеть
FutureTask
ДостигнутоRunnableFuture
интерфейс, при этомRunnableFuture
Интерфейс снова наследуетFuture
иRunnable
интерфейс. так какFutureTask
реализовано косвенноRunnable
интерфейс, поэтому его можно использовать как задачуThread
выполнять; кроме того,Самая важная вещьэтоFutureTask
также косвенноFuture
интерфейс, поэтому результат выполнения задачи тоже можно получить. Давайте кратко рассмотрим эти интерфейсы.api
.
// Runnable.java
@FunctionalInterface
public interface Runnable {
// 执行线程任务
public abstract void run();
}
Runnable
Рассказывать особо нечего, я думаю, что все уже знакомы с ним.
// Future.java
public interface Future<V> {
/**
* 尝试取消线程任务的执行,分为以下几种情况:
* 1)如果线程任务已经完成或已经被取消或其他原因不能被取消,此时会失败并返回false;
* 2)如果任务还未开始执行,此时执行cancel方法,那么任务将被取消执行,此时返回true;TODO 此时对应任务状态state的哪种状态???不懂!!
* 3)如果任务已经开始执行,那么mayInterruptIfRunning这个参数将决定是否取消任务的执行。
* 这里值得注意的是,cancel(true)实质并不能真正取消线程任务的执行,而是发出一个线程
* 中断的信号,一般需要结合Thread.currentThread().isInterrupted()来使用。
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 判断任务是否被取消,在执行任务完成前被取消,此时会返回true
*/
boolean isCancelled();
/**
* 这个方法不管任务正常停止,异常还是任务被取消,总是返回true。
*/
boolean isDone();
/**
* 获取任务执行结果,注意是阻塞等待获取任务执行结果。
*/
V get() throws InterruptedException, ExecutionException;
/**
* 获取任务执行结果,注意是阻塞等待获取任务执行结果。
* 只不过在规定的时间内未获取到结果,此时会抛出超时异常
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future
Интерфейс символизирует результат асинхронного выполнения задач, то есть выполнение трудоёмкой задачи может выполняться в другом потоке, и тогда мы можем в это время заниматься другими делами, а вызывать будем после завершения других дел.Future.get()
Метод может получить результат, в это время, если асинхронная задача не завершилась, он заблокируется и будет ждать, пока асинхронная задача не выполнится и не будет получен результат.
// RunnableFuture.java
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
RunnableFuture
даFuture
иRunnable
Композиция интерфейса, то есть представление интерфейса, может выполняться потоком асинхронно, поскольку реализацияRunnable
интерфейс, вы можете получить результат выполнения асинхронной задачи потока, потому что реализацияFuture
интерфейс. Так решеноRunnable
Асинхронные задачи не имеют недостатка в возврате результата.
Далее давайте посмотрим наFutureTask
,FutureTask
ДостигнутоRunnableFuture
интерфейс так себеFuture
иRunnable
Конкретный класс реализации интерфейса представляет собой отменяемую задачу асинхронного потока, которая обеспечиваетFuture
Базовая реализация асинхронной задачи, то есть после выполнения асинхронной задачи мы можем получить результат выполнения асинхронной задачи, что является высшим приоритетом нашего следующего анализа.FutureTask
может упаковатьCallable
иRunnable
объект, кроме того,FutureTask
Помимо выполнения потоками, он также может быть отправлен в пул потоков для выполнения.
Давайте взглянемFutureTask
Категорияapi
, основные методы обведены красным.
На фото выше
FutureTask
изrun
Метод — это метод, который асинхронно выполняется потоком.get
Метод — это метод получения результата выполнения асинхронной задачи.cancel
метод — это метод, который отменяет выполнение задачи. Далее мы в основном сосредоточимся на этих трех методах.
считать:
-
FutureTask
перезаписанныйrun
Тип возврата метода по-прежнемуvoid
, указывая на отсутствие возвращаемого значения, тоFutureTask
изget
Как метод получает возвращаемое значение? -
FutureTask
изcancel
Может ли метод действительно отменить выполнение асинхронной задачи потока? При каких обстоятельствах я могу отменить?
так какFutureTask
Результат выполнения асинхронной задачи также следуетCallable
Интерфейс связан, так что давайте посмотрим на него еще разCallable
интерфейс:
// Callable.java
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*/
V call() throws Exception;
}
мы все знаем,Callable<V>
интерфейс иRunnable
Все интерфейсы могут быть отправлены в пул потоков для выполнения, единственная разницаCallable<V>
Интерфейс имеет возвращаемый результат, и общий тип в немV
должен вернуть результат, иRunnable
Интерфейс не возвращает результатов.
считать: В целом,
Runnable
Только класс реализации интерфейса может быть отправлен в пул потоков для выполнения, почемуCallable
Классы реализации интерфейса также могут быть отправлены в пул потоков для выполнения? Подумайте о пулах потоковsubmit
внутри методаCallable
Это подходит?
4 Анализ исходного кода FutureTask
4.1 Переменные-члены FutureTask
давайте сначала посмотримFutureTask
Каковы переменные-члены , и понимание этих переменных-членов очень важно для последующего анализа исходного кода.
// FutureTask.java
/** 封装的Callable对象,其call方法用来执行异步任务 */
private Callable<V> callable;
/** 在FutureTask里面定义一个成员变量outcome,用来装异步任务的执行结果 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 用来执行callable任务的线程 */
private volatile Thread runner;
/** 线程等待节点,reiber stack的一种实现 */
private volatile WaitNode waiters;
/** 任务执行状态 */
private volatile int state;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
// 对应成员变量state的偏移地址
private static final long stateOffset;
// 对应成员变量runner的偏移地址
private static final long runnerOffset;
// 对应成员变量waiters的偏移地址
private static final long waitersOffset;
Здесь мы сосредоточимся наFutureTask
изCallable
переменная-член, потому чтоFutureTask
Асинхронная задача в конечном итоге делегируетсяCallable
достигать.
считать:
-
FutureTask
переменная-членrunner
,waiters
иstate
все былоvolatile
Модификация, мы можем подумать о том, почему эти три переменные-члены должны бытьvolatile
Изменено, и другие переменные-члены не используют его?volatile
Какова роль ключевых слов? - Теперь, когда переменные-члены определены
runner
,waiters
иstate
, теперь снова определеноstateOffset
,runnerOffset
иwaitersOffset
Переменные соответствуютrunner
,waiters
иstate
Адрес смещения , зачем беспокоиться?
Посмотрим еще разstateOffset
,runnerOffset
иwaitersOffset
Процесс инициализации этих трех переменных:
// FutureTask.java
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
4.2 Изменения состояния FutureTask
сказал ранееFutureTask
переменная-член, имеет представлениегосударствопеременная-членstate
Давайте сосредоточимся наstate
Переменная представляет состояние выполнения задачи.
// FutureTask.java
/** 任务执行状态 */
private volatile int state;
/** 任务新建状态 */
private static final int NEW = 0;
/** 任务正在完成状态,是一个瞬间过渡状态 */
private static final int COMPLETING = 1;
/** 任务正常结束状态 */
private static final int NORMAL = 2;
/** 任务执行异常状态 */
private static final int EXCEPTIONAL = 3;
/** 任务被取消状态,对应cancel(false) */
private static final int CANCELLED = 4;
/** 任务中断状态,是一个瞬间过渡状态 */
private static final int INTERRUPTING = 5;
/** 任务被中断状态,对应cancel(true) */
private static final int INTERRUPTED = 6;
Вы можете увидеть переменные состояния задачиstate
Вышеупомянутые 7 состояний, 0-6 соответствуют каждому состоянию соответственно. Статус задачи изначальноNEW
, затем поFutureTask
три методаset
,setException
иcancel
Чтобы установить изменение состояния, изменение состояния имеет следующие четыре ситуации:
-
NEW -> COMPLETING -> NORMAL
: это изменение состояния указывает на нормальное завершение асинхронной задачи, гдеCOMPLETING
является мгновенным временным переходным состоянием,set
Метод задает изменение состояния; -
NEW -> COMPLETING -> EXCEPTIONAL
: это изменение состояния указывает на то, что во время выполнения асинхронной задачи возникает исключение.setException
Метод задает изменение состояния; -
NEW -> CANCELLED
: это изменение состояния указывает на то, что оно отменено, то есть вызов сделанcancel(false)
,Зависит отcancel
метод установки изменений состояния; -
NEW -> INTERRUPTING -> INTERRUPTED
: Это изменение состояния указывает на то, что оно прервано, то есть вызов сделанcancel(true)
,Зависит отcancel
метод установки изменений состояния.
4.3 Конструктор FutureTask
FutureTask
Есть два конструктора, рассмотрим их по отдельности:
// FutureTask.java
// 第一个构造函数
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
Можно видеть, что этот конструктор полезен в примере кода «горячего горшка», который мы упоминали ранее, то естьCallable
Назначение переменных-членов, вызываемое при асинхронном выполнении задачиCallable.call
Метод выполняет логику асинхронной задачи. Кроме того, в этот момент укажите статус задачиstate
назначить какNEW
, указывающий на новый статус задачи.
давайте посмотрим еще разFutureTask
Другой конструктор:
// FutureTask.java
// 另一个构造函数
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
Этот конструктор выполняетExecutors.callable(runnable, result)
через адаптерRunnableAdapter
будущееRunnable
объектrunnable
преобразовать вCallable
объект, а затем датьcallable
иstate
присвоение переменной.
Уведомление, здесь нужно помнить, чтоFutureTask
Когда новый, статус задачи в это времяstate
даNEW
Достаточно.
4.4 Метод FutureTask.run, используемый для выполнения асинхронных задач
Ранее мы упоминалиFutureTask
реализовано косвенноRunnable
интерфейс, переопределенныйRunnable
интерфейсrun
метод, поэтому переопределенныйrun
Метод передается потоку для выполнения, и в то же времяrun
Метод — это именно метод выполнения логики асинхронной задачи, то после выполненияrun
Как метод сохраняет результат выполнения асинхронной задачи?
Теперь сосредоточимся на анализеrun
метод:
// FutureTask.java
public void run() {
// 【1】,为了防止多线程并发执行异步任务,这里需要判断线程满不满足执行异步任务的条件,有以下三种情况:
// 1)若任务状态state为NEW且runner为null,说明还未有线程执行过异步任务,此时满足执行异步任务的条件,
// 此时同时调用CAS方法为成员变量runner设置当前线程的值;
// 2)若任务状态state为NEW且runner不为null,任务状态虽为NEW但runner不为null,说明有线程正在执行异步任务,
// 此时不满足执行异步任务的条件,直接返回;
// 1)若任务状态state不为NEW,此时不管runner是否为null,说明已经有线程执行过异步任务,此时没必要再重新
// 执行一次异步任务,此时不满足执行异步任务的条件;
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
// 拿到之前构造函数传进来的callable实现类对象,其call方法封装了异步任务执行的逻辑
Callable<V> c = callable;
// 若任务还是新建状态的话,那么就调用异步任务
if (c != null && state == NEW) {
// 异步任务执行结果
V result;
// 异步任务执行成功还是始遍标志
boolean ran;
try {
// 【2】,执行异步任务逻辑,并把执行结果赋值给result
result = c.call();
// 若异步任务执行过程中没有抛出异常,说明异步任务执行成功,此时设置ran标志为true
ran = true;
} catch (Throwable ex) {
result = null;
// 异步任务执行过程抛出异常,此时设置ran标志为false
ran = false;
// 【3】设置异常,里面也设置state状态的变化
setException(ex);
}
// 【3】若异步任务执行成功,此时设置异步任务执行结果,同时也设置状态的变化
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 异步任务正在执行过程中,runner一直是非空的,防止并发调用run方法,前面有调用cas方法做判断的
// 在异步任务执行完后,不管是正常结束还是异常结束,此时设置runner为null
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
// 线程执行异步任务后的任务状态
int s = state;
// 【4】如果执行了cancel(true)方法,此时满足条件,
// 此时调用handlePossibleCancellationInterrupt方法处理中断
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
Вы можете увидеть выполнение асинхронных задачrun
Метод в основном делится на следующие четыре шага:
- Определить, соответствует ли поток условиям для выполнения асинхронных задач: чтобы предотвратить одновременное выполнение нескольких потоков асинхронных задач, необходимо оценить, заполнены ли потоки или не соответствуют ли они условиям для выполнения асинхронных задач;
-
Если условие выполнено, выполнить асинхронную задачу: потому что логика асинхронной задачи инкапсулирована в
Callable.call
метод, вызывайте непосредственно в это времяCallable.call
Метод выполняет асинхронную задачу, а затем возвращает результат выполнения; -
Выполнять различную обработку в зависимости от выполнения асинхронных задач: 1) Если выполнение асинхронной задачи завершается нормально, вызовите в это время
set(result);
Чтобы установить результат выполнения задачи; 2) Если асинхронное выполнение задачи вызывает исключение, вызовите это времяsetException(ex);
Чтобы установить исключения, см. подробный анализ4.4.1小节
; -
Последствия выполнения асинхронной задачи: Независимо от успешности или неудачи асинхронной задачи, если другие потоки вызывают
FutureTask.cancel(true)
, вам нужно позвонитьhandlePossibleCancellationInterrupt
Метод обрабатывает прерывание.Для подробного анализа см.4.4.2小节
.
здесьпримечательныйЭто судить о том, заполнен ли поток или нет, чтобы соответствовать условиям выполнения асинхронных задач,runner
Этоnull
это вызовUNSAFE
изCAS
методcompareAndSwapObject
судить и устанавливать, и в то же времяcompareAndSwapObject
через переменные-членыrunner
адрес смещенияrunnerOffset
прийти, чтобы датьrunner
Назначены, кроме того, переменные-членыrunner
изменено какvolatile
В случае многопоточности один потокvolatile
Значение декоративной переменной может быть немедленно сброшено в основную память, чтобы оно было видно другим потокам.
4.4.1 Методы set и setException FutureTask
Давайте посмотрим на вызов, когда асинхронное выполнение задачи завершается нормально.set(result);
метод:
// FutureTask.java
protected void set(V v) {
// 【1】调用UNSAFE的CAS方法判断任务当前状态是否为NEW,若为NEW,则设置任务状态为COMPLETING
// 【思考】此时任务不能被多线程并发执行,什么情况下会导致任务状态不为NEW?
// 答案是只有在调用了cancel方法的时候,此时任务状态不为NEW,此时什么都不需要做,
// 因此需要调用CAS方法来做判断任务状态是否为NEW
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 【2】将任务执行结果赋值给成员变量outcome
outcome = v;
// 【3】将任务状态设置为NORMAL,表示任务正常结束
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 【4】调用任务执行完成方法,此时会唤醒阻塞的线程,调用done()方法和清空等待线程链表等
finishCompletion();
}
}
Видно, что после нормального выполнения асинхронной задачи заканчивается, а асинхронная задача не выполняетсяcancel
В случае , он сделает следующее: сохранит результат выполнения задачи вFutureTask
переменная-членoutcome
, он будет вызываться после присваиванияfinishCompletion
метод пробуждения заблокированного потока (откуда взялся заблокированный поток? Это будет проанализировано позже),примечательныйСоответствующее изменение состояния задачи здесьNEW -> COMPLETING -> NORMAL.
Давайте продолжим видеть, что когда во время выполнения асинхронной задачи возникает исключение, оно будет вызвано в это время.setException(ex);
метод.
// FutureTask.java
protected void setException(Throwable t) {
// 【1】调用UNSAFE的CAS方法判断任务当前状态是否为NEW,若为NEW,则设置任务状态为COMPLETING
// 【思考】此时任务不能被多线程并发执行,什么情况下会导致任务状态不为NEW?
// 答案是只有在调用了cancel方法的时候,此时任务状态不为NEW,此时什么都不需要做,
// 因此需要调用CAS方法来做判断任务状态是否为NEW
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 【2】将异常赋值给成员变量outcome
outcome = t;
// 【3】将任务状态设置为EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// 【4】调用任务执行完成方法,此时会唤醒阻塞的线程,调用done()方法和清空等待线程链表等
finishCompletion();
}
}
можно увидетьsetException(Throwable t)
Логика кода такая же, как и в предыдущемset(V v)
Почти то же самое, разница в том, что во время выполнения задачи выбрасывается исключение, и исключение сохраняется вFutureTask
переменная-членoutcome
в, а также,примечательныйСоответствующее изменение состояния задачи здесьNEW -> COMPLETING -> EXCEPTIONAL.
Поскольку асинхронная задача завершается нормально или аварийно, она будет вызвана в это время.FutureTask
изfinishCompletion
метод для пробуждения заблокированного потока, где заблокированный поток означает, что мы вызываемFuture.get
Если асинхронная задача не была выполнена во время выполнения метода, поток в это время будет заблокирован.
// FutureTask.java
private void finishCompletion() {
// assert state > COMPLETING;
// 取出等待线程链表头节点,判断头节点是否为null
// 1)若线程链表头节点不为空,此时以“后进先出”的顺序(栈)移除等待的线程WaitNode节点
// 2)若线程链表头节点为空,说明还没有线程调用Future.get()方法来获取任务执行结果,固然不用移除
for (WaitNode q; (q = waiters) != null;) {
// 调用UNSAFE的CAS方法将成员变量waiters设置为空
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
// 取出WaitNode节点的线程
Thread t = q.thread;
// 若取出的线程不为null,则将该WaitNode节点线程置空,且唤醒正在阻塞的该线程
if (t != null) {
q.thread = null;
//【重要】唤醒正在阻塞的该线程
LockSupport.unpark(t);
}
// 继续取得下一个WaitNode线程节点
WaitNode next = q.next;
// 若没有下一个WaitNode线程节点,说明已经将所有等待的线程唤醒,此时跳出for循环
if (next == null)
break;
// 将已经移除的线程WaitNode节点的next指针置空,此时好被垃圾回收
q.next = null; // unlink to help gc
// 再把下一个WaitNode线程节点置为当前线程WaitNode头节点
q = next;
}
break;
}
}
// 不管任务正常执行还是抛出异常,都会调用done方法
done();
// 因为异步任务已经执行完且结果已经保存到outcome中,因此此时可以将callable对象置空了
callable = null; // to reduce footprint
}
finishCompletion
Функция метода заключается в пробуждении и удалении узла ожидающего потока из списка ожидания потока независимо от того, нормально или аварийно завершается асинхронная задача.Treiber stack
, поэтому порядок пробуждения (удаления) такой: "последний вошел-первый вышел", то есть поток, который приходит первым, пробуждается (удаляется) первым. продолжить анализ позже.
4.4.2 Метод handlePossibleCancellationInterrupt FutureTask
существует4.4小节
анализrun
В конце метода естьfinally
блокировать, если статус задачиstate >= INTERRUPTING
, это означает, что выполняются другие потокиcancel(true)
метод, необходимо датьCPU
Сегмент времени выполнения выполняется другими потоками Давайте посмотрим на конкретный исходный код:
// FutureTask.java
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
// 当任务状态是INTERRUPTING时,此时让出CPU执行的机会,让其他线程执行
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
считать: почему статус задачи
INTERRUPTING
На этот раз нужно отказаться от временного отрезка выполнения CPU? И почему он должен вызываться после выполнения обязательной задачиhandlePossibleCancellationInterrupt
метод?
4.5 Метод FutureTask.get, получить результат выполнения задачи
前面我们起一个线程在其`run`方法中执行异步任务后,此时我们可以调用`FutureTask.get`方法来获取异步任务执行的结果。
// FutureTask.java
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 【1】若任务状态<=COMPLETING,说明任务正在执行过程中,此时可能正常结束,也可能遇到异常
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 【2】最后根据任务状态来返回任务执行结果,此时有三种情况:1)任务正常执行;2)任务执行异常;3)任务被取消
return report(s);
}
Вы можете видеть, что если статус задачиstate<=COMPLETING
, указывая на то, что асинхронная задача находится в процессе выполнения, в это время она вызоветawaitDone
Метод блокируется и ждет, при выполнении задачи в это время снова вызываетсяreport
способ сообщить о результате задачи, в это время возможны три ситуации: 1) задача выполняется нормально; 2) задача выполняется ненормально; 3) задача отменена.
4.5.1 Метод FutureTask.awaitDone
FutureTask.awaitDone
Метод блокирует текущий поток, который получает результат выполнения асинхронной задачи, до завершения выполнения асинхронной задачи.
// FutureTask.java
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 计算超时结束时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 线程链表头节点
WaitNode q = null;
// 是否入队
boolean queued = false;
// 死循环
for (;;) {
// 如果当前获取任务执行结果的线程被中断,此时移除该线程WaitNode链表节点,并抛出InterruptedException
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 【5】如果任务状态>COMPLETING,此时返回任务执行结果,其中此时任务可能正常结束(NORMAL),可能抛出异常(EXCEPTIONAL)
// 或任务被取消(CANCELLED,INTERRUPTING或INTERRUPTED状态的一种)
if (s > COMPLETING) {
// 【问】此时将当前WaitNode节点的线程置空,其中在任务结束时也会调用finishCompletion将WaitNode节点的thread置空,
// 这里为什么又要再调用一次q.thread = null;呢?
// 【答】因为若很多线程来获取任务执行结果,在任务执行完的那一刻,此时获取任务的线程要么已经在线程等待链表中,要么
// 此时还是一个孤立的WaitNode节点。在线程等待链表中的的所有WaitNode节点将由finishCompletion来移除(同时唤醒)所有
// 等待的WaitNode节点,以便垃圾回收;而孤立的线程WaitNode节点此时还未阻塞,因此不需要被唤醒,此时只要把其属性置为
// null,然后其有没有被谁引用,因此可以被GC。
if (q != null)
q.thread = null;
// 【重要】返回任务执行结果
return s;
}
// 【4】若任务状态为COMPLETING,此时说明任务正在执行过程中,此时获取任务结果的线程需让出CPU执行时间片段
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 【1】若当前线程还没有进入线程等待链表的WaitNode节点,此时新建一个WaitNode节点,并把当前线程赋值给WaitNode节点的thread属性
else if (q == null)
q = new WaitNode();
// 【2】若当前线程等待节点还未入线程等待队列,此时加入到该线程等待队列的头部
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 若有超时设置,那么处理超时获取任务结果的逻辑
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
// 【3】若没有超时设置,此时直接阻塞当前线程
else
LockSupport.park(this);
}
}
FutureTask.awaitDone
Основные вещи, которые делает этот метод, резюмируются следующим образом:
- во-первых
awaitDone
Метод представляет собой бесконечный цикл; - Если текущий поток, который получает результат, прерывается другим потоком, узел связанного списка WaitNode потока удаляется в это время и генерируется InterruptedException;
- Если статус задачи
state>COMPLETING
, в это время возвращается результат выполнения задачи; - Если статус задачи
COMPLETING
, поток, который получает результат задачи, должен отказаться от сегмента времени выполнения ЦП; - как
q == null
, указывая на то, что текущий поток не был установлен вWaitNode
узел, новый на данный моментWaitNode
узел и установить егоthread
Атрибут — текущий поток; - как
queued==false
, указывающий текущий потокWaitNode
Узел еще не присоединился к списку ожидания потока, а затем присоединяется к началу списка; - когда
timed
Если установлено значение true, метод имеет функцию тайм-аута в это время, и логика тайм-аута здесь подробно не анализируется; - Когда предыдущие 6 условий не выполняются, текущий поток в это время блокируется.
Мы проанализировали здесь, у нас может быть только один поток для выполнения асинхронной задачи, и результат асинхронной задачи может быть получен несколькими потоками.Когда асинхронная задача не была выполнена, поток, который получает результат асинхронной задачи, присоединится к потоку чтобы ждать связанный список, затем позвоните по вызовуLockSupport.park(this);
метод блокирует текущий поток. Пока асинхронная задача не будет выполнена, она будет вызыватьсяfinishCompletion
метод пробуждения и удаления потока, ожидающего каждого из связанного спискаWaitNode
Узел, проснись здесь (удалить)WaitNode
Поток узла начинается с головы связанного списка, который мы также проанализировали ранее.
Еще одна вещь, которую следует отметить, это то, чтоawaitDone
В методе есть бесконечный цикл.Когда поток, который получает асинхронную задачу, входит, он может войти в несколько условных ветвей несколько раз для выполнения различной бизнес-логики или может войти только в одну условную ветвь. Ниже приведены две возможные ситуации:
Дело 1:
Когда приходит поток, который получает результат асинхронной задачи, асинхронная задача еще не выполнена.state=NEW
И когда нет настройки тайм-аута:
-
первый цикл:В настоящее время
q = null
, введите вышеуказанную кодовую метку в это время【1】
Ветка суждения состоит в том, чтобы создать новую для текущего потока.WaitNode
узел; -
второй цикл:В настоящее время
queued = false
, введите вышеуказанную кодовую метку в это время【2】
Судебная ветвь , которая будет вновь создана доWaitNode
Узел добавляется в список ожидания потока; -
третий цикл: Введите указанный выше код в это время.
【3】
Судебная ветвь , то есть блокировка текущего потока; -
четвертый цикл: Присоединяйтесь в это время к выполнению асинхронной задачи, введите указанную выше кодовую метку в это время.
【5】
Ветвь решения должна вернуть результат выполнения асинхронной задачи.
Случай 2:
Когда приходит поток, который получает результат асинхронной задачи, асинхронная задача уже выполнена.state>COMPLETING
И когда нет настройки тайм-аута, прямо введите указанную выше кодовую метку в это время.【5】
Ветвь оценки должна напрямую возвращать результат выполнения асинхронной задачи, и нет необходимости присоединяться к списку ожидания потока.
4.5.2 Метод FutureTask.report
существуетget
В методе, когда выполнение асинхронной задачи завершается, не имеет значения, завершается ли асинхронная задача нормально или аварийно, илиcancel
, поток, который получает результат асинхронной задачи, в это время будет разбужен, поэтому он продолжит выполнениеFutureTask.report
Метод сообщает о выполнении асинхронной задачи, которая может вернуть результат или вызвать исключение.
// FutureTask.java
private V report(int s) throws ExecutionException {
// 将异步任务执行结果赋值给x,此时FutureTask的成员变量outcome要么保存着
// 异步任务正常执行的结果,要么保存着异步任务执行过程中抛出的异常
Object x = outcome;
// 【1】若异步任务正常执行结束,此时返回异步任务执行结果即可
if (s == NORMAL)
return (V)x;
// 【2】若异步任务执行过程中,其他线程执行过cancel方法,此时抛出CancellationException异常
if (s >= CANCELLED)
throw new CancellationException();
// 【3】若异步任务执行过程中,抛出异常,此时将该异常转换成ExecutionException后,重新抛出。
throw new ExecutionException((Throwable)x);
}
4.6 Метод FutureTask.cancel отменяет выполнение задачи
Давайте посмотрим на последнийFutureTask.cancel
метод, как только мы увидимFutureTask.cancel
метод, мы, должно быть, наивно думали, что это метод, который может отменить выполнение асинхронных задач.Если мы так думаем, мы можем только сказать, что мы правы только наполовину.
// FutureTask.java
public boolean cancel(boolean mayInterruptIfRunning) {
// 【1】判断当前任务状态,若state == NEW时根据mayInterruptIfRunning参数值给当前任务状态赋值为INTERRUPTING或CANCELLED
// a)当任务状态不为NEW时,说明异步任务已经完成,或抛出异常,或已经被取消,此时直接返回false。
// TODO 【问题】此时若state = COMPLETING呢?此时为何也直接返回false,而不能发出中断异步任务线程的中断信号呢??
// TODO 仅仅因为COMPLETING是一个瞬时态吗???
// b)当前仅当任务状态为NEW时,此时若mayInterruptIfRunning为true,此时任务状态赋值为INTERRUPTING;否则赋值为CANCELLED。
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// 【2】如果mayInterruptIfRunning为true,此时中断执行异步任务的线程runner(还记得执行异步任务时就把执行异步任务的线程就赋值给了runner成员变量吗)
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
// 中断执行异步任务的线程runner
t.interrupt();
} finally { // final state
// 最后任务状态赋值为INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
// 【3】不管mayInterruptIfRunning为true还是false,此时都要调用finishCompletion方法唤醒阻塞的获取异步任务结果的线程并移除线程等待链表节点
} finally {
finishCompletion();
}
// 返回true
return true;
}
В приведенном выше коде, когда статус асинхронной задачиstate != NEW
, это означает, что асинхронная задача была выполнена нормально, завершилась аварийно или былаcancel
, затем вернитесь напрямуюfalse
; когда состояние асинхронной задачиstate = NEW
, в это время согласноmayInterruptIfRunning
Параметрtrue
Делится на следующие два случая:
- когда
mayInterruptIfRunning = false
, статус задачи на данный моментstate
непосредственно назначается какCANCELLED
, в это время потоку, выполняющему асинхронную задачу, не будет выдан сигнал прерывания,примечательныйСоответствующее изменение состояния задачи здесьNEW -> CANCELLED. - когда
mayInterruptIfRunning = true
Когда сигнал прерывания выдается потоку, выполняющему асинхронную задачу,примечательныйСоответствующее изменение состояния задачи здесьNEW -> INTERRUPTING -> INTERRUPTED.
В конце концов, неважноmayInterruptIfRunning
заtrue
все ещеfalse
, затем позвонитеfinishCompletion
Метод пробуждает заблокированный поток, который получает результат асинхронной задачи, и удаляет поток, ожидающий узла связанного списка.
отFutureTask.cancel
Мы можем получить ответ из исходного кода, этот метод не может реально прервать поток, выполняющий асинхронную задачу, а может только послать сигнал прерывания потоку, выполняющему асинхронную задачу. Если поток, выполняющий асинхронную задачу, находится вsleep
,wait
илиjoin
состояние, оно броситInterruptedException
исключение, поток может быть прерван, кроме того, если асинхронной задаче необходимоwhile
Если цикл выполняется, поток асинхронной задачи может быть завершен путем объединения следующего кода, то есть когда поток, выполняющий асинхронную задачу, прерывается, в это времяThread.currentThread().isInterrupted()
возвращениеtrue
, не удовлетвореныwhile
Таким образом, условие цикла выходит из цикла, завершая поток выполнения асинхронной задачи со следующим кодом:
public Integer call() throws Exception {
while (!Thread.currentThread().isInterrupted()) {
// 业务逻辑代码
System.out.println("running...");
}
return 666;
}
Уведомление: называетсяFutureTask.cancel
метод, если возвращаемый результатtrue
, если поток асинхронной задачи не может быть прерван, даже если поток асинхронной задачи выполняется нормально и возвращается результат выполнения, вызовитеFutureTask.get
Метод также не может получить результат выполнения асинхронной задачи, он выкинетCancellationException
аномальный. Вы знаете, почему это так?
потому что звонюFutureTask.cancel
метод, если возвращаемый результатtrue
, статус задачи в это времяCANCELLED
илиINTERRUPTED
, и обязательно реализуемfinishCompletion
метод, в то время какfinishCompletion
Метод разбудит поток, который получает результат асинхронной задачи, и будет ждать поток списка, а поток, который получит результат асинхронной задачи, проснется и найдет статусs >= CANCELLED
, это броситCancellationException
Исключительный.
5 Резюме
Ну статья правильнаяFutureTask
Это конец анализа исходного кода, давайте еще раз подведем итоги.FutureTask
Логика реализации:
- мы достигаем
Callable
интерфейс, переопределенныйcall
Бизнес-логика, которую необходимо выполнить, определяется в методе; - Затем мы реализуем
Callable
Объект реализации интерфейса передаетсяFutureTask
,ПотомFutureTask
Отправляется в поток для выполнения как асинхронная задача; - самое главное это
FutureTask
поддерживает состояние внутриstate
, любая операция (будь то асинхронная задача завершается нормально или отменяется) вращается вокруг этого состояния и обновляется в любой моментstate
статус задачи; - Только один поток может выполнять асинхронную задачу. Когда асинхронная задача выполняется, она может завершиться нормально, аварийно завершиться или быть отменена.
- Несколько потоков могут одновременно получать результат выполнения асинхронной задачи.Если асинхронная задача не была выполнена, поток, который получает асинхронную задачу в это время, присоединится к списку ожидания потока для ожидания;
- Когда выполнение потока асинхронной задачи завершено, поток, который получает результат выполнения асинхронной задачи, будет пробужден в это время.Обратите внимание, что последовательность пробуждения "последний вошел первым вышел", то есть заблокированный поток добавленные позже, будут разбужены первыми.
- когда мы звоним
FutureTask.cancel
метод на самом деле не останавливает поток, выполняющий асинхронную задачу, он просто сигнализирует потоку прерывания. Но покаcancel
метод возвращаетtrue
, даже если асинхронная задача может быть выполнена нормально, в это время мы вызываемget
Метод все равно будет бросать при получении результатаCancellationException
аномальный.
расширять: мы упоминали ранее
FutureTask
изrunner
,waiters
иstate
все используютvolatile
Модификация ключевого слова указывает, что эти три переменные являются объектами (переменными-членами), совместно используемыми несколькими потоками, и будут управляться несколькими потоками.volatile
Изменение ключевого слова для операции потокаvolatile
После того, как значение переменной атрибута установлено, оно может быть видимым для других потоков во времени. На данный момент многопоточная операция переменных-членов используется толькоvolatile
Ключевые слова по-прежнему имеют проблемы с безопасностью потоков, но в настоящее время г-н Дуг Леа не вводил никаких блокировок потоков, а принялUnsafe
изCAS
метод замены операции блокировки для обеспечения безопасности потоков.
6 Анализ исходного кода FutureTask, что мы можем узнать?
Какова цель нашего анализа исходного кода? кроме как понятьFutureTask
В дополнение к внутреннему принципу реализации фреймворка, нам также нужно учиться у различных навыков больших парней, чтобы писать исходный код фреймворка.Только таким образом мы можем расти.
АнализыFutureTask
Исходный код, мы можем узнать из него:
- использовать
LockSupport
Для достижения механизма блокировки/пробуждения потока; - использовать
volatile
иUNSAFE
изCAS
метод обеспечения безблокировочной работы общих переменных потока; - Чтобы написать логику исключения тайм-аута, вы можете обратиться к
FutureTask
изget(long timeout, TimeUnit unit)
логика реализации; - Логическая реализация списка ожидания потока, когда ему нужно ждать, когда несколько потоков получат результат переменной-члена;
- Логическая реализация того, что асинхронная задача может выполняться только одним потоком в определенное время;
-
FutureTask
статус задачи вsatate
Логическая реализация обработки изменений. - ...
Пункты, перечисленные выше, являются всеми местами, из которых мы можем учиться.
Если вы считаете, что это хорошо, пожалуйста, сделайте ретвит и безжалостно лайкните!
[Примечания к источнику] Адрес Github: