Чертовы уроки — как правильно использовать методы отправки и выполнения пула потоков

Java

Предыстория кровавого урока: используйте пул потоков для переноса стоковых данных, но всегда происходит сбой переноса пакета данных, ненормальная печать журнала

причина убийства

слышалparallelStreamПараллельные потоки — это хорошо из-за ежедневной разработки.streamСценариев последовательных потоков много.В этот раз нужно написать программу миграции и ее можно использовать.Так почему бы вам не использовать ее для установки? остроумие, я знаюНа фоне JVM для выполнения вышеуказанных функций используется общий пул fork/join, который используется всеми параллельными потоками.По умолчанию пул fork/join выделяет поток для каждого процессора., соответствующий обходной путь — создать собственный пул потоков, например

ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.submit(() -> {
            list.parallelStream().collect(Collectors.toList());
        });

Значит, мины были заложены отсюда.

представить или выполнить

  public static void main(String[] args) throws InterruptedException, ExecutionException {
        final ExecutorService pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        List<Integer> list = Lists.newArrayList(1, 2, 3, null);
        //1.使用submit
        pool.submit(() -> {
            list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
        });
        TimeUnit.SECONDS.sleep(3);
        //2.使用 execute
        pool.execute(() -> {
            list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
        });
        //3.使用submit,调用get()
        pool.submit(() -> {
            list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
        }).get();
        TimeUnit.SECONDS.sleep(3);
    }

Читатели запускают приведенные выше варианты использования и обнаружат, что они используются отдельно.submitметод не выводит журнал ошибок, а используетexecuteМетод распечатывает журнал ошибок, ноsubmitвозвращениеFutureJoinTaskперечислитьget()метод, который выдает исключение. Так правда вскрывается, некоторые данные в пакетах имеют грязные данные, что является нулевым значением.При переходе к нулевому значению возникает исключение, но журнал исключений находится вsubmitМетод пойман и не распечатан (душевная боль), а пойманное исключение завернуто в возвращенный класс результатаFutureJoinTask, и больше не бросали.

Если вам не нужно возвращать результат асинхронно, не используйтеsubmitметод

​ Вывод первый, ошибка, которую я сделал, заключается в том, чтобы мыслить ясноsubmitиexecuteРазница только в том, что возвращает асинхронные результаты, и в том, что не возвращает одношаговые результаты, но правда жестока.существуетsubmit()Логика в середине должна включать перехват исключения, выданного асинхронной задачей, и исключение не выбрасывается снова из-за неправильного использования метода.

Теперь задайте вопрос,ForkJoinPool#submit()вернулся вForkJoinTaskРезультат асинхронной задачи можно получить.Теперь, когда эта асинхронная задача выдает исключение, каков будет результат нашей попытки получить задачу? мы смотрим прямоForkJoinTask#get()исходный код.

public final V get() throws InterruptedException, ExecutionException {
    int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
        doJoin() : externalInterruptibleAwaitDone();
    Throwable ex;
    if ((s &= DONE_MASK) == CANCELLED)
        throw new CancellationException();
    //这里可以直接看到,异步任务出现异常会在调用get()获取结果的时候,会被包装成ExecutionException再次抛出
    if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
        throw new ExecutionException(ex);
    return getRawResult();
}

Когда в асинхронной задаче возникает исключение, при вызове get() для получения результата оно будет упаковано какExecutionExceptionВыдает снова, но где ловится исключение? Тоже самое, ветки всех веток надо переписатьThread#run()метод, размещенный вForkJoinPoolНить будет обернута какForkJoinWorkerThread, так что смотримForkJoinWorkerThread#run()реализация.

public void run() {
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
            onStart();
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            //出现异常,捕获,再次抛出会在调用ForkJoinTask#get()的时候
            exception = ex;
        } finally {
            try {
                onTermination(exception);
            } catch (Throwable ex) {
                if (exception == null)
                    exception = ex;
            } finally {
                pool.deregisterWorker(this, exception);
            }
        }
    }
}

Приведенный выше анализ основан наForkJoinPoolДа, не все пулы потоковsubmitиexecuteРеализация метода аналогична этому, наш часто используемый пул потоковThreadPoolThreadКак бы выглядела реализация, та же идея, нам нужнанайти доставку вThreadPoolThreadАсинхронная задача, которая, наконец, обернута какThreadПодкласс или реализацияjava.lang.Runnable#run, ответjava.util.concurrent.FutureTask

 public void run() {
      ...
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    //捕获异常
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } 
     ....
    }

Суммировать

java.util.concurrent.ExecutorService#submit(java.lang.Runnable)Почему у пула потоков есть такая настройка?На самом деле наше мышление не должно ограничиваться пулом потоков, а помещаться вПолучить результат асинхронной задачи, независимо от того, является ли исключение также асинхронным результатом,FutureTaskВ качестве реализации класса инструментов параллелизма, предоставляемого JDK, был дан хороший ответ, а именноПолучить результат асинхронной задачи. Исключения также являются асинхронными результатами. Если в асинхронной задаче возникает исключение времени выполнения, оно будет переупаковано и выдано при получении результата задачи.

​ Автор: пожалуйста, называйте меня красным шарфом

Источник:nuggets.capable/post/684490…

Этот блог приветствуется для перепечатки, но это заявление должно быть сохранено без согласия автора, а ссылка на исходный текст дана в явном месте на странице статьи, в противном случае сохраняется право привлечения к юридической ответственности. Кодировать слова непросто, ваши лайки — самая большая мотивация для моего письма.