Перестаньте спрашивать меня, какова связь между Runnable, Callable, Future, FutureTask.

Java
Перестаньте спрашивать меня, какова связь между Runnable, Callable, Future, FutureTask.

Отсканируйте QR-код ниже или WeChat, чтобы найти официальную учетную запись.菜鸟飞呀飞, вы можете следить за публичной учетной записью WeChat, читать дальшеSpring源码分析а такжеJava并发编程статья.

微信公众号

Запускаемый и вызываемый

Как мы все знаем, когда мы используем потоки для запускаRunnableПри использовании задачи получение возвращаемого значения не поддерживается, поскольку метод run() интерфейса Runnable используетvoid修饰Да, методы не поддерживают возвращаемые значения. Во многих сценариях, с одной стороны, нам нужно выполнять задачи асинхронно через потоки для повышения производительности, а с другой стороны, мы также ожидаем получить результаты выполнения задач. Особенно в среде RPC асинхронное получение возвращаемого значения задачи — это функция, которую должен реализовать почти каждый интерфейс RPC. В настоящее время использование Runnable явно не может удовлетворить наши потребности, поэтомуCallableпоявился.

Callable похож на Runnable, это интерфейс и только один метод:call(), разница в том, что метод call() Callable имеет возвращаемое значение, а тип возвращаемого значения является универсальным типом, который указывается при создании объекта Callable.

public interface Callable<V> {
    V call() throws Exception;
}

Объект Runnable можно передать в конструктор класса Thread, а задачу Runnable можно запустить через поток, в то время как интерфейс Callable нельзя напрямую передать в поток для выполнения. Интерфейс Callable обычно используется в связке с потоком. бассейн. В дополнение к методу execute() для отправки задач пул потоков ThreadPoolExecutor также предоставляет три перегруженных метода submit() для отправки задач, каждый из которых имеет возвращаемые значения. Класс ThreadPoolExecutor наследует абстрактный классAbstractExecutorService, в AbstractExecutorService определены три метода перегрузки submit(). Конкретные определения следующие.

имя метода иллюстрировать
Future<?> submit(Runnable task) Хотя объект возвращаемого значения этого метода — Future, но поскольку отправленная задача имеет тип Runnable, используйтеFuture.get()Он возвращает null при получении результата.
Future submit(Runnable task,T result) Объект возвращаемого значения метода — это Future, который передается черезFuture.get()При получении определенного возвращаемого значения результат равен результату второго параметра метода.
Future submit(Callable task) Параметром этого метода являетсяCallable类型Объект, методы которого имеют возвращаемые значения. Значение, полученное при вызове Future.get(), является значением, возвращаемым методом call() интерфейса Callable.

Видно, что возвращаемые значения трех перегруженных методов submit() — это все объекты типа Future, так каково будущее Future?

Будущее и будущееЗадача

Когда задача отправляется в пул потоков, нам может понадобиться получить возвращаемое значение задачи, или мы хотим знать, была ли задача выполнена или нет, или даже иногда нам нужно отменить задачу из-за особых обстоятельств, так что мы должны сделать в это время?

В пакете JUC нам предоставляется класс инструмента:Future. Future — это интерфейс, который предоставляет методы 5. Когда задача отправляется в пул потоков с помощью метода submit (), пул потоков возвращает объект типа Future. Мы можем получить задачу с помощью этих 5 методов объекта Future. Состояние в пуле потоков. Эти методы определяются следующим образом.

имя метода иллюстрировать
boolean cancel(boolean mayInterruptIfRunning) Используется для отмены задачи. Параметр mayInterruptIfRunning используется, чтобы указать, нужно ли прервать поток. Если передано значение true, это означает, что поток необходимо прервать, тогда статус задачи будет установлен наINTERRUPTING; если false, то состояние задачи будет установлено вCANCELLED(относительно статуса задачиINTERRUPTINGа такжеCANCELLEDбудет объяснено позже)
boolean isCancelled() Определите, была ли задача отменена, верните true, чтобы указать, что она была отменена.
boolean isDone() Определить, выполнена ли задача
V get() Получение возвращаемого значения задачи заблокирует текущий поток до тех пор, пока не будет получено возвращаемое значение задачи.
V get(long timeout, TimeUnit unit) Получить возвращаемое значение задачи в виде тайм-аута.Если возвращаемое значение задачи не получено в течение периода тайм-аута, генерируется исключение TimeoutException.

Интерфейс Future имеет конкретный класс реализации:FutureTask. На самом деле, три перегруженных метода submit() пула потоков ThreadPoolExecutor, возвращаемые объекты типа Future являются объектами-экземплярами FutureTask. UML-диаграмма FutureTask выглядит следующим образом.

FutureTask类的UML图

Как видно из диаграммы UML, FutureTask реализован напрямуюRunnableFutureинтерфейс, а интерфейс RunnableFuture наследует интерфейсы Runnable и Future, поэтому FutureTask является как типом Runnable, так и типом Future. Когда метод submit() вызывается для отправки задачи в пул потоков, независимо от того, является ли отправленная задача задачей типа Runnable или Callable, задача в конечном итоге инкапсулируется в объект FutureTask. Ниже сFuture<T> submit(Callable<T> task)Например, давайте посмотрим на исходный код.

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    // 调用newTaskFor()将Callable任务封装成一个FutureTask
    RunnableFuture<T> ftask = newTaskFor(task);
    // 执行任务
    execute(ftask);
    return ftask;
}

// newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    // 直接new一个FutureTask对象
    return new FutureTask<T>(callable);
}

Когда метод submit() отправляет задачу Runnable, она будет вызыватьсяnewTaskFor()方法的另一个重载方法Чтобы инкапсулировать задачу в объект FutureTask, чтобы конечный пул потоков выполнял задачи типа FutureTask. (Обратите внимание, что объекты типа Runnable в конечном итоге будут проходитьExecutors.callable()метод, который инкапсулирует объект Runnable как объект типа Callable. Принцип Executors.callable() заключается в использовании适配器模式, адаптер естьRunnableAdapterДобрый)

Узнав взаимосвязь между Future и этими классами, давайте проанализируем, как пул потоков выполняет задачу FutureTask? И как получить возвращаемое значение задачи через метод Future.get()?

Принципы проектирования и структуры данных

Если вы хотите понять принципы нескольких методов интерфейса Future, вы должны сначала понять интерфейс данных FutureTask и принцип его построения.

Поскольку вы хотите получать статус задач в пуле через другие потоки вне пула потоков, а задачи в пуле потоков относятся к типу FutureTask, в объекте FutureTask должны быть переменные, связанные со статусом задачи.

Очень важные свойства определены в FutureTask. как показано в таблице ниже.

Атрибуты имея в виду
int state Переменная состояния используется для сохранения состояния задачи, и ее значения составляют значения 0–6, 7, каждое значение имеет разное значение, конкретное значение см. в следующем описании.
Callable callable Callable представляет отправленную нами задачу, а задача типа Runnable будет преобразована в Callable через Executors.callable().
Object outcome Используется для сохранения возвращаемого значения метода call() вызываемого объекта.
Thread runner Сохранить поток, выполняющий текущую задачу
WaitNode waiters Он используется для сохранения очереди ожидания потока, ожидающего получения возвращаемого значения задачи.Когда мы вызываем метод Future.get() в основном потоке, основной поток будет инкапсулирован в WaitNode. Когда несколько потоков одновременно вызывают метод Future.get(), WaitNode поддерживает связанный список через свойство next.

Существует 7 типов значений состояния. Значение каждого значения указано в комментариях к коду.

private volatile int state;
// 任务的初始状态,当新建一个FutureTask任务时,state值默认为NEW
private static final int NEW          = 0;
// 任务处于完成中,什么是完成中呢?有两种情况
// 1. 任务正常被线程执行完成了,但是还没有将返回值赋值给outcome属性
// 2. 任务在执行过程中出现了异常,被捕获了,然后处理异常了,在将异常对象赋值给outcome属性之前
private static final int COMPLETING   = 1;
// 任务正常被执行完成,并将任务的返回值赋值给outcome属性之后,会处于NORMAL状态
private static final int NORMAL       = 2;
// 任务出了异常,并将异常对象赋值给outcome属性之后
private static final int EXCEPTIONAL  = 3;
// 调用cancle(false),任务被取消了
private static final int CANCELLED    = 4;
// 调用cancle(true),任务取消,但是在线程中断之前
private static final int INTERRUPTING = 5;
//  调用cancle(true),任务取消,但是在线程中断之后
private static final int INTERRUPTED  = 6;

Хотя состояние задачи имеет семь значений, его можно условно разделить на три категории: начальное состояние, промежуточное состояние и конечное состояние. Изменение отношения этих состояний показано на следующем рисунке.

任务状态变化

  • Когда задача отправляется в пул потоков, ее начальное состояниеNEW; Когда задача выполняется нормально, она сначала установит состояние задачи вCOMPLETING; Затем присвойте возвращаемое значение задачи (то есть возвращаемое значение метода call() Callable) в FutureTaskoutcomeАтрибут, когда задание выполнено, установить статус задачи наNORMAL. Это процесс нормального выполнения задачи, т. е. соответствующая цифрапоказана линия.
  • Когда задача отправляется в пул потоков, поток имеет исключение при выполнении задачи, тогда статус задачи будет изменен сNEWУстановить какCOMPLETING; затем присвойте объект исключенияoutcomeсвойство, когда задание выполнено, установите состояние задачи вEXCEPTIONAL. Это тот случай, когда задача нештатная, т. е. соответствующая цифрапоказана линия.
  • Когда задача отправляется в пул потоков, если вызывается метод cancle() объекта Future, когда параметр, переданный cancle(), имеет значение false, статус задачи будет изменен непосредственно сNEWУстановить какCANCELLED, то есть соответствующий рис.соответствующий маршрут.
  • Когда параметр, переданный методом cancel(), имеет значение true, состояние задачи будет установлено наINTERRUPTING; затем вызовите потокinterrupt()метод и, наконец, установите статус задачи наINTERRUPTED, то есть рис.соответствующую строку.

Анализ исходного кода

Когда метод submit() вызывается для отправки задачи в пул потоков, он сначала вызываетnewTaskFor()Метод инкапсулирует задачу в объект FutureTask, а затем вызывает метод execute() для выполнения задачи. В методе execute() сначала будет запущен поток Worker, а когда поток будет запущен, будет вызван метод runWorker() потока. существуетrunWorker()метод в конечном итоге вызоветtask.run()метод, который является методом run() FutureTask. Подробный анализ исходного кода этого шага можно найти в этой статье:Принцип реализации пула потоков ThreadPoolExecutor

Далее анализируется только метод FutureTask.run(). В методе run() в конечном итоге вызывается метод call() вызываемого свойства. Когда задача выполняется нормально, будет вызываться FutureTaskset()метод для обновления состояния задачи и сохранения возвращаемого значения задачи и, наконец, для пробуждения ожидающего потока, который получает результат задачи. Если возникает исключение, оно будет вызваноsetException()методы для обновления состояния задачи, сохранения исключений и пробуждения ожидающих потоков. Ниже приведен исходный код метода run() Я удалил исходный код и сохранил только основную логику.

public void run() {
    ......
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 执行任务
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                // 出异常时将state置为EXCEPTIONAL
                setException(ex);
            }
            if (ran)
                // 设置任务状态为COMPLETING,然后保存返回值,最后再设置为NORMAL
                set(result);
        }
    } finally {
        // 其他处理
        ......
    }
}

заset()а такжеsetException()Метод относительно прост: обновить статус задачи через CAS, а затем присвоить возвращаемое значение задачиoutcomeсвойства, последний звонокfinishCompletion()метод пробужденияwaitersПотоки в очереди ожидания, сформированные этим свойством. (Чтобы узнать о принципах и знаниях, связанных с CAS, вы можете обратиться к этим двум статьям:Первое понимание принципа реализации CASа такжеИнтерпретация исходного кода класса Unsafe и сценариев использования)

Затем проанализируйте конкретный исходный кодFuture.get()выполнение метода. Когда вызывается метод Future.get(), вызывается метод get() FutureTask. В методе get() сначала определите, выполнена ли задача, если она была завершена, она сразу вернет результат, если нет, то подождет.

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果状态处于NEW或者COMPLETING状态,表示任务还没有执行完成,需要等待
    if (s <= COMPLETING)
        // awaitDone()进行等待
        s = awaitDone(false, 0L);
    // 返回结果
    return report(s);
}

позвонивreport(s)Метод возвращает результат, в методе report() он сначала определяет, находится ли задача вNORMALСтатус, то есть завершена ли задача нормально.Только при завершении нормального выполнения будет возвращен результат, в противном случае будет выброшено соответствующее исключение.

private V report(int s) throws ExecutionException {
    Object x = outcome;
    // 只有任务正常结束时,才会返回
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

когда задача вNEW或者COMPLETINGВ состоянии это означает, что задача выполняется или возвращаемое значение задачи не было присвоено атрибуту результата, поэтому в это время результат не может быть возвращен, поэтому он должен войти в состояние ожидания, то есть , вызовawaitDone()метод. В методе awaitDone() есть бесконечный цикл for, который сначала определяет, находится ли задача в состоянии ЗАВЕРШЕНИЕ. Если он находится в состоянии ЗАВЕРШЕНИЕ, позвольте текущему потоку сначала отказаться от права планирования ЦП (зачем вам отказываться от права планирования ЦП? Потому что переход из состояния ЗАВЕРШЕНИЕ в состояние НОРМАЛЬНОЕ или другие состояния очень короткий процесс, пусть текущий поток сначала отдает ЦП Право планирования ЦП позволяет другим потокам получать ресурсы ЦП, а квант времени ЦП также является очень коротким периодом времени Когда следующий поток получает ресурсы ЦП, состояние задачи, скорее всего, станет НОРМАЛЬНЫМ или другими конечными состояниями. , поскольку код находится в цикле for, он войдет в следующий цикл). Если текущая задача не находится в состоянии ЗАВЕРШЕНИЕ, потоку будет разрешено ожидать парковки. Определяется параметрами, переданными методом awaitDone(). (Когда вызывается метод park(), когда эти потоки пробуждаются? Когда статус задачи становится最终状态, он позвонитfinishCompletion()способ разбудить эти ожидающие потоки. )

Ниже приведена часть исходного кода метода awaitDone() Я удалил исходный код и сохранил только основную логику.

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    ......
    for (;;) {
        ......
        // 任务处于COMPLETING中,就让当前线程先暂时放弃CPU的执行权
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        ......                               
        else if (timed) {
        	// 超时时间计算
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            // 等待一段时间
            LockSupport.parkNanos(this, nanos);
        }
        else
        	// 等待
            LockSupport.park(this);
    }
}

Суммировать

  • В этой статье в основном представлена ​​разница между интерфейсом Runnable и интерфейсом Callable.Первый не имеет возвращаемого значения и может выполняться непосредственно потоком, второй имеет возвращаемое значение и не может выполняться потоком напрямую и должен выполняться через поток. бассейн.
  • Затем он представляет пять методов интерфейса Future, а также несколько важных свойств и структур данных его класса реализации FutureTask. Будь то объект Runnable или Callable, при отправке в пул потоков он инкапсулируется в объект FutureTask и выполняется. Для сценариев использования Future существует большое количество приложений в Netty и Dubbo.
  • Наконец, в сочетании с исходным кодом подробно представлен принцип реализации методов get() и run() FutureTask.

рекомендовать

微信公众号