Многопоточность Java: запоздалое будущее

Java
Многопоточность Java: запоздалое будущее

Написав столько всего, я, наконец, добрался до будущего. Эта тема действительно актуальна для него. Мы уже много раз видели этот объект в пуле потоков раньше. В этой статье мы сосредоточимся на нем.

1. Что будет в будущем

1.3 Краткое введение в будущую задачу

эффект: future может использоваться для асинхронного получения результатов многопоточной задачи, Callable используется для генерации результатов, а Future используется для получения результатовобработать: Процесс похож на аплодисменты и ожидание еды Ожидание еды процесс трудоемкий, но это не мешает нам звонить

  • Когда Future отправляется, бизнес-обработка запущена в нескольких потоках, и Get получает данные из нескольких потоков.
  • Когда дело не обрабатывается Get, текущий поток будет заблокирован до завершения обработки дела, поэтому необходимо обратить внимание на постановку задач в будущем

Использование будущего имеет следующие эффекты:

  • 1 Запустите многопоточную задачу
  • 2 Займитесь другими делами
  • 3 Соберите результаты многопоточных задач

Соответствующий метод Future:

  • cancel(boolean) : Отменить операцию
  • get() : получить результаты
  • get(long,TimeUtil) : Получить в указанное время
  • isCancelled() : была ли задача отменена до завершения
  • готово() :определить есть ли результат

Роль интерфейса Future состоит в том, чтобы сначала сгенерировать объект Future, поместить конкретную операцию в объект future и, наконец, получить окончательный результат с помощью метода get объекта future.

1.2 Future Task

FutureTask представляет собой асинхронную операцию, которую можно отменить, обеспечивая полный процесс Future.

  • Он имеет методы для запуска и отмены операций, запроса завершения операции и получения результатов операции.
  • Результат можно получить только после завершения операции, если операция не завершена, метод get будет заблокирован.

Обратим внимание на следующие детали исходного кода:


// Node 1 : 实现了 RunnableFuture

// ------------------------------------

// Node 2 : 提供了7种状态
private static final int NEW          = 0; // 新建
private static final int COMPLETING   = 1; // 完成
private static final int NORMAL       = 2; // 正常
private static final int EXCEPTIONAL  = 3; // 异常
private static final int CANCELLED    = 4; // 取消
private static final int INTERRUPTING = 5; // 中断(中)
private static final int INTERRUPTED  = 6; // 打断

// 过程的流转 : 
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED

// ------------------------------------

// Node 3 : 内部属性
// 底层Callable 对象
private Callable<V> callable;
// 输出对象
private Object outcome;
// 运行线程
private volatile Thread runner;
// 等待线程的Treiber堆栈
private volatile WaitNode waiters;

// ------------------------------------

// Node 4 : 内部方法 
// 方法一 : 获取参数
V report(int s) // 为已完成的任务返回结果或抛出异常
    1. Object x = outcome;
    2. return (V)x;
    // 注意 : 状态为 CANCELLED 时会抛出异常 CancellationException

// 方法二 : 取消
public boolean cancel(boolean mayInterruptIfRunning)  // 取消
    1. UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        // 这里神奇的做了一个CAS操作, 判断当前的状态
    2. Thread t = runner; + t.interrupt();
        // 打断线程
    3. UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); 
        // CAS 方式修改状态

// 方法三 : get 类型
public V get() throws
public V get(long timeout, TimeUnit unit)
    1. if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        // 核心代码就是关注是否完成


// 方法四 : run
Step 1 : CAS 操作状态
Step 2 : 准备一个 Callable<V> c = callable;
Step 3 : state == NEW 后 , result = c.call();
    // 此时阻塞等待
Step 4 : 设置结果 : set(result); 
Step 5 : 当然是修改状态啦
Over !!! 

// 方法五 : runAndReset
// 在不设置结果的情况下执行计算,然后将这个future重置为初始状态 (其实主要是结尾修改了状态)

核心 : return ran && s == NEW;


// 方法五 : finishCompletion
// 删除并通知所有等待的线程,调用done(),并使callable为空

// 2个for 循环保证执行
    for (WaitNode q; (q = waiters) != null;) {
            // CAS 保证操作的准确性
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        // 提供许可
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }



2. Будущее использование

Future на самом деле относительно прост в использовании, просто инициируйте ожидание, но учтите, что Future заблокирует основной поток.

public class FutureService extends AbstractService implements ApplicationRunner, Callable<String> {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    private static Long startTime;
    private static Long endTime;

    private String salt;
    private Integer sleepNum;

    public FutureService() {
    }

    public FutureService(String salt, Integer sleepNum) {
        this.salt = salt;
        this.sleepNum = sleepNum;
    }

    @Override
    public String call() throws Exception {

        logger.info("------> 业务逻辑开始执行 :{} <-------", salt);
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < sleepNum; i++) {
            sb.append(salt);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {

            }
        }
        endTime = System.currentTimeMillis();
        logger.info("------> {} - 业务执行完成 :{} <-------", salt, sb.toString());
        getTime(startTime, endTime);
        return sb.toString();
    }


    @Override
    public void run(ApplicationArguments args) throws Exception {
        logger.info("------> 创建一个初始连接池 <-------");
        ExecutorService executor = Executors.newFixedThreadPool(3);

        logger.info("------> 开始业务一 future - a :  <-------");
        FutureTask<String> future = new FutureTask<String>(new FutureService("a", 10));
        startTime = System.currentTimeMillis();
        executor.submit(future);
        logger.info("------> 业务一请求完毕!主线程执行 <-------");

        logger.info("------> 开始业务二 future - b :  <-------");
        FutureTask<String> future2 = new FutureTask<String>(new FutureService("b", 5));
        startTime = System.currentTimeMillis();
        executor.submit(future2);
        logger.info("------> 业务三请求完毕!主线程执行 <-------");

        logger.info("------> 开始业务三 future - c :  <-------");
        FutureTask<String> future3 = new FutureTask<String>(new FutureService("c", 3));
        startTime = System.currentTimeMillis();
        executor.submit(future3);
        logger.info("------> 业务三请求完毕!主线程执行 <-------");
        
        logger.info("------> future2 数据处理完成:{} <-------", future2.get());
        logger.info("------> 2-1 测试主线程是否阻塞 <-------");
        logger.info("------> future1 数据处理完成:{} <-------", future.get());
        logger.info("------> 1-3 测试主线程是否阻塞 <-------");
        logger.info("------> future3 数据处理完成:{} <-------", future3.get());
    }
}

19.839 [           main] this is run <-------
19.839 [           main] 开始业务一 future - a :  <-------
19.839 [           main] 业务一请求完毕!主线程执行 <-------
19.839 [           main] 开始业务二 future - b :  <-------
19.840 [           main] 业务三请求完毕!主线程执行 <-------
19.840 [           main] 开始业务三 future - c :  <-------
19.840 [           main] 业务三请求完毕!主线程执行 <-------
19.840 [pool-4-thread-2] 业务逻辑开始执行 :b <-------
19.840 [pool-4-thread-1] 业务逻辑开始执行 :a <-------
19.840 [pool-4-thread-3] 业务逻辑开始执行 :c <-------
22.843 [pool-4-thread-3] c - 业务执行完成 :ccc <-------
22.843 [pool-4-thread-3] time is :3.0 <-------
24.844 [pool-4-thread-2] b - 业务执行完成 :bbbbb <-------
24.844 [pool-4-thread-2] time is :5.0 <-------
24.844 [           main] future2 数据处理完成:bbbbb <-------
24.844 [           main] 2-1 测试主线程是否阻塞 <-------
29.847 [pool-4-thread-1] a - 业务执行完成 :aaaaaaaaaa <-------
29.847 [pool-4-thread-1] time is :10.0 <-------
29.847 [           main] future1 数据处理完成:aaaaaaaaaa <-------
29.847 [           main] 1-3 测试主线程是否阻塞 <-------
29.847 [           main] future3 数据处理完成:ccc <-------
    
// 流程 : 
// Main 线程中 , 哭有看到 abc 时顺序执行的 , 从 submit 开始 , 开始多线程执行 (所以顺序不再固定, 变成了 bac)
// 从多线程里面看 , c > b > a 执行完成
// 当 b 业务完成后 , 因为main 一直阻塞到 futurb.get 的阶段 , 所以B future 获取值 , main 线程遇到 a future 继续阻塞
// 当 a future get 完成后 ,  c 才能get
    
// 总结 : 
> future submit 多线程执行
> future get 会阻塞主线程等待 , 当 get 时 , 多线程才会把数据提供出来    
    

3. Будущие вопросы и ответы

Согласно наиболее распространенному использованию пула потоков, мы вернем объект Future при передаче executor.submit, а затем получим его через объект Future.

Во-первых: мы рассуждаем таким образом ExecutorService executor = Executors.newFixedThreadPool(1); Future future = executor.submit(A Callable Object);

Вопрос 1: На чем основан фундамент Future?

  • Шаг 1: При вызове через submit нижний слой вызовет:

return new FutureTask<T>(runnable, value);

  • Шаг 2: он будет повышен до родительского класса RunnableFuture во внешнем слое и будет повышен до Future, когда вернется.

RunnableFuture<T> ftask = newTaskFor(task, result);

Резюме: Таким образом, базовый класс реализации в основном можно рассматривать как FutureTask, а задачу можно фактически рассматривать как класс реализации Runnable.

Вопрос 2: Как работает Future?

Будущая операция в основном основана на run()

  1. Выполняется вызовом callable.call()
  2. Если вызов выполнен успешно, сохраните результат с помощью метода set и сохраните результат в результате;
  3. Если есть исключение при выполнении вызова, сохраните исключение через setException;

Вопрос 3: На чем основан будущий обратный вызов -- get(long,TimeUtil)?

Завершите вызов, позвонив в отчет(ы)

Вопрос 4: Как блокирует будущее?

Когда суждение не завершено, awaitDone будет вызываться для ожидания, а конкретная логика будет проанализирована позже.

if (s <= COMPLETING){
      s = awaitDone(false, 0L);
}

awaitDone в основном выполняет следующие действия:

  1. Если основной поток прерывается, генерируется исключение прерывания;
  2. Оцените текущее состояние FutureTask, если оно больше, чем COMPLETING, это означает, что задача была выполнена, а затем вернуться напрямую;
  3. Если текущее состояние равно COMPLETING, это означает, что задача была выполнена, в это время основному потоку нужно только отказаться от ресурсов процессора через метод yield и дождаться, пока состояние станет NORMAL;
  4. Инкапсулировать текущий поток через класс WaitNode и добавить его в список ожидающих через UNSAFE;
  5. Наконец, приостановите поток через парк LockSupport или parkNanos;

Вопрос 5: Как отменить будущее?

  • отменить (логическое значение mayInterruptIfRunning) в t.interrupt()
  • И UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); изменяет состояние

Вопрос 6: Как будущее оценивает отмену и результат isCancelled -- isDone ?

return state >= CANCELLED;

4. Будущее и вызываемое

Это нужно видеть из ExecutorService.submit(), ExecutorService имеет два основных метода отправки, будь то Callable или Runnable, это не сложно найти через возвращаемое значение, которое в итоге становится Future

<T> Future<T> submit(Callable<T> task);
	- RunnableFuture<T> ftask = newTaskFor(task);
		-  return new ForkJoinTask.AdaptedCallable<T>(callable);
	- execute(ftask);
        
<T> Future<T> submit(Runnable task, T result);
	- RunnableFuture<T> ftask = newTaskFor(task, result);
	- execute(ftask);
        
// 例如这里可以直接将 Callable 作为参数传进去 : 
Future<String> future = executor.submit(createCallable());
public Callable createCallable() {
        Callable<Module> call = new Callable<Module>() {
            public Module call() throws Exception {
                // .....
            }
        };
        return call;
    }
 

5. Производное использование ScheduledFutureTask

// ScheduledFutureTask 简介
• time:任务执行时间;
• period:任务周期执行间隔;
• sequenceNumber:自增的任务序号。
    
// 执行顺序 : 在等待队列里调度不再按照FIFO,而是按照执行时间,谁即将执行,谁就排在前面。
M- getDelay(TimeUnit unit)
M- int compareTo(Delayed other)  


// ScheduledFutureTask 主要在 ScheduledThreadPoolExecutor中


// Node 1 : ScheduledThreadPoolExecutor内部类 ScheduledFutureTask
private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> 
                        
    

ссылка на основной кодПул потоковэтот документ

Спасибо :

многопоточная коллекция

Исходный код Таро

Мертвая серия