Нет публики:Маленькое кофейное шоу Java,Веб-сайт:javaxks.com
Автор: kingsleylam, ссылка: cnblogs.com/kingsleylam/p/11241625.html
Недавно я прочитал исходный код пула потоков JDK ThreadPoolExecutor и имею общее представление о процессе выполнения задач в пуле потоков, на самом деле этот процесс очень прост и понятен, поэтому повторяться не буду , Другие написали это намного лучше, чем я.
Однако меня больше интересует, как пул потоков повторно использует рабочие потоки, поэтому я кратко проанализировал его и углубил свое понимание пула потоков.
Далее в качестве примера для анализа используется JDK1.8.
1. runWorker(Worker w)
После запуска рабочего потока он входит в метод runWorker(Worker w).
Есть цикл while, цикл оценивает, пуста ли задача, если она не пуста, выполнить задачу, если задача не может быть получена или возникает исключение, выйти из цикла и выполнить processWorkerExit(w, completeAbruptly); В этот метод, переместите рабочий поток, чтобы избавиться от него.
Есть два источника для получения задач: первый — firstTask, задача, которая выполняется при первом запуске рабочего потока и может быть выполнена не более одного раза, после чего задача должна быть взята из метода getTask(). Кажется, что ключом является getTask().Если исключение не рассматривается, возврат null означает выход из цикла и завершение потока. Следующий шаг — посмотреть, при каких обстоятельствах getTask() вернет null.
(ограниченное пространство, сегментированный перехват, опускание шагов выполнения заданий посередине)
2. GetTask() возвращает NULL
Есть два случая, в которых будет возвращено значение null, см. красное поле.
В первом случае пул потоков находится в состоянии STOP, TIDYING, TERMINATED или SHUTDOWN, а рабочая очередь пуста;
Во втором случае количество рабочих потоков превысило максимальное количество потоков или время ожидания текущего рабочего потока истекло, а есть другие рабочие потоки или очередь задач пуста. Это сложнее понять, короче говоря, сначала запомните, потом будете использовать.
В дальнейшем условие 1 и условие 2 используются для обозначения условий оценки этих двух случаев соответственно.
3. Проанализируйте пул потоков, чтобы повторно использовать рабочие потоки по сценарию.
3.1 Сценарий, в котором все задачи выполняются в состоянии RUNNING без вызова shutdown()
В этом сценарии количество рабочих потоков будет уменьшено до размера количества основных потоков (если оно не превышено, перезапуск не требуется).
Например, в пуле потоков количество основных потоков равно 4, а максимальное количество потоков равно 8. В начале рабочих потоков 4. Когда очередь задач заполнена, количество рабочих потоков необходимо увеличить до 8. Когда последующие задачи почти выполнены и поток не может получить задачу, состояние 4 рабочих потоков будет восстановлен (в зависимости от значения allowCoreThreadTimeOut, здесь обсуждается значение по умолчанию false, то есть основной поток не будет истекать по тайм-ауту. Если это значение равно true, все рабочие потоки могут быть уничтожены).
Условие 1, упомянутое выше, может быть исключено первым, состояние пула потоков уже равно STOP, TIDYING, TERMINATED или SHUTDOWN, а рабочая очередь пуста. Поскольку пул потоков всегда РАБОТАЕТ, это суждение всегда ложно. В этом сценарии можно использовать условие 1, когда условие 1 не существует.
Далее анализируется, как работает поток, когда задача не может быть удалена.
шаг 1. Есть два способа получить задачи из очереди задач. Вы все еще можете заблокировать, если вы ждете тайм-аут. Определяющим фактором является временная переменная. Эта переменная присваивается впереди, если текущее количество потоков больше, чем количество основных потоков, переменная timed имеет значение true, в противном случае — значение false (как упоминалось выше, здесь обсуждается только случай, когда значение allowCoreThreadTimeOut равно false). Очевидно, речь идет о случае, когда время верно. keepAliveTime вообще не устанавливается, а значение по умолчанию равно 0, поэтому в принципе его можно считать неблокирующим, и результат выполнения задачи возвращается сразу.
После того, как поток истечет и ожидает пробуждения, обнаруживается, что задача не может быть снята, timeOut становится истинным, и начинается следующий цикл.
шаг 2. Приходите к выводу об условии 1, пул потоков всегда РАБОТАЕТ и не входит в блок кода.
Шаг 3. Приходим к решению условия 2. В это время очередь задач пуста, условие выполнено, CAS уменьшает количество потоков, в случае успеха возвращает ноль, в противном случае повторяем шаг 1.
Здесь следует отметить, что несколько потоков могут одновременно принять решение об условии 2. Будет ли уменьшено количество более поздних потоков вместо ожидаемого количества основных потоков?
Например, текущее количество потоков составляет всего 5. В это время одновременно просыпаются два потока. Благодаря решению условия 2 и одновременному уменьшению числа оставшееся количество потоков составляет всего 3, что не соответствует ожиданиям.
На самом деле не будет. Чтобы предотвратить эту ситуацию, compareAndDecrementWorkerCount(c) использует метод CAS.Если CAS дает сбой, продолжайте, переходите к следующему циклу и переоценивайте.
Как и в приведенном выше примере, один из потоков выйдет из строя CAS, а затем снова войдет в цикл.Обнаружено, что количество рабочих потоков всего 4, а время неверно.Этот поток не будет уничтожен и может быть заблокирован все время (workQueue.take()).
Я долго думал об этом, прежде чем пришел к ответу, я думал о том, как сделать так, чтобы количество потоков ядра можно было восстановить без блокировки. Это оказалось загадкой CAS.
Из этого также видно, что хотя есть основные потоки, но потоки не различают, являются ли они ядрами или неосновными.Это не означает, что ядра создаются первыми, а неосновные создаются после количества ядер. потоки превышены.Какие потоки сохраняются в конце, совершенно случайно.
3.2 Вызов shutdown(), сценарий, в котором все задачи выполнены
В этом сценарии, будь то основные или неосновные потоки, все рабочие потоки будут уничтожены.
После вызова shutdown() всем бездействующим рабочим потокам отправляется сигнал прерывания.
Наконец, передайте false и вызовите следующий метод.
Видно, что перед выдачей сигнала прерывания он определяет, был ли он прерван, и получает эксклюзивную блокировку рабочего потока.
Когда выдается сигнал прерывания, рабочий поток либо готовится получить задачу в getTask(), либо выполняет задачу, тогда он не будет выдан, пока не завершит выполнение текущей задачи, потому что, когда рабочий поток выполняет задачу , рабочий поток также добавит Lock. После того, как рабочий поток завершает выполнение задачи, он снова переходит к getTask().
Так что нам просто нужно посмотреть, как работать с исключениями прерывания в getTask().
3.2.1 Задача выполнена, поток заблокирован и ожидает.
Очень просто, сигнал прерывания пробуждает его и переходит к следующему циклу. Когда достигается условие 1, условие выполняется, количество рабочих потоков уменьшается, возвращается ноль, а поток завершается внешним уровнем.
Функция decrementWorkerCount() здесь вращается и должна быть уменьшена на 1.
3.2.2 Задача выполнена не полностью
После вызова shutdown() незавершенные задачи должны быть выполнены до завершения работы пула. Так что вполне возможно, что поток все еще работает в это время.
Это обсуждение будет разделено на два этапа
Фаза 1 имеет много задач, и все рабочие потоки могут получать задачи.
Выход потока здесь не задействован, его можно пропустить и просто проанализировать производительность потока после получения сигнала прерывания.
Предположим, есть поток A, который получает задачи через getTask(). В этот момент прерывается A. При получении задачи, будь то poll() или take(), будет выдано исключение прерывания. Исключение перехватывается, и происходит повторный вход в следующий цикл.Пока очередь не пуста, вы можете продолжать выборку задач.
Поток A прерван, возьмите задачу снова и вызовите workQueue.poll() или workQueue.take(), не вызовет ли это исключение? Можно ли еще нормально выносить задания?
Это зависит от реализации workQueue. workQueue является типом BlockingQueue.В качестве примера можно взять общие LinkedBlockingQueue и ArrayBlockingQueue. Этот метод, в свою очередь, вызывает методAcquireInterruptably(int arg) AQS.
AcquireInterruptably(int arg), будь то оценка исключения прерывания на входе или блокировка в методе parkAndCheckInterrupt(), когда он пробуждается прерыванием и оценивает исключение прерывания, используется Thread.interrupted(). Этот метод вернет прерванное состояние потока и сбросит прерванное состояние! Другими словами, поток больше не находится в прерванном состоянии, так что при повторном вызове задачи не будет сообщено об ошибке.
Таким образом, для потока, готовящегося к выборке задачи, это равносильно пустой трате цикла. Это может быть побочным эффектом прерывания потока. Конечно, это не влияет на работу в целом.
Проанализировав это, я не могу не вздохнуть. Здесь BlockingQueue просто сбрасывает прерванное состояние. Как появился этот замечательный дизайн? Дуг Леа Орз.
Этап 2. Задача вот-вот будет завершена.
На данный момент задача закэширована, например, есть 4 рабочих потока и осталось только 2 задачи, то 2 потока могут получить задачи и 2 потока могут быть заблокированы.
Поскольку суждения перед получением задачи блокировки нет, будет ли казаться, что все потоки прошли предыдущую проверку и доходят до места, где workQueue получает задачу, просто очередь задач пуста, а все потоки заблокированы? Поскольку было выполнено shutdown(), в поток не может быть отправлен сигнал прерывания, поэтому поток заблокирован и не может быть перезапущен.
Этого не произойдет.
Предположим, что есть четыре рабочих потока A, B, C и D, которые одновременно оценивают условие 1 и условие 2 и достигают места, где выполняется задача. Тогда в рабочей очереди все еще есть по крайней мере одна задача, и по крайней мере один поток может получить эту задачу. Рекомендуется: Сборник практических занятий по Java-интервью
Предположим, A, B получают задание, C, D блокируют.
А, Б Следующие шаги:
шаг 1. После завершения выполнения задачи снова вызовите функцию getTask(). На этот раз она удовлетворяет условию 1, возвращает ноль, и поток готов к повторному использованию.
step2.processWorkerExit(Рабочий w, логическое значение завершеноВнезапно) Повторить поток.
Переработка просто убивает нить? Давайте взглянем на метод processWorkerExit(Worker w, boolean CompletedAbruptly) .
Как видите, в дополнение к work.remove(w) для удаления строки также вызывается tryTerminate().
Первое условие суждения не имеет подусловия, пропустите его. Второе условие: рабочий поток все еще существует, а затем случайным образом прерывает бездействующий поток.
Затем возникает проблема: прерывание бездействующего потока не означает, что он должен прервать заблокированный поток. Если A и B выходят одновременно, возможно ли, что A прерывает B, B прерывает A и AB прерывает друг друга, так что нет потока, который можно было бы прервать и разбудить заблокированный поток?
Ответ по-прежнему, слишком много думать ...
Предполагая, что A может попасть сюда, это означает, что A был удален из набора рабочих процессов рабочего потока (processWorkerExit(Worker w, boolean CompletedAbruptly) был удален до tryTerminate()). Тогда А перебивает Б, и Б приходит сюда, чтобы перебить, и А не найдется среди рабочих.
То есть выходящие потоки не могут прерывать друг друга.После того, как я выхожу из коллекции, я прерываю вас, вы не можете меня прерывать, потому что я уже вышел из коллекции, вы можете только прерывать другие. Тогда, даже если N потоков завершится одновременно, по крайней мере в конце останется один поток, который прервет оставшиеся заблокированные потоки.
Как и в домино, сигнал прерывания распространяется.
После того, как любой из заблокированных потоков C и D будет прерван и пробужден, действие шага 1 будет повторяться снова и снова, пока все заблокированные потоки не будут прерваны и пробуждены.
Вот почему в tryTerminate() при передаче false необходимо прервать только любой бездействующий поток.
Думая об этом, я снова испытываю чувство восхищения (кантонский диалект) Дуга Ли. Это также очень хорошо разработано.
4. Резюме
ThreadPoolExecutor перезапускает рабочие потоки, а поток getTask() возвращает null и будет перезапущен.
Есть два сценария.
1. Сценарий, в котором shutdown() не вызывается и все задачи выполняются в состоянии RUNNING
Если количество потоков больше, чем corePoolSize, поток блокируется из-за тайм-аута. По истечении тайм-аута CAS уменьшает количество рабочих потоков. Если CAS завершается успешно, он возвращает значение null, и поток перезапускается. В противном случае перейти к следующему циклу. Когда количество рабочих потоков меньше или равно corePoolSize, он всегда может быть заблокирован.
2. Вызов shutdown() для завершения выполнения всех задач.
shutdown() отправит сигнал прерывания всем потокам, и есть две возможности.
2.1) Все потоки блокируются
Прерывание пробуждения, вход в цикл, все удовлетворяют первому условию оценки if, все возвращают null, и все потоки перерабатываются.
2.2) Задание выполнено не полностью
По крайней мере один поток будет переработан. В методе processWorkerExit(Worker w, boolean CompletedAbruptly) вызывается функция tryTerminate() для отправки сигнала прерывания любому бездействующему потоку. Все заблокированные потоки в конечном итоге будут пробуждены и переработаны один за другим.