Как Java реализует шаблон Future? 10000 слов подробное объяснение!

Java
Как Java реализует шаблон Future? 10000 слов подробное объяснение!

Проект анализа исходного кода JDK1.8 (китайская аннотация) Адрес Github:

GitHub.com/Примечания к источнику/…

1 Что такое будущее?

Например, мы обычно покупаем вещи в Интернете.После размещения заказа будет сгенерирован номер заказа, и затем продавец доставит товар в соответствии с этим номером заказа.Экспресс-покупки в Интернете для нас. В этом процессе эта серия номеров отслеживания является важным документом для нашей квитанции.

Таким образом, будущее JDK похоже на порядковый номер вещей, которые мы покупаем в Интернете.Когда мы выполняем трудоемкую задачу, мы можем запустить другой поток для асинхронного выполнения этой трудоемкой задачи, и в то же время мы можем делать другие вещи. Когда работа сделана, мы можем извлечь результат выполнения трудоемкой задачи по «единому номеру» будущего. Следовательно, Future также является режимом приложения в многопоточности.

расширять: Говоря о многопоточности, в чем разница между Future и Thread? Самое важное отличие состоит в том, что Thread не возвращает результаты, а режим Future возвращает.

2 Как использовать Будущее

Прежде чем мы разберемся, что такое Future, давайте возьмем простой пример, чтобы увидеть, как использовать Future.

Если мы хотим приготовить горячий горшок сейчас, сначала нам нужно подготовить две вещи: вскипятить воду и подготовить ингредиенты. Поскольку кипячение воды — это длительный процесс (эквивалентно трудоемкой бизнес-логике), мы можем кипятить воду (эквивалентно запуску другого потока) во время приготовления ингредиентов для горячих блюд (основной поток) и ждать, пока они оба будут готовы. горшок.

// DaHuoGuo.java

public class DaHuoGuo {
	public static void main(String[] args) throws Exception {
		FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
			@Override
			public String call() throws Exception {
				System.out.println(Thread.currentThread().getName() + ":" + "开始烧开水...");
				// 模拟烧开水耗时
				Thread.sleep(2000);
				System.out.println(Thread.currentThread().getName() + ":"  + "开水已经烧好了...");
				return "开水";
			}
		});

		Thread thread = new Thread(futureTask);
		thread.start();

		// do other thing
		System.out.println(Thread.currentThread().getName() + ":"  + " 此时开启了一个线程执行future的逻辑(烧开水),此时我们可以干点别的事情(比如准备火锅食材)...");
		// 模拟准备火锅食材耗时
		Thread.sleep(3000);
		System.out.println(Thread.currentThread().getName() + ":"  + "火锅食材准备好了");
		String shicai = "火锅食材";

		// 开水已经稍好,我们取得烧好的开水
		String boilWater = futureTask.get();

		System.out.println(Thread.currentThread().getName() + ":"  + boilWater + "和" + shicai + "已经准备好,我们可以开始打火锅啦");
	}
}

Результат выполнения показан на скриншоте ниже, что соответствует нашим ожиданиям:

Как видно из приведенного выше кода, мы используем Future в основном на следующих этапах:

  1. создать новыйCallableАнонимные функции реализуют объекты класса, и наша бизнес-логикаCallableизcallРеализовано в методе, где универсальный тип Callable является типом возвращаемого результата;
  2. тогда поставьCallableанонимный функциональный объект какFutureTaskПараметры конструктора передаются для созданияfutureTaskобъект;
  3. тогда поставьfutureTaskобъект какThreadПараметры построения передаются, и поток запускается для выполнения бизнес-логики;
  4. Наконец мы звонимfutureTaskобъектgetМетод получает результат выполнения бизнес-логики.

Вы можете видеть, что классы JDK, связанные с использованием Future, в основномFutureTaskиCallableДва, следующие в основном дляFutureTaskВыполните анализ исходного кода.

расширять: есть еще один способ использованияFutureпуть кCallableЗдесь не будет представлен способ передачи класса реализации в пул потоков для выполнения, а только Baidu.

3 Анализ структуры класса FutureTask

Давайте сначала посмотримFutureTaskСтруктура класса:

можно увидетьFutureTaskДостигнутоRunnableFutureинтерфейс, при этомRunnableFutureИнтерфейс снова наследуетFutureиRunnableинтерфейс. так какFutureTaskреализовано косвенноRunnableинтерфейс, поэтому его можно использовать как задачуThreadвыполнять; кроме того,Самая важная вещьэтоFutureTaskтакже косвенноFutureинтерфейс, поэтому результат выполнения задачи тоже можно получить. Давайте кратко рассмотрим эти интерфейсы.api.

// Runnable.java

@FunctionalInterface
public interface Runnable {
    // 执行线程任务
    public abstract void run();
}

RunnableРассказывать особо нечего, я думаю, что все уже знакомы с ним.

// Future.java

public interface Future<V> {
    /**
     * 尝试取消线程任务的执行,分为以下几种情况:
     * 1)如果线程任务已经完成或已经被取消或其他原因不能被取消,此时会失败并返回false;
     * 2)如果任务还未开始执行,此时执行cancel方法,那么任务将被取消执行,此时返回true;TODO 此时对应任务状态state的哪种状态???不懂!!
     * 3)如果任务已经开始执行,那么mayInterruptIfRunning这个参数将决定是否取消任务的执行。
     *    这里值得注意的是,cancel(true)实质并不能真正取消线程任务的执行,而是发出一个线程
     *    中断的信号,一般需要结合Thread.currentThread().isInterrupted()来使用。
     */
    boolean cancel(boolean mayInterruptIfRunning);
    /**
     * 判断任务是否被取消,在执行任务完成前被取消,此时会返回true
     */
    boolean isCancelled();
    /**
     * 这个方法不管任务正常停止,异常还是任务被取消,总是返回true。
     */
    boolean isDone();
    /**
     * 获取任务执行结果,注意是阻塞等待获取任务执行结果。
     */
    V get() throws InterruptedException, ExecutionException;
    /**
     * 获取任务执行结果,注意是阻塞等待获取任务执行结果。
     * 只不过在规定的时间内未获取到结果,此时会抛出超时异常
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

FutureИнтерфейс символизирует результат асинхронного выполнения задач, то есть выполнение трудоёмкой задачи может выполняться в другом потоке, и тогда мы можем в это время заниматься другими делами, а вызывать будем после завершения других дел.Future.get()Метод может получить результат, в это время, если асинхронная задача не завершилась, он заблокируется и будет ждать, пока асинхронная задача не выполнится и не будет получен результат.

// RunnableFuture.java

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

RunnableFutureдаFutureиRunnableКомпозиция интерфейса, то есть представление интерфейса, может выполняться потоком асинхронно, поскольку реализацияRunnableинтерфейс, вы можете получить результат выполнения асинхронной задачи потока, потому что реализацияFutureинтерфейс. Так решеноRunnableАсинхронные задачи не имеют недостатка в возврате результата.

Далее давайте посмотрим наFutureTask,FutureTaskДостигнутоRunnableFutureинтерфейс так себеFutureиRunnableКонкретный класс реализации интерфейса представляет собой отменяемую задачу асинхронного потока, которая обеспечиваетFutureБазовая реализация асинхронной задачи, то есть после выполнения асинхронной задачи мы можем получить результат выполнения асинхронной задачи, что является высшим приоритетом нашего следующего анализа.FutureTaskможет упаковатьCallableиRunnableобъект, кроме того,FutureTaskПомимо выполнения потоками, он также может быть отправлен в пул потоков для выполнения.

Давайте взглянемFutureTaskКатегорияapi, основные методы обведены красным.

На фото вышеFutureTaskизrunМетод — это метод, который асинхронно выполняется потоком.getМетод — это метод получения результата выполнения асинхронной задачи.cancelметод — это метод, который отменяет выполнение задачи. Далее мы в основном сосредоточимся на этих трех методах.

считать:

  1. FutureTaskперезаписанныйrunТип возврата метода по-прежнемуvoid, указывая на отсутствие возвращаемого значения, тоFutureTaskизgetКак метод получает возвращаемое значение?
  2. FutureTaskизcancelМожет ли метод действительно отменить выполнение асинхронной задачи потока? При каких обстоятельствах я могу отменить?

так какFutureTaskРезультат выполнения асинхронной задачи также следуетCallableИнтерфейс связан, так что давайте посмотрим на него еще разCallableинтерфейс:

// Callable.java

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     */
    V call() throws Exception;
}

мы все знаем,Callable<V>интерфейс иRunnableВсе интерфейсы могут быть отправлены в пул потоков для выполнения, единственная разницаCallable<V>Интерфейс имеет возвращаемый результат, и общий тип в немVдолжен вернуть результат, иRunnableИнтерфейс не возвращает результатов.

считать: В целом,RunnableТолько класс реализации интерфейса может быть отправлен в пул потоков для выполнения, почемуCallableКлассы реализации интерфейса также могут быть отправлены в пул потоков для выполнения? Подумайте о пулах потоковsubmitвнутри методаCallableЭто подходит?

4 Анализ исходного кода FutureTask

4.1 Переменные-члены FutureTask

давайте сначала посмотримFutureTaskКаковы переменные-члены , и понимание этих переменных-членов очень важно для последующего анализа исходного кода.

// FutureTask.java

/** 封装的Callable对象,其call方法用来执行异步任务 */
private Callable<V> callable;
/** 在FutureTask里面定义一个成员变量outcome,用来装异步任务的执行结果 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 用来执行callable任务的线程 */
private volatile Thread runner;
/** 线程等待节点,reiber stack的一种实现 */
private volatile WaitNode waiters;
/** 任务执行状态 */
private volatile int state;

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
// 对应成员变量state的偏移地址
private static final long stateOffset;
// 对应成员变量runner的偏移地址
private static final long runnerOffset;
// 对应成员变量waiters的偏移地址
private static final long waitersOffset;

Здесь мы сосредоточимся наFutureTaskизCallableпеременная-член, потому чтоFutureTaskАсинхронная задача в конечном итоге делегируетсяCallableдостигать.

считать:

  1. FutureTaskпеременная-членrunner,waitersиstateвсе былоvolatileМодификация, мы можем подумать о том, почему эти три переменные-члены должны бытьvolatileИзменено, и другие переменные-члены не используют его?volatileКакова роль ключевых слов?
  2. Теперь, когда переменные-члены определеныrunner,waitersиstate, теперь снова определеноstateOffset,runnerOffsetиwaitersOffsetПеременные соответствуютrunner,waitersиstateАдрес смещения , зачем беспокоиться?

Посмотрим еще разstateOffset,runnerOffsetиwaitersOffsetПроцесс инициализации этих трех переменных:

// FutureTask.java

static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = FutureTask.class;
        stateOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("state"));
        runnerOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("runner"));
        waitersOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("waiters"));
    } catch (Exception e) {
        throw new Error(e);
    }
    }

4.2 Изменения состояния FutureTask

сказал ранееFutureTaskпеременная-член, имеет представлениегосударствопеременная-членstateДавайте сосредоточимся наstateПеременная представляет состояние выполнения задачи.

// FutureTask.java

/** 任务执行状态 */
private volatile int state;
/** 任务新建状态 */
private static final int NEW          = 0;
/** 任务正在完成状态,是一个瞬间过渡状态 */
private static final int COMPLETING   = 1;
/** 任务正常结束状态 */
private static final int NORMAL       = 2;
/** 任务执行异常状态 */
private static final int EXCEPTIONAL  = 3;
/** 任务被取消状态,对应cancel(false) */
private static final int CANCELLED    = 4;
/** 任务中断状态,是一个瞬间过渡状态 */
private static final int INTERRUPTING = 5;
/** 任务被中断状态,对应cancel(true) */
private static final int INTERRUPTED  = 6;

Вы можете увидеть переменные состояния задачиstateВышеупомянутые 7 состояний, 0-6 соответствуют каждому состоянию соответственно. Статус задачи изначальноNEW, затем поFutureTaskтри методаset,setExceptionиcancelЧтобы установить изменение состояния, изменение состояния имеет следующие четыре ситуации:

  1. NEW -> COMPLETING -> NORMAL: это изменение состояния указывает на нормальное завершение асинхронной задачи, гдеCOMPLETINGявляется мгновенным временным переходным состоянием,setМетод задает изменение состояния;
  2. NEW -> COMPLETING -> EXCEPTIONAL: это изменение состояния указывает на то, что во время выполнения асинхронной задачи возникает исключение.setExceptionМетод задает изменение состояния;
  3. NEW -> CANCELLED: это изменение состояния указывает на то, что оно отменено, то есть вызов сделанcancel(false),Зависит отcancelметод установки изменений состояния;
  4. NEW -> INTERRUPTING -> INTERRUPTED: Это изменение состояния указывает на то, что оно прервано, то есть вызов сделанcancel(true),Зависит отcancelметод установки изменений состояния.

4.3 Конструктор FutureTask

FutureTaskЕсть два конструктора, рассмотрим их по отдельности:

// FutureTask.java

// 第一个构造函数
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

Можно видеть, что этот конструктор полезен в примере кода «горячего горшка», который мы упоминали ранее, то естьCallableНазначение переменных-членов, вызываемое при асинхронном выполнении задачиCallable.callМетод выполняет логику асинхронной задачи. Кроме того, в этот момент укажите статус задачиstateназначить какNEW, указывающий на новый статус задачи.

давайте посмотрим еще разFutureTaskДругой конструктор:

// FutureTask.java

// 另一个构造函数
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

Этот конструктор выполняетExecutors.callable(runnable, result)через адаптерRunnableAdapterбудущееRunnableобъектrunnableпреобразовать вCallableобъект, а затем датьcallableиstateприсвоение переменной.

Уведомление, здесь нужно помнить, чтоFutureTaskКогда новый, статус задачи в это времяstateдаNEWДостаточно.

4.4 Метод FutureTask.run, используемый для выполнения асинхронных задач

Ранее мы упоминалиFutureTaskреализовано косвенноRunnableинтерфейс, переопределенныйRunnableинтерфейсrunметод, поэтому переопределенныйrunМетод передается потоку для выполнения, и в то же времяrunМетод — это именно метод выполнения логики асинхронной задачи, то после выполненияrunКак метод сохраняет результат выполнения асинхронной задачи?

Теперь сосредоточимся на анализеrunметод:

// FutureTask.java

public void run() {
    // 【1】,为了防止多线程并发执行异步任务,这里需要判断线程满不满足执行异步任务的条件,有以下三种情况:
    // 1)若任务状态state为NEW且runner为null,说明还未有线程执行过异步任务,此时满足执行异步任务的条件,
    // 此时同时调用CAS方法为成员变量runner设置当前线程的值;
    // 2)若任务状态state为NEW且runner不为null,任务状态虽为NEW但runner不为null,说明有线程正在执行异步任务,
    // 此时不满足执行异步任务的条件,直接返回;
    // 1)若任务状态state不为NEW,此时不管runner是否为null,说明已经有线程执行过异步任务,此时没必要再重新
    // 执行一次异步任务,此时不满足执行异步任务的条件;
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        // 拿到之前构造函数传进来的callable实现类对象,其call方法封装了异步任务执行的逻辑
        Callable<V> c = callable;
        // 若任务还是新建状态的话,那么就调用异步任务
        if (c != null && state == NEW) {
            // 异步任务执行结果
            V result;
            // 异步任务执行成功还是始遍标志
            boolean ran;
            try {
                // 【2】,执行异步任务逻辑,并把执行结果赋值给result
                result = c.call();
                // 若异步任务执行过程中没有抛出异常,说明异步任务执行成功,此时设置ran标志为true
                ran = true;
            } catch (Throwable ex) {
                result = null;
                // 异步任务执行过程抛出异常,此时设置ran标志为false
                ran = false;
                // 【3】设置异常,里面也设置state状态的变化
                setException(ex);
            }
            // 【3】若异步任务执行成功,此时设置异步任务执行结果,同时也设置状态的变化
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        // 异步任务正在执行过程中,runner一直是非空的,防止并发调用run方法,前面有调用cas方法做判断的
        // 在异步任务执行完后,不管是正常结束还是异常结束,此时设置runner为null
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        // 线程执行异步任务后的任务状态
        int s = state;
        // 【4】如果执行了cancel(true)方法,此时满足条件,
        // 此时调用handlePossibleCancellationInterrupt方法处理中断
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

Вы можете увидеть выполнение асинхронных задачrunМетод в основном делится на следующие четыре шага:

  1. Определить, соответствует ли поток условиям для выполнения асинхронных задач: чтобы предотвратить одновременное выполнение нескольких потоков асинхронных задач, необходимо оценить, заполнены ли потоки или не соответствуют ли они условиям для выполнения асинхронных задач;
  2. Если условие выполнено, выполнить асинхронную задачу: потому что логика асинхронной задачи инкапсулирована вCallable.callметод, вызывайте непосредственно в это времяCallable.callМетод выполняет асинхронную задачу, а затем возвращает результат выполнения;
  3. Выполнять различную обработку в зависимости от выполнения асинхронных задач: 1) Если выполнение асинхронной задачи завершается нормально, вызовите в это времяset(result);Чтобы установить результат выполнения задачи; 2) Если асинхронное выполнение задачи вызывает исключение, вызовите это времяsetException(ex);Чтобы установить исключения, см. подробный анализ4.4.1小节;
  4. Последствия выполнения асинхронной задачи: Независимо от успешности или неудачи асинхронной задачи, если другие потоки вызываютFutureTask.cancel(true), вам нужно позвонитьhandlePossibleCancellationInterruptМетод обрабатывает прерывание.Для подробного анализа см.4.4.2小节.

здесьпримечательныйЭто судить о том, заполнен ли поток или нет, чтобы соответствовать условиям выполнения асинхронных задач,runnerЭтоnullэто вызовUNSAFEизCASметодcompareAndSwapObjectсудить и устанавливать, и в то же времяcompareAndSwapObjectчерез переменные-членыrunnerадрес смещенияrunnerOffsetприйти, чтобы датьrunnerНазначены, кроме того, переменные-членыrunnerизменено какvolatileВ случае многопоточности один потокvolatileЗначение декоративной переменной может быть немедленно сброшено в основную память, чтобы оно было видно другим потокам.

4.4.1 Методы set и setException FutureTask

Давайте посмотрим на вызов, когда асинхронное выполнение задачи завершается нормально.set(result);метод:

// FutureTask.java

protected void set(V v) {
    // 【1】调用UNSAFE的CAS方法判断任务当前状态是否为NEW,若为NEW,则设置任务状态为COMPLETING
    // 【思考】此时任务不能被多线程并发执行,什么情况下会导致任务状态不为NEW?
    // 答案是只有在调用了cancel方法的时候,此时任务状态不为NEW,此时什么都不需要做,
    // 因此需要调用CAS方法来做判断任务状态是否为NEW
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 【2】将任务执行结果赋值给成员变量outcome
        outcome = v;
        // 【3】将任务状态设置为NORMAL,表示任务正常结束
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        // 【4】调用任务执行完成方法,此时会唤醒阻塞的线程,调用done()方法和清空等待线程链表等
        finishCompletion();
    }
}

Видно, что после нормального выполнения асинхронной задачи заканчивается, а асинхронная задача не выполняетсяcancelВ случае , он сделает следующее: сохранит результат выполнения задачи вFutureTaskпеременная-членoutcome, он будет вызываться после присваиванияfinishCompletionметод пробуждения заблокированного потока (откуда взялся заблокированный поток? Это будет проанализировано позже),примечательныйСоответствующее изменение состояния задачи здесьNEW -> COMPLETING -> NORMAL.

Давайте продолжим видеть, что когда во время выполнения асинхронной задачи возникает исключение, оно будет вызвано в это время.setException(ex);метод.

// FutureTask.java

protected void setException(Throwable t) {
    // 【1】调用UNSAFE的CAS方法判断任务当前状态是否为NEW,若为NEW,则设置任务状态为COMPLETING
    // 【思考】此时任务不能被多线程并发执行,什么情况下会导致任务状态不为NEW?
    // 答案是只有在调用了cancel方法的时候,此时任务状态不为NEW,此时什么都不需要做,
    // 因此需要调用CAS方法来做判断任务状态是否为NEW
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 【2】将异常赋值给成员变量outcome
        outcome = t;
        // 【3】将任务状态设置为EXCEPTIONAL
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        // 【4】调用任务执行完成方法,此时会唤醒阻塞的线程,调用done()方法和清空等待线程链表等
        finishCompletion();
    }
}

можно увидетьsetException(Throwable t)Логика кода такая же, как и в предыдущемset(V v)Почти то же самое, разница в том, что во время выполнения задачи выбрасывается исключение, и исключение сохраняется вFutureTaskпеременная-членoutcomeв, а также,примечательныйСоответствующее изменение состояния задачи здесьNEW -> COMPLETING -> EXCEPTIONAL.

Поскольку асинхронная задача завершается нормально или аварийно, она будет вызвана в это время.FutureTaskизfinishCompletionметод для пробуждения заблокированного потока, где заблокированный поток означает, что мы вызываемFuture.getЕсли асинхронная задача не была выполнена во время выполнения метода, поток в это время будет заблокирован.

// FutureTask.java

private void finishCompletion() {
    // assert state > COMPLETING;
    // 取出等待线程链表头节点,判断头节点是否为null
    // 1)若线程链表头节点不为空,此时以“后进先出”的顺序(栈)移除等待的线程WaitNode节点
    // 2)若线程链表头节点为空,说明还没有线程调用Future.get()方法来获取任务执行结果,固然不用移除
    for (WaitNode q; (q = waiters) != null;) {
        // 调用UNSAFE的CAS方法将成员变量waiters设置为空
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                // 取出WaitNode节点的线程
                Thread t = q.thread;
                // 若取出的线程不为null,则将该WaitNode节点线程置空,且唤醒正在阻塞的该线程
                if (t != null) {
                    q.thread = null;
                    //【重要】唤醒正在阻塞的该线程
                    LockSupport.unpark(t);
                }
                // 继续取得下一个WaitNode线程节点
                WaitNode next = q.next;
                // 若没有下一个WaitNode线程节点,说明已经将所有等待的线程唤醒,此时跳出for循环
                if (next == null)
                    break;
                // 将已经移除的线程WaitNode节点的next指针置空,此时好被垃圾回收
                q.next = null; // unlink to help gc
                // 再把下一个WaitNode线程节点置为当前线程WaitNode头节点
                q = next;
            }
            break;
        }
    }
    // 不管任务正常执行还是抛出异常,都会调用done方法
    done();
    // 因为异步任务已经执行完且结果已经保存到outcome中,因此此时可以将callable对象置空了
    callable = null;        // to reduce footprint
}

finishCompletionФункция метода заключается в пробуждении и удалении узла ожидающего потока из списка ожидания потока независимо от того, нормально или аварийно завершается асинхронная задача.Treiber stack, поэтому порядок пробуждения (удаления) такой: "последний вошел-первый вышел", то есть поток, который приходит первым, пробуждается (удаляется) первым. продолжить анализ позже.

4.4.2 Метод handlePossibleCancellationInterrupt FutureTask

существует4.4小节анализrunВ конце метода естьfinallyблокировать, если статус задачиstate >= INTERRUPTING, это означает, что выполняются другие потокиcancel(true)метод, необходимо датьCPUСегмент времени выполнения выполняется другими потоками Давайте посмотрим на конкретный исходный код:

// FutureTask.java

private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    // 当任务状态是INTERRUPTING时,此时让出CPU执行的机会,让其他线程执行
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt

    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from
    // cancel(true).  However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}

считать: почему статус задачиINTERRUPTINGНа этот раз нужно отказаться от временного отрезка выполнения CPU? И почему он должен вызываться после выполнения обязательной задачиhandlePossibleCancellationInterruptметод?

4.5 Метод FutureTask.get, получить результат выполнения задачи

前面我们起一个线程在其`run`方法中执行异步任务后,此时我们可以调用`FutureTask.get`方法来获取异步任务执行的结果。

// FutureTask.java

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 【1】若任务状态<=COMPLETING,说明任务正在执行过程中,此时可能正常结束,也可能遇到异常
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    // 【2】最后根据任务状态来返回任务执行结果,此时有三种情况:1)任务正常执行;2)任务执行异常;3)任务被取消
    return report(s);
}

Вы можете видеть, что если статус задачиstate<=COMPLETING, указывая на то, что асинхронная задача находится в процессе выполнения, в это время она вызоветawaitDoneМетод блокируется и ждет, при выполнении задачи в это время снова вызываетсяreportспособ сообщить о результате задачи, в это время возможны три ситуации: 1) задача выполняется нормально; 2) задача выполняется ненормально; 3) задача отменена.

4.5.1 Метод FutureTask.awaitDone

FutureTask.awaitDoneМетод блокирует текущий поток, который получает результат выполнения асинхронной задачи, до завершения выполнения асинхронной задачи.

// FutureTask.java

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    // 计算超时结束时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 线程链表头节点
    WaitNode q = null;
    // 是否入队
    boolean queued = false;
    // 死循环
    for (;;) {
        // 如果当前获取任务执行结果的线程被中断,此时移除该线程WaitNode链表节点,并抛出InterruptedException
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // 【5】如果任务状态>COMPLETING,此时返回任务执行结果,其中此时任务可能正常结束(NORMAL),可能抛出异常(EXCEPTIONAL)
        // 或任务被取消(CANCELLED,INTERRUPTING或INTERRUPTED状态的一种)
        if (s > COMPLETING) {
            // 【问】此时将当前WaitNode节点的线程置空,其中在任务结束时也会调用finishCompletion将WaitNode节点的thread置空,
            // 这里为什么又要再调用一次q.thread = null;呢?
            // 【答】因为若很多线程来获取任务执行结果,在任务执行完的那一刻,此时获取任务的线程要么已经在线程等待链表中,要么
            // 此时还是一个孤立的WaitNode节点。在线程等待链表中的的所有WaitNode节点将由finishCompletion来移除(同时唤醒)所有
            // 等待的WaitNode节点,以便垃圾回收;而孤立的线程WaitNode节点此时还未阻塞,因此不需要被唤醒,此时只要把其属性置为
            // null,然后其有没有被谁引用,因此可以被GC。
            if (q != null)
                q.thread = null;
            // 【重要】返回任务执行结果
            return s;
        }
        // 【4】若任务状态为COMPLETING,此时说明任务正在执行过程中,此时获取任务结果的线程需让出CPU执行时间片段
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 【1】若当前线程还没有进入线程等待链表的WaitNode节点,此时新建一个WaitNode节点,并把当前线程赋值给WaitNode节点的thread属性
        else if (q == null)
            q = new WaitNode();
        // 【2】若当前线程等待节点还未入线程等待队列,此时加入到该线程等待队列的头部
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 若有超时设置,那么处理超时获取任务结果的逻辑
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        // 【3】若没有超时设置,此时直接阻塞当前线程
        else
            LockSupport.park(this);
    }
}

FutureTask.awaitDoneОсновные вещи, которые делает этот метод, резюмируются следующим образом:

  1. во-первыхawaitDoneМетод представляет собой бесконечный цикл;
  2. Если текущий поток, который получает результат, прерывается другим потоком, узел связанного списка WaitNode потока удаляется в это время и генерируется InterruptedException;
  3. Если статус задачиstate>COMPLETING, в это время возвращается результат выполнения задачи;
  4. Если статус задачиCOMPLETING, поток, который получает результат задачи, должен отказаться от сегмента времени выполнения ЦП;
  5. какq == null, указывая на то, что текущий поток не был установлен вWaitNodeузел, новый на данный моментWaitNodeузел и установить егоthreadАтрибут — текущий поток;
  6. какqueued==false, указывающий текущий потокWaitNodeУзел еще не присоединился к списку ожидания потока, а затем присоединяется к началу списка;
  7. когдаtimedЕсли установлено значение true, метод имеет функцию тайм-аута в это время, и логика тайм-аута здесь подробно не анализируется;
  8. Когда предыдущие 6 условий не выполняются, текущий поток в это время блокируется.

Мы проанализировали здесь, у нас может быть только один поток для выполнения асинхронной задачи, и результат асинхронной задачи может быть получен несколькими потоками.Когда асинхронная задача не была выполнена, поток, который получает результат асинхронной задачи, присоединится к потоку чтобы ждать связанный список, затем позвоните по вызовуLockSupport.park(this);метод блокирует текущий поток. Пока асинхронная задача не будет выполнена, она будет вызыватьсяfinishCompletionметод пробуждения и удаления потока, ожидающего каждого из связанного спискаWaitNodeУзел, проснись здесь (удалить)WaitNodeПоток узла начинается с головы связанного списка, который мы также проанализировали ранее.

Еще одна вещь, которую следует отметить, это то, чтоawaitDoneВ методе есть бесконечный цикл.Когда поток, который получает асинхронную задачу, входит, он может войти в несколько условных ветвей несколько раз для выполнения различной бизнес-логики или может войти только в одну условную ветвь. Ниже приведены две возможные ситуации:

Дело 1: Когда приходит поток, который получает результат асинхронной задачи, асинхронная задача еще не выполнена.state=NEWИ когда нет настройки тайм-аута:

  1. первый цикл:В настоящее времяq = null, введите вышеуказанную кодовую метку в это время【1】Ветка суждения состоит в том, чтобы создать новую для текущего потока.WaitNodeузел;
  2. второй цикл:В настоящее времяqueued = false, введите вышеуказанную кодовую метку в это время【2】Судебная ветвь , которая будет вновь создана доWaitNodeУзел добавляется в список ожидания потока;
  3. третий цикл: Введите указанный выше код в это время.【3】Судебная ветвь , то есть блокировка текущего потока;
  4. четвертый цикл: Присоединяйтесь в это время к выполнению асинхронной задачи, введите указанную выше кодовую метку в это время.【5】Ветвь решения должна вернуть результат выполнения асинхронной задачи.

Случай 2: Когда приходит поток, который получает результат асинхронной задачи, асинхронная задача уже выполнена.state>COMPLETINGИ когда нет настройки тайм-аута, прямо введите указанную выше кодовую метку в это время.【5】Ветвь оценки должна напрямую возвращать результат выполнения асинхронной задачи, и нет необходимости присоединяться к списку ожидания потока.

4.5.2 Метод FutureTask.report

существуетgetВ методе, когда выполнение асинхронной задачи завершается, не имеет значения, завершается ли асинхронная задача нормально или аварийно, илиcancel, поток, который получает результат асинхронной задачи, в это время будет разбужен, поэтому он продолжит выполнениеFutureTask.reportМетод сообщает о выполнении асинхронной задачи, которая может вернуть результат или вызвать исключение.

// FutureTask.java

private V report(int s) throws ExecutionException {
    // 将异步任务执行结果赋值给x,此时FutureTask的成员变量outcome要么保存着
    // 异步任务正常执行的结果,要么保存着异步任务执行过程中抛出的异常
    Object x = outcome;
    // 【1】若异步任务正常执行结束,此时返回异步任务执行结果即可
    if (s == NORMAL)
        return (V)x;
    // 【2】若异步任务执行过程中,其他线程执行过cancel方法,此时抛出CancellationException异常
    if (s >= CANCELLED)
        throw new CancellationException();
    // 【3】若异步任务执行过程中,抛出异常,此时将该异常转换成ExecutionException后,重新抛出。
    throw new ExecutionException((Throwable)x);
}

4.6 Метод FutureTask.cancel отменяет выполнение задачи

Давайте посмотрим на последнийFutureTask.cancelметод, как только мы увидимFutureTask.cancelметод, мы, должно быть, наивно думали, что это метод, который может отменить выполнение асинхронных задач.Если мы так думаем, мы можем только сказать, что мы правы только наполовину.

// FutureTask.java

public boolean cancel(boolean mayInterruptIfRunning) {
    // 【1】判断当前任务状态,若state == NEW时根据mayInterruptIfRunning参数值给当前任务状态赋值为INTERRUPTING或CANCELLED
    // a)当任务状态不为NEW时,说明异步任务已经完成,或抛出异常,或已经被取消,此时直接返回false。
    // TODO 【问题】此时若state = COMPLETING呢?此时为何也直接返回false,而不能发出中断异步任务线程的中断信号呢??
    // TODO 仅仅因为COMPLETING是一个瞬时态吗???
    // b)当前仅当任务状态为NEW时,此时若mayInterruptIfRunning为true,此时任务状态赋值为INTERRUPTING;否则赋值为CANCELLED。
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        // 【2】如果mayInterruptIfRunning为true,此时中断执行异步任务的线程runner(还记得执行异步任务时就把执行异步任务的线程就赋值给了runner成员变量吗)
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    // 中断执行异步任务的线程runner
                    t.interrupt();
            } finally { // final state
                // 最后任务状态赋值为INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    // 【3】不管mayInterruptIfRunning为true还是false,此时都要调用finishCompletion方法唤醒阻塞的获取异步任务结果的线程并移除线程等待链表节点
    } finally {
        finishCompletion();
    }
    // 返回true
    return true;
}

В приведенном выше коде, когда статус асинхронной задачиstate != NEW, это означает, что асинхронная задача была выполнена нормально, завершилась аварийно или былаcancel, затем вернитесь напрямуюfalse; когда состояние асинхронной задачиstate = NEW, в это время согласноmayInterruptIfRunningПараметрtrueДелится на следующие два случая:

  1. когдаmayInterruptIfRunning = false, статус задачи на данный моментstateнепосредственно назначается какCANCELLED, в это время потоку, выполняющему асинхронную задачу, не будет выдан сигнал прерывания,примечательныйСоответствующее изменение состояния задачи здесьNEW -> CANCELLED.
  2. когдаmayInterruptIfRunning = trueКогда сигнал прерывания выдается потоку, выполняющему асинхронную задачу,примечательныйСоответствующее изменение состояния задачи здесьNEW -> INTERRUPTING -> INTERRUPTED.

В конце концов, неважноmayInterruptIfRunningзаtrueвсе ещеfalse, затем позвонитеfinishCompletionМетод пробуждает заблокированный поток, который получает результат асинхронной задачи, и удаляет поток, ожидающий узла связанного списка.

отFutureTask.cancelМы можем получить ответ из исходного кода, этот метод не может реально прервать поток, выполняющий асинхронную задачу, а может только послать сигнал прерывания потоку, выполняющему асинхронную задачу. Если поток, выполняющий асинхронную задачу, находится вsleep,waitилиjoinсостояние, оно броситInterruptedExceptionисключение, поток может быть прерван, кроме того, если асинхронной задаче необходимоwhileЕсли цикл выполняется, поток асинхронной задачи может быть завершен путем объединения следующего кода, то есть когда поток, выполняющий асинхронную задачу, прерывается, в это времяThread.currentThread().isInterrupted()возвращениеtrue, не удовлетвореныwhileТаким образом, условие цикла выходит из цикла, завершая поток выполнения асинхронной задачи со следующим кодом:

public Integer call() throws Exception {
    while (!Thread.currentThread().isInterrupted()) {
        // 业务逻辑代码
        System.out.println("running...");

    }
    return 666;
}

Уведомление: называетсяFutureTask.cancelметод, если возвращаемый результатtrue, если поток асинхронной задачи не может быть прерван, даже если поток асинхронной задачи выполняется нормально и возвращается результат выполнения, вызовитеFutureTask.getМетод также не может получить результат выполнения асинхронной задачи, он выкинетCancellationExceptionаномальный. Вы знаете, почему это так?

потому что звонюFutureTask.cancelметод, если возвращаемый результатtrue, статус задачи в это времяCANCELLEDилиINTERRUPTED, и обязательно реализуемfinishCompletionметод, в то время какfinishCompletionМетод разбудит поток, который получает результат асинхронной задачи, и будет ждать поток списка, а поток, который получит результат асинхронной задачи, проснется и найдет статусs >= CANCELLED, это броситCancellationExceptionИсключительный.

5 Резюме

Ну статья правильнаяFutureTaskЭто конец анализа исходного кода, давайте еще раз подведем итоги.FutureTaskЛогика реализации:

  1. мы достигаемCallableинтерфейс, переопределенныйcallБизнес-логика, которую необходимо выполнить, определяется в методе;
  2. Затем мы реализуемCallableОбъект реализации интерфейса передаетсяFutureTask,ПотомFutureTaskОтправляется в поток для выполнения как асинхронная задача;
  3. самое главное этоFutureTaskподдерживает состояние внутриstate, любая операция (будь то асинхронная задача завершается нормально или отменяется) вращается вокруг этого состояния и обновляется в любой моментstateстатус задачи;
  4. Только один поток может выполнять асинхронную задачу. Когда асинхронная задача выполняется, она может завершиться нормально, аварийно завершиться или быть отменена.
  5. Несколько потоков могут одновременно получать результат выполнения асинхронной задачи.Если асинхронная задача не была выполнена, поток, который получает асинхронную задачу в это время, присоединится к списку ожидания потока для ожидания;
  6. Когда выполнение потока асинхронной задачи завершено, поток, который получает результат выполнения асинхронной задачи, будет пробужден в это время.Обратите внимание, что последовательность пробуждения "последний вошел первым вышел", то есть заблокированный поток добавленные позже, будут разбужены первыми.
  7. когда мы звонимFutureTask.cancelметод на самом деле не останавливает поток, выполняющий асинхронную задачу, он просто сигнализирует потоку прерывания. Но покаcancelметод возвращаетtrue, даже если асинхронная задача может быть выполнена нормально, в это время мы вызываемgetМетод все равно будет бросать при получении результатаCancellationExceptionаномальный.

расширять: мы упоминали ранееFutureTaskизrunner,waitersиstateвсе используютvolatileМодификация ключевого слова указывает, что эти три переменные являются объектами (переменными-членами), совместно используемыми несколькими потоками, и будут управляться несколькими потоками.volatileИзменение ключевого слова для операции потокаvolatileПосле того, как значение переменной атрибута установлено, оно может быть видимым для других потоков во времени. На данный момент многопоточная операция переменных-членов используется толькоvolatileКлючевые слова по-прежнему имеют проблемы с безопасностью потоков, но в настоящее время г-н Дуг Леа не вводил никаких блокировок потоков, а принялUnsafeизCASметод замены операции блокировки для обеспечения безопасности потоков.

6 Анализ исходного кода FutureTask, что мы можем узнать?

Какова цель нашего анализа исходного кода? кроме как понятьFutureTaskВ дополнение к внутреннему принципу реализации фреймворка, нам также нужно учиться у различных навыков больших парней, чтобы писать исходный код фреймворка.Только таким образом мы можем расти.

АнализыFutureTaskИсходный код, мы можем узнать из него:

  1. использоватьLockSupportДля достижения механизма блокировки/пробуждения потока;
  2. использоватьvolatileиUNSAFEизCASметод обеспечения безблокировочной работы общих переменных потока;
  3. Чтобы написать логику исключения тайм-аута, вы можете обратиться кFutureTaskизget(long timeout, TimeUnit unit)логика реализации;
  4. Логическая реализация списка ожидания потока, когда ему нужно ждать, когда несколько потоков получат результат переменной-члена;
  5. Логическая реализация того, что асинхронная задача может выполняться только одним потоком в определенное время;
  6. FutureTaskстатус задачи вsatateЛогическая реализация обработки изменений.
  7. ...

Пункты, перечисленные выше, являются всеми местами, из которых мы можем учиться.

Если вы считаете, что это хорошо, пожалуйста, сделайте ретвит и безжалостно лайкните!

[Примечания к источнику] Адрес Github:

GitHub.com/Примечания к источнику/…