Предыстория кровавого урока: используйте пул потоков для переноса стоковых данных, но всегда происходит сбой переноса пакета данных, ненормальная печать журнала
причина убийства
слышалparallelStream
Параллельные потоки — это хорошо из-за ежедневной разработки.stream
Сценариев последовательных потоков много.В этот раз нужно написать программу миграции и ее можно использовать.Так почему бы вам не использовать ее для установки? остроумие, я знаюНа фоне JVM для выполнения вышеуказанных функций используется общий пул fork/join, который используется всеми параллельными потоками.По умолчанию пул fork/join выделяет поток для каждого процессора., соответствующий обходной путь — создать собственный пул потоков, например
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.submit(() -> {
list.parallelStream().collect(Collectors.toList());
});
Значит, мины были заложены отсюда.
представить или выполнить
public static void main(String[] args) throws InterruptedException, ExecutionException {
final ExecutorService pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
List<Integer> list = Lists.newArrayList(1, 2, 3, null);
//1.使用submit
pool.submit(() -> {
list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
});
TimeUnit.SECONDS.sleep(3);
//2.使用 execute
pool.execute(() -> {
list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
});
//3.使用submit,调用get()
pool.submit(() -> {
list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
}).get();
TimeUnit.SECONDS.sleep(3);
}
Читатели запускают приведенные выше варианты использования и обнаружат, что они используются отдельно.submit
метод не выводит журнал ошибок, а используетexecute
Метод распечатывает журнал ошибок, ноsubmit
возвращениеFutureJoinTask
перечислитьget()
метод, который выдает исключение. Так правда вскрывается, некоторые данные в пакетах имеют грязные данные, что является нулевым значением.При переходе к нулевому значению возникает исключение, но журнал исключений находится вsubmit
Метод пойман и не распечатан (душевная боль), а пойманное исключение завернуто в возвращенный класс результатаFutureJoinTask
, и больше не бросали.
Если вам не нужно возвращать результат асинхронно, не используйтеsubmit
метод
Вывод первый, ошибка, которую я сделал, заключается в том, чтобы мыслить ясноsubmit
иexecute
Разница только в том, что возвращает асинхронные результаты, и в том, что не возвращает одношаговые результаты, но правда жестока.существуетsubmit()
Логика в середине должна включать перехват исключения, выданного асинхронной задачей, и исключение не выбрасывается снова из-за неправильного использования метода.
Теперь задайте вопрос,ForkJoinPool#submit()
вернулся вForkJoinTask
Результат асинхронной задачи можно получить.Теперь, когда эта асинхронная задача выдает исключение, каков будет результат нашей попытки получить задачу? мы смотрим прямоForkJoinTask#get()
исходный код.
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
doJoin() : externalInterruptibleAwaitDone();
Throwable ex;
if ((s &= DONE_MASK) == CANCELLED)
throw new CancellationException();
//这里可以直接看到,异步任务出现异常会在调用get()获取结果的时候,会被包装成ExecutionException再次抛出
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
throw new ExecutionException(ex);
return getRawResult();
}
Когда в асинхронной задаче возникает исключение, при вызове get() для получения результата оно будет упаковано какExecutionException
Выдает снова, но где ловится исключение? Тоже самое, ветки всех веток надо переписатьThread#run()
метод, размещенный вForkJoinPool
Нить будет обернута какForkJoinWorkerThread
, так что смотримForkJoinWorkerThread#run()
реализация.
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue);
} catch (Throwable ex) {
//出现异常,捕获,再次抛出会在调用ForkJoinTask#get()的时候
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
Приведенный выше анализ основан наForkJoinPool
Да, не все пулы потоковsubmit
иexecute
Реализация метода аналогична этому, наш часто используемый пул потоковThreadPoolThread
Как бы выглядела реализация, та же идея, нам нужнанайти доставку вThreadPoolThread
Асинхронная задача, которая, наконец, обернута какThread
Подкласс или реализацияjava.lang.Runnable#run
, ответjava.util.concurrent.FutureTask
public void run() {
...
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
//捕获异常
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
}
....
}
Суммировать
java.util.concurrent.ExecutorService#submit(java.lang.Runnable)
Почему у пула потоков есть такая настройка?На самом деле наше мышление не должно ограничиваться пулом потоков, а помещаться вПолучить результат асинхронной задачи, независимо от того, является ли исключение также асинхронным результатом,FutureTask
В качестве реализации класса инструментов параллелизма, предоставляемого JDK, был дан хороший ответ, а именноПолучить результат асинхронной задачи. Исключения также являются асинхронными результатами. Если в асинхронной задаче возникает исключение времени выполнения, оно будет переупаковано и выдано при получении результата задачи.
Автор: пожалуйста, называйте меня красным шарфом
Источник:nuggets.capable/post/684490…
Этот блог приветствуется для перепечатки, но это заявление должно быть сохранено без согласия автора, а ссылка на исходный текст дана в явном месте на странице статьи, в противном случае сохраняется право привлечения к юридической ответственности. Кодировать слова непросто, ваши лайки — самая большая мотивация для моего письма.