[Параллельное программирование] Будущий режим и реализация в JDK

Java задняя часть

1.1 Что такое режим Future

Давайте сначала рассмотрим краткий пример. Когда мы обычно пишем функцию, операторы в функции выполняются синхронно, строка за строкой. Если строка выполняется очень медленно, программа должна дождаться завершения выполнения, прежде чем вернуть результат, но иногда мы можем не торопитесь, вам нужен результат выполнения одной из строк, и вы хотите, чтобы вызываемый объект вернулся немедленно. Например, Сяо Мин успешно создал учетную запись на определенном веб-сайте.После создания учетной записи будет отправлено уведомление по электронной почте.Если уведомление по электронной почте по какой-либо причине занимает много времени (учетная запись была успешно создана в это время), используется традиционный метод выполнения синхронизации.Нам нужно дождаться этого времени, прежде чем результат успешного создания будет возвращен во внешний интерфейс, но в это время, после успешного создания учетной записи, нам не нужно заботиться о том, электронная почта отправлена ​​успешно или нет. В настоящее время мы можем использовать режим «Будущее», чтобы Анна медленно обрабатывала его в фоновом режиме. Для этого запроса вызывающая сторона может сначала обработать некоторые другие задачи, а затем попытаться получить требуемую данные, когда данные действительно необходимы (например, когда вы хотите знать, успешно ли было отправлено электронное письмо или нет).

При использовании режима «Будущее» требуемые данные могут быть недоступны сразу после получения данных. Вместо этого сначала получите пакет, а затем перейдите к получению необходимых данных, когда они вам понадобятся.

1.2 Разница между режимом Future и традиционным режимом

Сначала посмотрите на диаграмму последовательности возврата запроса Очевидно, что традиционный режим выполняется синхронно в последовательном порядке и может ждать только при встрече с трудоемкими операциями. В отличие от режима Future, после запуска трудоемкой операции функция немедленно вернется, не блокируя клиентский поток. Следовательно, клиенту не нужно ждать при выполнении фактической трудоемкой операции, и он может делать другие вещи, пока результат не будет получен из рабочего потока, когда это необходимо.

2.1, практическая реализация простого режима будущего

Приведенный ниже класс DataFuture — это просто класс-оболочка, он создается без блокировки ожидания. Используйте метод setRealData для передачи данных после того, как рабочий поток подготовил данные. Клиенту нужно вызвать метод getRealData только тогда, когда ему действительно нужны данные.Если данные готовы в это время, они вернутся немедленно, в противном случае метод getRealData будет ждать завершения сбора данных.

public class DataFuture<T> {
    private T realData;
    private boolean isOK = false;

    public synchronized T getRealData() {
        while (!isOK) {
            try {
                // 数据未准备好则等待
                wait();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return realData;
    }
    
    public synchronized void setRealData(T data) {
        isOK = true;
        realData = data;
        notifyAll();
    }
}

Ниже реализуем сервер, клиент запрашивает данные на сервер, сервер сразу не идет загружать реальные данные, просто создаем DataFuture, создаем дочерний поток для загрузки реальных данных, сервер может возвращаться напрямую DataFuture.

public class Server {
    
    public DataFuture<String> getData() {
        final DataFuture<String> data = new DataFuture<>();
        Executors.newSingleThreadExecutor().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                data.setRealData("最终数据");
            }
        });
        return data;
    }
}

Окончательный код вызова клиента выглядит следующим образом:

long start = System.currentTimeMillis();
Server server = new Server();
DataFuture<String> dataFuture = server.getData();

try {
    // 先执行其他操作
    Thread.sleep(5000);
    // 模拟耗时...
} catch (InterruptedException e) {
    e.printStackTrace();
}

System.out.print("结果数据:" + dataFuture.getRealData());
System.out.println("耗时: " + (System.currentTimeMillis() - start));

результат:

结果数据:最终数据
耗时: 5021

Для выполнения окончательных данных требуется около 5 секунд, а при последовательном выполнении — около 10 секунд.

2.2, Будущее и FutureTask в JDK

Давайте посмотрим на исходный код интерфейса Future:

public interface Future<V> {

    /**
     * 用来取消任务,取消成功则返回true,取消失败则返回false。
     * mayInterruptIfRunning参数表示是否允许取消正在执行却没有执行完毕的任务,设为true,则表示可以取消正在执行过程中的任务。
     * 如果任务已完成,则无论mayInterruptIfRunning为true还是false,此方法都返回false,即如果取消已经完成的任务会返回false;
     * 如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;
     * 如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * 表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true
     */
    boolean isCancelled();

    /**
     * 表示任务是否已经完成,若任务完成,则返回true
     */
    boolean isDone();

    /**
     * 获取执行结果,如果最终结果还没得出该方法会产生阻塞,直到任务执行完毕返回结果
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 获取执行结果,如果在指定时间内,还没获取到结果,则抛出TimeoutException
     */
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Из приведенного выше исходного кода видно, что Future должен запрашивать выполнение задач Runnable или Callable, прерывать задачи и получать результаты. Давайте возьмем пример расчета суммы от 1 до 100 миллионов, чтобы увидеть, сколько времени тратится с использованием традиционного метода и использования Future. Давайте сначала посмотрим на традиционный код:

public class FutureTest {

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        List<Integer> retList = new ArrayList<>();

        // 计算1000次1至1亿的和
        for (int i = 0; i < 1000; i++) {
            retList.add(Calc.cal(100000000));
        }
        System.out.println("耗时: " + (System.currentTimeMillis() - start));

        for (int i = 0; i < 1000; i++) {
            try {
                Integer result = retList.get(i);
                System.out.println("第" + i + "个结果: " + result);
            } catch (Exception e) {
            }
        }
        System.out.println("耗时: " + (System.currentTimeMillis() - start));
    }

    public static class Calc implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            return cal(10000);
        }

        public static int cal (int num) {
            int sum = 0;
            for (int i = 0; i < num; i++) {
                sum += i;
            }
            return sum;
        }
    }
}

Результат выполнения (занимает 40+ секунд):

耗时: 43659
第0个结果: 887459712
第1个结果: 887459712
第2个结果: 887459712
...
第999个结果: 887459712
耗时: 43688

Давайте посмотрим на программу, используя режим Future:

public class FutureTest {

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        ExecutorService executorService = Executors.newCachedThreadPool();
        List<Future<Integer>> futureList = new ArrayList<>();

        // 计算1000次1至1亿的和
        for (int i = 0; i < 1000; i++) {
            // 调度执行
            futureList.add(executorService.submit(new Calc()));
        }
        System.out.println("耗时: " + (System.currentTimeMillis() - start));

        for (int i = 0; i < 1000; i++) {
            try {
                Integer result = futureList.get(i).get();
                System.out.println("第" + i + "个结果: " + result);
            } catch (InterruptedException | ExecutionException e) {
            }
        }
        System.out.println("耗时: " + (System.currentTimeMillis() - start));
    }

    public static class Calc implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            return cal(100000000);
        }

        public static int cal (int num) {
            int sum = 0;
            for (int i = 0; i < num; i++) {
                sum += i;
            }
            return sum;
        }
    }
}

Результат выполнения (занимает 12+ секунд):

耗时: 12058
第0个结果: 887459712
第1个结果: 887459712
...
第999个结果: 887459712
耗时: 12405

Можно видеть, что при вычислении суммы от 1000 до 100 миллионов раз при одновременном использовании режима Future конечное время выполняется примерно на 30 секунд быстрее, чем при использовании традиционного метода, а эффективность использования режима Future значительно повышается.

2.3. Будущая задача

После разговора о Future, у Future есть следующая FutureTask, потому что это интерфейс и его нельзя использовать для непосредственного создания объектов. Первый взгляд на реализацию FutureTask:

public class FutureTask<V> implements RunnableFuture<V>

Вы можете увидеть, что класс FutureTask RunnableFuture реализует интерфейс, а затем посмотрите Источник интерфейса RunnableFuture:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

Видно, что интерфейс RunnableFuture наследует интерфейс Runnable и интерфейс Future, а это означает, что FutureTask может выполняться потоком как Runnable, а также может использоваться как Future для получения возвращаемого значения Callable.

Глядя на два конструктора FutureTask ниже, видно, что он подготовлен для этих двух операций.

public FutureTask(Callable<V> var1) {
    if (var1 == null) {
        throw new NullPointerException();
    } else {
        this.callable = var1;
        this.state = 0;
    }
}

public FutureTask(Runnable var1, V var2) {
    this.callable = Executors.callable(var1, var2);
    this.state = 0;
}

Примеры использования FutureTask:

public class FutureTest {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Calc task = new Calc();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        executor.shutdown();
    }

    public static class Calc implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            return cal(100000000);
        }

        public static int cal (int num) {
            int sum = 0;
            for (int i = 0; i < num; i++) {
                sum += i;
            }
            return sum;
        }
    }
}

2.4, недостатки будущего

Из вышеприведенного примера видно, что эффективность использования режима Future значительно повышается по сравнению с традиционным режимом.Использование режима Future позволяет в определенной степени выполнять задачи в пуле потоков асинхронно, но в то же время , есть и очевидный недостаток: callback нельзя поместить в поток, отличный от задачи.Выполнение, самая большая проблема с традиционными callbacks в том, что поток управления нельзя разделить на разные обработчики событий. Например, если основной поток должен дождаться результатов, возвращаемых каждым потоком асинхронного выполнения, чтобы выполнить следующую операцию, он должен заблокировать в методе future.get() ожидание возврата результата. на самом деле синхронно снова. , ситуация была еще хуже.

В Java 8 был введен новый класс реализации CompletableFuture, чтобы компенсировать указанные выше недостатки, использование CompletableFuture будет объяснено в следующей главе.