введение
В повседневной работе по разработке пулы потоков часто несут наиболее важную бизнес-логику в приложении, поэтому нам необходимо уделять больше внимания выполнению пулов потоков, включая обработку и анализ исключений. В этой статье основное внимание уделяется тому, как правильно использовать пулы потоков, и приводятся некоторые практические советы. Эта статья затронет некоторые знания о принципе реализации пулов потоков, но не будет слишком расширяться. В Интернете есть много статей о принципе работы пулов потоков и анализе исходного кода, заинтересованные студенты могут обратиться к ним самостоятельно.
Обработка исключений для пулов потоков
UncaughtExceptionHandler
Все мы знаем, что методу run в интерфейсе Runnable не разрешено генерировать исключения, поэтому основной поток, производный от потока, может быть не в состоянии напрямую получить информацию об исключении во время выполнения потока. Например:
public static void main(String[] args) throws Exception {
Thread thread = new Thread(() -> {
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
System.out.println(1 / 0); // 这行会导致报错!
});
thread.setUncaughtExceptionHandler((t, e) -> {
e.printStackTrace(); //如果你把这一行注释掉,这个程序将不会抛出任何异常.
});
thread.start();
}
Почему это так? На самом деле, если мы посмотрим на исходный код в Thread, мы обнаружим, что если Thread встречает исключение во время выполнения, он сначала определяет, установил ли текущий поток UncaughtExceptionHandler, Если нет, то оно будет получено из ThreadGroup, где находится поток. расположен. (**Примечание:** Каждый поток имеет свою собственную группу ThreadGroup, даже если вы ее не укажете, и реализует интерфейс UncaughtExceptionHandler.) Давайте посмотрим на реализацию интерфейса UncaughtExceptionHandler по умолчанию в группе ThreadGroup:
public void uncaughtException(Thread t, Throwable e) {
if (parent != null) {
parent.uncaughtException(t, e);
} else {
Thread.UncaughtExceptionHandler ueh =
Thread.getDefaultUncaughtExceptionHandler();
if (ueh != null) {
ueh.uncaughtException(t, e);
} else if (!(e instanceof ThreadDeath)) {
System.err.print("Exception in thread \""
+ t.getName() + "\" ");
e.printStackTrace(System.err);
}
}
}
Если у вас есть этот родительский ThreadGroup ThreadGroup, вызывается uncaughtException родительского ThreadGroup или вызывается глобальное значение по умолчаниюThread.DefaultUncaughtExceptionHandler
, если глобальный обработчик не установлен, он просто находит информацию об исключении в System.err, поэтому мы должны реализовать его интерфейс UncaughtExceptionHandler при создании потока, что может упростить вам поиск и устранение проблемы.
Отправка задач в пул потоков через выполнение
Возвращаясь к теме пулов потоков, если мы не будем пытаться... перехватывать исключения в задачах, которые мы отправляем в пулы потоков, и возникает исключение при выполнении, как это повлияет на пул потоков? Ответ - никакого эффекта, пул потоков еще может нормально работать, но исключение проглатывается. Обычно это не очень хорошо, потому что нам нужно получить исходный объект исключения для анализа проблемы.
Итак, как я могу получить исходный объект исключения? Начнем изучать эту проблему с исходного кода пула потоков. Конечно, статей по анализу исходного кода пулов потоков в Интернете много, но из-за нехватки места наиболее актуальная часть кода приведена непосредственно здесь:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
Этот метод представляет собой код, который фактически выполняет задачи, отправленные в пул потоков. Здесь мы опустим не относящуюся к делу логику и сосредоточимся на логике строк с 19 по 32, из которых строка 23 фактически начинает выполнение задачи, отправленной в пул потоков, так что же делает строка 20? На самом деле, перед выполнением задачи, отправленной в пул потоков, можно выполнить некоторую предварительную работу.Точно так же мы видим строку 31, которая должна выполнить некоторую последующую работу после выполнения отправленной задачи. Давайте проигнорируем метод beforeExecute и сосредоточимся на методе afterExecute. Мы видим, что в процессе выполнения задачи, как только возникнет исключение любого типа, оно будет отправлено в метод afterExecute.Однако, глядя на исходный код пула потоков, мы можем обнаружить, что afterExecute по умолчанию является пустая реализация, поэтому нам необходимо наследовать ThreadPoolExecutor для реализации этого метода afterExecute. (Глядя на исходный код, мы можем обнаружить, что этот метод afterExecute имеет защищенный тип. Из официальных комментариев мы также можем видеть, что этот метод рекомендуется для реализации в подклассах.) Конечно, этот метод нельзя реализовать по своему желанию. , и необходимо выполнить определенные шаги.Также упоминаются официальные примечания, вот выдержки следующим образом:
* <pre> {@code
* class ExtendedExecutor extends ThreadPoolExecutor {
* // ...
* protected void afterExecute(Runnable r, Throwable t) {
* super.afterExecute(r, t);
* if (t == null && r instanceof Future<?>) {
* try {
* Object result = ((Future<?>) r).get();
* } catch (CancellationException ce) {
* t = ce;
* } catch (ExecutionException ee) {
* t = ee.getCause();
* } catch (InterruptedException ie) {
* Thread.currentThread().interrupt(); // ignore/reset
* }
* }
* if (t != null)
* System.out.println(t);
* }
* }}</pre>
Таким образом, исключение, которое могло быть проглочено пулом потоков, может быть успешно перехвачено, что удобно для устранения неполадок.
Но здесь еще небольшая проблема, мы заметили, что в методе RunWorker, выполнениеtask.run();
После оператора выбрасываются различные типы исключений, так куда же деваются эти выброшенные исключения? Фактически, объект исключения здесь в конечном итоге будет передан в метод dispatchUncaughtException класса Thread.Исходный код выглядит следующим образом:
private void dispatchUncaughtException(Throwable e) {
getUncaughtExceptionHandler().uncaughtException(this, e);
}
Вы можете видеть, что он получит класс реализации UncaughtExceptionHandler, а затем вызовет в нем метод uncaughtException, который вернется к конкретной логике, реализованной UncaughtExceptionHandler, проанализированной в предыдущем разделе. Тогда для получения максимально примитивного объекта исключения, помимо реализации интерфейса UncaughtExceptionHandler, можно также рассмотреть возможность реализации метода afterExecute.
Отправка задач в пул потоков через submit
Это также очень просто, давайте сначала вернемся к исходному коду метода отправки:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
Метод выполнения здесь вызывает метод выполнения в ThreadPoolExecutor, а логика выполнения такая же, как и при отправке задач в пул потоков через выполнение. Давайте сосредоточимся здесь на методе newTaskFor, исходный код которого выглядит следующим образом:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
Вы можете видеть, что отправленный объект Callable инкапсулирован с помощью FutureTask. Затем мы знаем, что в конечном итоге он будет выполнен в указанном выше методе runWorker, а основная логика выполнения такова:task.run();
эта строка кода. Мы знаем, что задача здесь на самом деле относится к типу FutureTask, поэтому нам необходимо взглянуть на реализацию метода run в FutureTask:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
Вы можете видеть, что наиболее важный код, связанный с исключением, находится в строке 17, т.е.setException(ex);
это место. Давайте посмотрим на реализацию этого места:
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
Ключевым моментом здесь является присвоение объекта исключения результату. Результат является переменной-членом в FutureTask. После того, как мы получили объект Future, вызвав метод отправки, мы вызываем его метод получения. Основным методом является метод отчета. код каждого метода приведен ниже:
Первый — это метод get:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
Вы можете видеть, что метод отчета, наконец, вызывается, и его исходный код выглядит следующим образом:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
Вышеуказанные решения о состоянии статуса. Если текущая задача не выполняется нормально или не отменяется, то X вот фактически является исходным объектом исключения, который будет обернутым исполнением ExecutionException. Следовательно, когда вы вызываете метод получения, может быть брошен файл ExecutionException, а затем вы можете получить наиболее примитивный объект исключения, вызывая его метод getCause.
Подводя итог, можно сказать, что существует два основных способа решения проблемы, связанной с тем, что для задач, отправленных в пул потоков, может быть выдано исключение:
- Попробуйте... поймать отправленные задачи самостоятельно, но здесь есть одна плохая вещь: если вы отправляете несколько типов задач в пул потоков, каждый тип задачи должен попытаться... поймать исключение само по себе Live, больше сложный. и если вы просто
catch(Exception e)
, некоторые исключения, включая типы ошибок, могут быть пропущены, поэтому на всякий случай вы можете рассмотретьcatch(Throwable t)
. - Реализуйте метод afterExecute пула потоков самостоятельно или реализуйте интерфейс UncaughtExceptionHandler класса Thread.
Вот пример моего личного создания пула потоков для справки:
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE);
statisticsThreadPool = new ThreadPoolExecutor(DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE,
60, TimeUnit.SECONDS, queue, new ThreadFactoryBuilder()
.setThreadFactory(new ThreadFactory() {
private int count = 0;
private String prefix = "StatisticsTask";
@Override
public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + count++);
}
}).setUncaughtExceptionHandler((t, e) -> {
String threadName = t.getName();
logger.error("statisticsThreadPool error occurred! threadName: {}, error msg: {}", threadName, e.getMessage(), e);
}).build(), (r, executor) -> {
if (!executor.isShutdown()) {
logger.warn("statisticsThreadPool is too busy! waiting to insert task to queue! ");
Uninterruptibles.putUninterruptibly(executor.getQueue(), r);
}
}) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Future<?> future = (Future<?>) r;
future.get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null) {
logger.error("statisticsThreadPool error msg: {}", t.getMessage(), t);
}
}
};
statisticsThreadPool.prestartAllCoreThreads();
Настройка количества потоков
Мы знаем, что в целом есть два типа задач: CPU-интенсивный и IO-интенсивный. Затем, перед лицом процессорных задач, количество потоков не должно быть слишком большим. Как правило, это разумное значение для выбора числа ядер CPU + 1 или 2 раза больше номеров ядер. Следовательно, мы можем рассмотреть возможность установки CorePoolsize на количество CPU Cores + 1, а MaxPoolsize до 2 раз больше номеров ядер. Точно так же, когда столкнулся с интенсивными задачами, мы можем рассмотреть возможность умножения количества ядер в 4 раза в качестве количества основных потоков, а затем умножение количества ядер в 5 раз в качестве максимального количества потоков для установки количества потоков , Этот параметр будет быстрее, чем было бы более разумно установить значение непосредственно путем хлопания головы.
Конечно, общее количество нитей не должно быть слишком много, и разумно контролировать его в течение 100 потоков. В противном случае слишком много потоков могут привести к частому переключению контекста, что приводит к производительности системы не так хороши, как раньше.
Как правильно закрыть пул потоков
Когда дело доходит до того, как правильно закрыть пул потоков, здесь также делается небольшой акцент. Чтобы достичь цели изящного завершения работы, мы должны сначала вызвать метод завершения работы.Вызов этого метода означает, что пул потоков не будет получать никаких новых задач, но задачи, которые были отправлены, будут продолжать выполняться, в том числе в очередь. Поэтому вам также следует вызвать метод awaitTermination.Этот метод может установить максимальный период ожидания пула потоков перед закрытием.Если пул потоков может быть нормально закрыт до истечения периода ожидания, этот метод вернет значение true, в противном случае по истечении времени ожидания период истекает, он вернет true, вернет false. Вообще говоря, мы не можем ждать бесконечно, поэтому нам нужно заранее оценить разумный тайм-аут, а затем использовать этот метод.
Если метод awaitTermination возвращает false, и вы хотите выполнить другую работу по утилизации ресурсов после того, как пул потоков будет максимально закрыт, вы можете рассмотреть возможность повторного вызова метода shutdownNow.В это время все задачи, которые не были обработаны в очереди будет отброшен, и поток будет установлен в то же время Бит флага прерывания для каждого потока в пуле. shutdownNow не гарантирует, что запущенный поток перестанет работать, если задача, представленная потоку, не отреагирует должным образом на прерывание. На этом этапе вы можете рассмотреть возможность продолжения вызова метода awaitTermination или просто сдаться и сделать следующее.
Другие полезные методы в пулах потоков
Вы могли заметить, что когда я создавал пул потоков, я также вызывал этот метод:prestartAllCoreThreads.这个方法有什么作用呢?我们知道一个线程池创建出来之后,在没有给它提交任何任务之前,这个线程池中的线程数为0。有时候我们事先知道会有很多任务会提交给这个线程池,但是等它一个个去创建新线程开销太大,影响系统性能,因此可以考虑在创建线程池的时候就将所有的核心线程全部一次性创建完毕,这样系统起来之后就可以直接使用了。
На самом деле пул потоков также предоставляет некоторые другие интересные методы. Например, теперь мы представляем себе сценарий: когда пул потоков находится под высокой нагрузкой и вот-вот взорвется, срабатывает политика отклонения. Есть ли способ облегчить эту проблему? На самом деле они есть, потому что пул потоков предоставляет методы для установки количества основных потоков и максимального количества потоков, которыеметод setCorePoolSizeиМетод setMaximumPoolSize. Да, количество потоков также можно изменить после создания пула потоков! ** Поэтому в условиях высокой нагрузки на пул потоков мы можем справиться с этим так:
- Запустите регулярный поток опроса (типа опекуна) и регулярно определяйте количество потоков в пуле потоков. В частности, вызовите метод getActiveCount.
- Когда они обнаружат, что количество потоков превышает количество потоков ядра, учитывайте значения CorePoolSize и MaximumPoolSize, умноженные на 2 одновременно, конечно, не рекомендуется устанавливать большое количество потоков, поскольку поток не лучше, вы можете рассмотрите возможность установки верхнего предела, например 50, 100 и т.п.
- В то же время, чтобы получить количество задач в очереди, в частности, вызовите метод getQueue, а затем вызовите метод размера. Когда количество задач в очереди меньше половины размера очереди, мы можем думать, что нагрузка на пул потоков сейчас не так высока, поэтому мы можем рассмотреть возможность восстановления CorePoolSize и MaximumPoolSize обратно, когда пул потоков был расширен перед. , который делится на 2
В частности, как показано ниже:
Это один из способов, которым я лично рекомендую использовать пулы потоков.
Является ли пул потоков лучшим решением?
Пулы потоков не всегда являются наиболее эффективным решением. Если это сценарий, который преследует экстремальную производительность, вы можете рассмотреть возможность использованияDisruptor, которая является высокопроизводительной очередью. За исключением Disruptor, будет ли лучшее решение, основанное исключительно на JDK? Ответ положительный.
Мы знаем, что в пуле потоков несколько потоков делят одну очередь, поэтому в случае большого количества задач требуется частое чтение и запись этой очереди, а для предотвращения конфликтов необходимы блокировки. На самом деле, читая исходный код пула потоков, можно обнаружить, что он переполнен различными кодами блокировки, есть ли лучший способ реализовать это? Фактически, мы можем рассмотреть возможность создания списка пулов однопоточных потоков, каждый из которых использует ограниченную очередь для достижения многопоточности. Преимущество этого заключается в том, что очереди в каждом пуле потоков будут обрабатываться только одним потоком, поэтому проблемы конкуренции нет.
На самом деле, эта идея использования пространства для времени натягивает механизм внедрения EventLoop в Netty. Просто подумайте, если производительность нитей пул действительно настолько хороша, почему бы не использовать Netty?
Другие места, на которые стоит обратить внимание
- Ни при каких обстоятельствах не следует использовать масштабируемые пулы потоков (создание и уничтожение потоков дорого обходится).
- Неограниченные очереди не должны использоваться ни при каких обстоятельствах, за исключением одиночных тестов (обычно используются ограниченные очереди ArrayBlockingQueue и LinkedBlockingQueue, первая реализована на основе массивов, а вторая на основе связанных списков. С точки зрения производительности LinkedBlockingQueue имеет пропускная способность выше, но производительность ниже.Стабильный, какой из них использовать в реальной ситуации и решать самостоятельно после тестирования.Кстати, newFixedThreadPool of Executors использует LinkedBlockingQueue)
- RejectedExecutionHandler рекомендуется реализовывать самостоятельно, встроенный JDK не очень полезен, в нем можно реализовать свою логику. Если вам нужна какая-то конкретная контекстная информация, вы можете добавить что-то свое в класс реализации Runnable, который можно использовать непосредственно в RejectedExecutionHandler.
Как не пропустить задачи
На самом деле это относится к частному случаю, то есть, например, внезапному всплеску трафика, что приводит к очень высокой нагрузке на пул потоков, то есть, когда политика отклонения вот-вот будет запущена, что мы можем сделать, чтобы предотвратить отправку задачи от потери как можно больше. Вообще говоря, при возникновении такой ситуации следует как можно скорее включить сигнал тревоги, чтобы уведомить персонал НИОКР о необходимости разобраться с ней. После чего то ли ограничить ток, то ли увеличить машину, то ли вообще использовать кафку, редис или даже базу данных для временного хранения данных задачи, но ведь далеко вода не спасет ближний огонь, если мы надеемся официально решить это Проблема Прежде постарайтесь максимально облегчить, о чем вы можете подумать?
Первое, что нужно учитывать, это динамическое увеличение количества потоков в пуле потоков, о котором я упоминал ранее, но если он был расширен, он не должен продолжать расширяться в это время, иначе пропускная способность системы может быть ниже. В этом случае вам следует реализовать RejectedExecutionHandler самостоятельно, а именно в классе реализации открыть однопоточный пул потоков, а затем вызвать метод put метода getQueue исходного пула потоков и попытаться подключить задачи, которые не могут снова подключитесь. Конечно, когда очередь заполнена, ее нельзя подключить, но, по крайней мере, это блокирует только один поток и не влияет на основной процесс.
Конечно, это лишь временное решение, в условиях всплесков трафика в отрасли существует много зрелых практик, но с точки зрения пулов потоков этот подход является временным решением.
об авторе
Лу Ядун, технический эксперт интернет-компании в области управления рисками, в основном фокусируется на высокой производительности, высоком уровне параллелизма, а также на базовых принципах и настройке промежуточного программного обеспечения.