Контратака шлака: восемнадцать вопросов пула потоков

задняя часть опрос
Контратака шлака: восемнадцать вопросов пула потоков

Привет всем, я третий ребенок, и я рад снова встретиться с вами. Пул потоков — это вопрос знаний, который необходимо задать на собеседовании В этом разделе мы встретимся с интервьюером и поймем пул потоков.

1. Что такое пул потоков?

Пул потоков:Проще говоря, это пул для управления потоками.

  • Это помогает нам управлять потоками и избегать увеличения потребления ресурсов при создании и уничтожении потоков.. Поскольку поток на самом деле является объектом, для создания объекта ему необходимо пройти через процесс загрузки класса, уничтожить объект и пройти через процесс сборки мусора GC, что требует дополнительных ресурсов.
  • Улучшить отзывчивость.Если задача прибывает, это должно быть намного медленнее, чем получение потока из пула потоков и повторное создание потока для выполнения.
  • повторное использование.После того, как поток израсходован, он помещается обратно в пул, что позволяет добиться эффекта повторного использования и экономии ресурсов.

2. Можете ли вы рассказать о применении пулов потоков на работе?

Раньше у нас было требование подключаться к третьей стороне, и нам нужно было отправлять данные третьей стороне.Многопоточность была введена для повышения эффективности передачи данных, а для управления потоками использовался пул потоков.

业务示例

Основной код выглядит следующим образом:

主要代码

Полный адрес исполняемого кода:git ee.com/fighter3/дней спустя…

Параметры пула потоков следующие:

  • corePoolSize: Параметр ядра потока выбирает количество ЦП × 2

  • maxPoolSize: максимальное количество потоков выбирается равным количеству основных потоков.

  • keepAliveTime: время выживания неосновных бездействующих потоков напрямую установлено на 0.

  • unit: TimeUnit.SECONDS секунды выбираются для времени, в течение которого неосновные потоки остаются в живых.

  • workQueue: очередь ожидания пула потоков, используйте LinkedBlockingQueue, чтобы заблокировать очередь

В то же время синхронизированный используется для блокировки, чтобы гарантировать, что данные не будут передаваться повторно:

  synchronized (PushProcessServiceImpl.class) {}

ps: Этот пример просто проталкивает данные, на самом деле его можно комбинировать с другими сервисами, такими как очистка данных и статистика данных, которые можно применять.

3. Можете ли вы кратко описать рабочий процесс пула потоков?

Чтобы использовать общую аналогию:

Есть бизнес-зал с шестью окнами, и теперь три окна открыты, и теперь в трех окнах сидят и работают три продавца и сестры.

Что может случиться, когда третий ребенок пойдет заниматься бизнесом?

  1. Третий ребенок обнаружил, что в служебном окне есть место, и пошел прямо к юной леди, чтобы заняться бизнесом.

直接办理

  1. Третий ребенок обнаружил, что свободного окна нет, поэтому отстоял очередь в зоне ожидания.

排队等待

  1. Младший не обнаружил праздного окна, зона ожидания была заполнена, Бенгбу вживую, менеджер смотрит, пусть остальные младшие сестры спешили вернуться на работу, спешат к коду зоны ожидания, чтобы сделать новое окно, третий в очереди на зона очереди. Маленькая сестра сложнее, если они не могут найти период времени, то менеджер по продажам, а затем пусть отдыхают.

排队区满

  1. Третий ребенок увидел, что все шесть окон заполнены, а в зоне ожидания нет места. Третий ребенок торопится и хочет напакостить Менеджер вышел быстро Что делать менеджеру?

等待区,排队区都满

  1. Наша банковская система рухнула

  2. Кто просил тебя сделать это, к кому ты ходил

  3. Посмотрите, если вы спешите, перейдите в команду и добавьте стопор

  4. Ни в коем случае сегодня

Описанный выше процесс почти аналогичен общему процессу пула потоков JDK.

  1. Количество основных пулов потоков, соответствующих трем работающим окнам: corePoolSize.
  2. Общее количество бизнес-окон соответствует 6: maxPoolSize
  3. Открытое временное окно будет закрыто, если с ним никто не будет обращаться в течение определенного периода времени.
  4. Область очереди — это очередь ожидания: workQueue
  5. Когда это не может быть обработано, решение, данное банком, соответствует: RejectedExecutionHandler
  6. threadFactory Этот параметр представляет собой фабрику потоков в JDK, которая используется для создания объектов потоков и обычно не перемещается.

Таким образом, рабочий процесс нашего пула потоков лучше понятен:

  1. Когда пул потоков только что создан, в нем нет ни одного потока. Очередь задач передается в качестве параметра. Однако даже если в очереди есть задачи, пул потоков не будет выполнять их сразу.
  2. При вызове метода execute() для добавления задачи пул потоков будет принимать следующие решения:
  • Если количество запущенных потоков меньше, чем corePoolSize, создайте поток для немедленного запуска задачи;
  • Если количество запущенных потоков больше или равно corePoolSize, поставить задачу в очередь;
  • Если в это время очередь заполнена, а количество запущенных потоков меньше maxPoolSize, то создайте неосновной поток для немедленного запуска задачи;
  • Если очередь заполнена и количество запущенных потоков больше или равно максимальному размеру пула, пул потоков будет обрабатывать его в соответствии с политикой отклонения.

线程池执行流程

  1. Когда поток завершает задачу, он берет следующую задачу из очереди для выполнения.

  2. Когда поток не занят и превышает определенное время (keepAliveTime), пул потоков решит, что если количество текущих потоков больше, чем corePoolSize, поток будет остановлен. Таким образом, после того, как все задачи пула потоков будут выполнены, он в конечном итоге уменьшится до размера corePoolSize.

4. Каковы основные параметры пула потоков?

  1. corePoolSize

Это значение используется для инициализации количества основных потоков в пуле потоков.Когда количество пулов потоков в пуле потоков corePoolSizeКогда по умолчанию нужно добавить задачу для создания пула потоков. Когда количество потоков = corePoolSize, новые задачи добавляются в workQueue.

  1. maximumPoolSize

maximumPoolSizeУказывает максимально допустимое количество потоков = (количество неосновных потоков + количество основных потоков), когдаBlockingQueueТакже полный, но общее количество потоков в пуле потоков maximumPoolSizeЗатем снова будет создан новый поток.

  1. keepAliveTime

Неосновные потоки = (maximumPoolSize — corePoolSize), неосновные потоки простаивают и не работают в течение максимального времени выживания.

  1. unit

Единица времени, в течение которой неосновные потоки в пуле потоков будут оставаться активными.

  • TimeUnit.DAYS; дни
  • TimeUnit.HOURS;часы
  • TimeUnit.MINUTES; минуты
  • TimeUnit.SECONDS; с
  • TimeUnit.MILLISECONDS; миллисекунды
  • TimeUnit.MICROSECONDS;микросекунды
  • TimeUnit.NANOSECONDS; наносекунды
  1. workQueue

Очередь ожидания пула потоков, поддерживающая ожидание выполненияRunnableобъект. При запуске, когда количество потоков = corePoolSize, новые задачи добавляются вworkQueue, еслиworkQueueЕсли она заполнена, старайтесь выполнять задачи с непрофильными потоками, а очередь ожидания должна быть максимально ограничена.

  1. threadFactory

Фабрика, используемая при создании нового потока, может использоваться для установки имени потока, независимо от того, является ли он потоком демона и т. д.

  1. handler

corePoolSize,workQueue,maximumPoolSizeСтратегия насыщения для выполнения, когда ничего не доступно.

5. Каковы политики отклонения пула потоков?

По аналогии с предыдущим примером способ обработки, когда с делом не справиться, может помочь памяти:

四种策略

  • AbortPolicy : создает исключение напрямую, эта политика используется по умолчанию.
  • CallerRunsPolicy: используйте поток, в котором находится вызывающий объект, для выполнения задачи.
  • DiscardOldestPolicy: отбросить самую старую задачу в очереди блокировки, то есть задачу в начале очереди.
  • DiscardPolicy : текущая задача сразу отбрасывается.

Если вы хотите реализовать собственную стратегию отклонения, вы можете реализовать интерфейс RejectedExecutionHandler.

6. Какие виды рабочих очередей существуют в пуле потоков?

Обычно используемые очереди блокировки в основном включают следующее:

  • ArrayBlockingQueue: ArrayBlockingQueue (ограниченная очередь) — это ограниченная очередь блокировки, реализованная с помощью массива, отсортированного по принципу FIFO.
  • LinkedBlockingQueue: LinkedBlockingQueue (можно установить очередь емкости) — это очередь блокировки, основанная на структуре связанного списка. Задачи сортируются по принципу FIFO. Емкость может быть установлена ​​дополнительно. Если не задано, это будет неограниченная очередь блокировки с максимальной длиной Integer.MAX_VALUE и пропускная способность. Обычно выше, чем ArrayBlockingQuene; пул потоков newFixedThreadPool использует эту очередь.
  • DelayQueue: DelayQueue (очередь с задержкой) — это очередь для отложенного выполнения задачи на определенный период времени. Сортировать по указанному времени выполнения от меньшего к большему, иначе сортировать по порядку постановки в очередь. Пул потоков newScheduledThreadPool использует эту очередь.
  • PriorityBlockingQueue: PriorityBlockingQueue — неограниченная очередь блокировки с приоритетом.
  • SynchronousQueue: SynchronousQueue (синхронная очередь) — это блокирующая очередь, в которой не хранятся элементы.Каждая операция вставки должна ждать, пока другой поток не вызовет операцию удаления, иначе операция вставки всегда будет заблокирована, а пропускная способность обычно выше, чем у LinkedBlockingQueue. Пул потоков newCachedThreadPool использует эту очередь.

7. В чем разница между пулом потоков submit execute и submit?

  1. execute используется для отправки задач, которые не требуют возвращаемого значения
threadsPool.execute(new Runnable() { 
    @Override public void run() { 
        // TODO Auto-generated method stub } 
    });
  1. Метод submit() используется для отправки задач, требующих возвращаемого значения. Пул потоков вернет объект типа future, через который можно использовать объект future для определения успешности выполнения задачи, а возвращаемое значение можно получить через метод get() будущего
Future<Object> future = executor.submit(harReturnValuetask); 
try { Object s = future.get(); } catch (InterruptedException e) { 
    // 处理中断异常 
} catch (ExecutionException e) { 
    // 处理无法执行任务异常 
} finally { 
    // 关闭线程池 executor.shutdown();
}

8. Знаете ли вы, как закрыть пул потоков?

путем вызова пула потоковshutdownилиshutdownNowметод закрытия пула потоков. Их принцип состоит в том, чтобы пройти рабочие потоки в пуле потоков, а затем вызвать метод прерывания потока один за другим, чтобы прервать поток, поэтому задача, которая не может ответить на прерывание, может никогда не быть завершена.

  1. shutdown() устанавливает состояние пула потоков на выключение и не останавливается немедленно:
  1. Прекратить получение внешней отправки задачи
  2. Внутренне запущенные задачи и задачи, ожидающие в очереди, будут выполнены
  3. Дождитесь завершения второго шага, затем действительно остановитесь.
  1. shutdownNow() устанавливает состояние пула потоков на остановку. Обычно он останавливается сразу, на самом деле не обязательно:
  1. Как и в случае с shutdown(), сначала перестаньте получать отправленные извне задачи.
  2. Игнорировать задачи, ожидающие в очереди
  3. Попытка прервать выполнение задачи
  4. Вернуться к невыполненным заданиям

Разница между shutdown и shutdownnow заключается в следующем:

shutdownNow() может немедленно остановить пул потоков, при этом останавливаются как запущенные, так и ожидающие задачи. Это вступает в силу немедленно, но риск также относительно высок. shutdown() просто закрывает канал отправки, а использование submit() недопустимо; как запустить внутренние задачи или как запустить, а затем полностью остановить пул потоков после запуска.

9. Как настроить количество потоков в пуле потоков?

Потоки — дефицитные ресурсы в Java, а пул потоков не настолько велик, насколько это возможно, и не настолько мал, насколько это возможно. Задачи подразделяются на ресурсоемкие, интенсивно использующие операции ввода-вывода и смешанные.

  1. Интенсивность вычислений: большинство из них используют ЦП и память, шифрование, бизнес-обработку логических операций и т. д.
  2. Интенсивный ввод-вывод: связь с базой данных, передача данных по сети и т. д.
  1. Обычно рекомендуется, чтобы пул потоков не был слишком большим для типов с интенсивными вычислениями, обычно число ЦП + 1, +1 связано с тем, что может бытьстраница отсутствует(То есть на жестком диске могут быть какие-то данные, для чтения которых в память требуется еще один поток). Если количество пулов потоков слишком велико, переключение контекста потока и планирование задач могут выполняться часто. Код для получения текущего количества ядер ЦП выглядит следующим образом:
Runtime.getRuntime().availableProcessors();
  1. Интенсивный ввод-вывод: количество потоков соответственно больше, а количество ядер ЦП машины * 2.
  2. Гибридный: его можно считать полностью разделенным на задачи с интенсивным использованием ЦП и интенсивным вводом-выводом.Если время выполнения не сильно отличается, разделение может улучшить пропускную способность, в противном случае оно не требуется.

Конечно, в практических приложениях не существует фиксированной формулы, и ее необходимо корректировать в сочетании с тестированием и мониторингом.

10. Какие виды общих пулов потоков существуют?

Существует четыре основных типа, все из которых создаются с помощью класса инструментов Excutors.Руководство по разработке Java от Alibaba запрещает использование этого метода для создания пулов потоков.

  • newFixedThreadPool (пул потоков с фиксированным количеством потоков)

  • newCachedThreadPool (пул потоков для кешируемых потоков)

  • newSingleThreadExecutor (однопоточный пул потоков)

  • newScheduledThreadPool (пул потоков с временным и периодическим выполнением)

11. Можете ли вы рассказать о принципах четырех общих пулов потоков?

Создание первых трех пулов потоков напрямую вызывает метод построения ThreadPoolExecutor.

newSingleThreadExecutor

  public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

Функции пула потоков

  • Количество основных потоков равно 1.
  • Максимальное количество потоков также равно 1.
  • Очередь блокировки — это неограниченная очередь LinkedBlockingQueue, что может привести к OOM.
  • KeepAliveTime равно 0

SingleThreadExecutor运行流程

процесс работы:

  • Отправить задачу
  • Есть ли нить в пуле резьбы, если нет, создайте новую тему для выполнения задачи
  • Если есть, добавьте задачу в очередь блокировки
  • Текущий единственный поток извлекает задачи из очереди, выполняет одну, затем продолжает выборку, и один поток выполняет задачу.

Применимая сцена

Он подходит для сценариев, в которых задачи выполняются последовательно, по одной задаче за раз.

newFixedThreadPool

  public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

Особенности пула потоков:

  • Количество основных потоков совпадает с максимальным количеством потоков.
  • Нет так называемого non-idle time, т.е. keepAliveTime равно 0
  • Очередь блокировки — это неограниченная очередь LinkedBlockingQueue, что может привести к OOM.

FixedThreadPool

процесс работы:

  • Отправить задачу
  • Если количество потоков меньше основного потока, создайте основной поток для выполнения задачи.
  • Если количество потоков равно количеству основных потоков, добавьте задачу в очередь блокировки LinkedBlockingQueue.
  • Если поток завершает выполнение задачи, перейдите в очередь блокировки, чтобы получить задачу и продолжить выполнение.

сцены, которые будут использоваться

FixedThreadPool подходит для обработки ресурсоемких задач, следит за тем, чтобы ЦП в случае длительного использования рабочих потоков выделял как можно меньше потоков, то есть подходит для выполнения долгосрочной задачи.

newCachedThreadPool

   public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

Особенности пула потоков:

  • Количество основных потоков равно 0
  • Максимальное количество потоков — Integer.MAX_VALUE, оно бесконечно, что может привести к OOM из-за бесконечного создания потоков.
  • Очередь блокировки — SynchronousQueue.
  • Время простоя неосновных потоков составляет 60 секунд.

Когда скорость отправки задач больше, чем скорость обработки задач, каждый раз при отправке задачи обязательно создается поток. В крайних случаях создается слишком много потоков, истощающих ресурсы ЦП и памяти. Поскольку потоки, бездействующие в течение 60 секунд, завершаются, CachedThreadPool, который остается бездействующим в течение длительного времени, не будет потреблять никаких ресурсов.

CachedThreadPool执行流程

процесс работы:

  • Отправить задачу
  • Поскольку основных потоков нет, задачи добавляются непосредственно в очередь SynchronousQueue.
  • Определите, есть ли простаивающий поток, и если да, вынесите задачу на выполнение.
  • Если нет незанятого потока, создайте новый поток для выполнения.
  • Поток, выполнивший задачу, еще может существовать 60 секунд, если он получает задачу в течение этого периода, он может продолжать жить, в противном случае он уничтожается.

Применимая сцена

Используется для одновременного выполнения большого количества краткосрочных небольших задач.

newScheduledThreadPool

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

Функции пула потоков

  • Максимальное количество потоков — Integer.MAX_VALUE, также есть риск OOM
  • Очередь блокировки — DelayedWorkQueue.
  • KeepAliveTime равно 0
  • scheduleAtFixedRate(): выполнять периодически с определенной скоростью
  • scheduleWithFixedDelay(): выполнить после определенной задержки

ScheduledThreadPool执行流程

Рабочий механизм

  • Поток получает запланированную задачу ScheduledFutureTask с истекшим сроком действия из DelayQueue (DelayQueue.take()). Выполненная задача означает, что время ScheduledFutureTask больше или равно текущему времени.
  • Поток выполняет эту ScheduledFutureTask.
  • Поток изменяет переменную времени ScheduledFutureTask на время следующего выполнения.
  • Поток помещает ScheduledFutureTask после измененного времени обратно в DelayQueue (DelayQueue.add()).

ScheduledThreadPoolExecutor执行流程

сцены, которые будут использоваться

Сценарии, в которых задачи выполняются периодически, и сценарии, в которых необходимо ограничить количество потоков.

12. Будут ли проблемы с пулами потоков с неограниченными очередями?

Например, newFixedThreadPool использует неограниченную очередь блокировки LinkedBlockingQueue.Если поток получает задачу, время выполнения задачи относительно велико, что приведет к накоплению все большего количества задач в очереди, что приведет к использованию машинной памяти взлететь и в конечном итоге привести к OOM.

13. Как быть с пулом потоков?

При использовании пула потоков для обработки задач код задачи может вызвать исключение RuntimeException. После создания исключения пул потоков может перехватить его или создать новый поток для замены ненормального потока. Мы можем не заметить, что задача запущена. ненормально, поэтому нам нужно учитывать исключения пула потоков.

Общие методы обработки исключений:

线程池异常处理

14. Можете ли вы сказать мне, сколько состояний имеет пул потоков?

Пул потоков имеет следующие состояния: RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED.

   //线程池状态
   private static final int RUNNING    = -1 << COUNT_BITS;
   private static final int SHUTDOWN   =  0 << COUNT_BITS;
   private static final int STOP       =  1 << COUNT_BITS;
   private static final int TIDYING    =  2 << COUNT_BITS;
   private static final int TERMINATED =  3 << COUNT_BITS;

Диаграмма переключения состояний пула потоков:

线程池状态切换图

RUNNING

  • Пул потоков в этом состоянии будет получать новые задачи и обрабатывать задачи в очереди блокировки;
  • Вызовите метод shutdown() пула потоков, чтобы переключиться в состояние SHUTDOWN;
  • Вызовите метод shutdownNow() пула потоков, чтобы переключиться в состояние STOP;

SHUTDOWN

  • Пул потоков в этом состоянии не будет получать новые задачи, но будет обрабатывать задачи в очереди блокировки;
  • Очередь пуста, и задачи, выполняемые в пуле потоков, также пусты, переходя в состояние TIDYING;

STOP

  • Потоки в этом состоянии не будут получать новые задачи, не будут обрабатывать задачи в очереди блокировки и будут прерывать запущенные задачи;
  • Задача, выполняемая в пуле потоков, пуста и переходит в состояние TIDYING;

TIDYING

  • Это состояние указывает на то, что все задачи были завершены, а количество записанных задач равно 0.
  • После завершения () войдите в состояние TERMINATED

TERMINATED

  • Это состояние указывает на то, что пул потоков полностью завершен.

15. Как пул потоков реализует динамическое изменение параметров?

Пул потоков предоставляет несколько методов установки для установки параметров пула потоков.

JDK 线程池参数设置接口来源参考[7]

Здесь есть две основные идеи:

动态修改线程池参数

  • В рамках нашей микросервисной архитектуры мы можем использовать центры конфигурации, такие как Nacos, Apollo и т. д., или можем разработать центры конфигурации самостоятельно. Бизнес-служба считывает конфигурацию пула потоков и получает соответствующий экземпляр пула потоков для изменения параметров пула потоков.

  • Если вы ограничиваете использование центра конфигурации, вы также можете расширить его самостоятельноThreadPoolExecutor, Переопределите метод, отслеживайте изменения параметров пула потоков и динамически изменяйте параметры пула потоков.

16. Вы понимаете настройку пула потоков?

Не существует фиксированной формулы для настройки пула потоков. Обычно пулы потоков оцениваются заранее. Общие схемы оценки следующие:

线程池评估方案 来源参考[7]

Перед подключением к сети необходимо провести достаточное количество тестов, а после выхода в сеть установить идеальный механизм мониторинга пула потоков.

Объедините механизм мониторинга и сигнализации, чтобы проанализировать проблему пула потоков или оптимизировать точку и настроить конфигурацию в сочетании с механизмом настройки динамических параметров пула потоков.

После этого обратите внимание на внимательное наблюдение и отрегулируйте в любое время.

线程池调优

Для конкретных случаев настройки, пожалуйста, обратитесь к [7] Meituan Technology Blog.

17. Можете ли вы спроектировать и реализовать пул потоков?

⭐Этот вопрос часто появляется в интервью Али.

Принцип реализации пула потоков можно посмотретьЕсли бы кто-то говорил о таких пулах потоков раньше, я бы понял!, разумеется, реализуем сами, нужно только понять основной процесс пула потоков — см. [6]:

线程池主要实现流程

Наша собственная реализация должна завершить этот основной процесс:

  • В пуле потоков есть N рабочих потоков.
  • Отправьте задачу в пул потоков для запуска
  • Если пул потоков заполнен, поставьте задачу в очередь
  • Наконец, когда есть свободное время, заставьте задачи в очереди выполняться.

Код реализации [6]:

public class MyThreadPoolExecutor implements Executor {

    //记录线程池中线程数量
    private final AtomicInteger ctl = new AtomicInteger(0);

    //核心线程数
    private volatile int corePoolSize;
    //最大线程数
    private volatile int maximumPoolSize;

    //阻塞队列
    private final BlockingQueue<Runnable> workQueue;

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
    }

    /**
     * 执行
     *
     * @param command
     */
    @Override
    public void execute(Runnable command) {
        //工作线程数
        int c = ctl.get();
        //小于核心线程数
        if (c < corePoolSize) {
            //添加任务失败
            if (!addWorker(command)) {
                //执行拒绝策略
                reject();
            }
            return;
        }
        //任务队列添加任务
        if (!workQueue.offer(command)) {
            //任务队列满,尝试启动线程添加任务
            if (!addWorker(command)) {
                reject();
            }
        }
    }

    /**
     * 饱和拒绝
     */
    private void reject() {
        //直接抛出异常
        throw new RuntimeException("Can not execute!ctl.count:"
                + ctl.get() + "workQueue size:" + workQueue.size());
    }

    /**
     * 添加任务
     *
     * @param firstTask
     * @return
     */
    private boolean addWorker(Runnable firstTask) {
        if (ctl.get() >= maximumPoolSize) return false;
        Worker worker = new Worker(firstTask);
        //启动线程
        worker.thread.start();
        ctl.incrementAndGet();
        return true;
    }

    /**
     * 线程池工作线程包装类
     */
    private final class Worker implements Runnable {
        final Thread thread;
        Runnable firstTask;

        public Worker(Runnable firstTask) {
            this.thread = new Thread(this);
            this.firstTask = firstTask;
        }

        @Override
        public void run() {
            Runnable task = firstTask;
            try {
                //执行任务
                while (task != null || (task = getTask()) != null) {
                    task.run();
                    //线程池已满,跳出循环
                    if (ctl.get() > maximumPoolSize) {
                        break;
                    }
                    task = null;
                }
            } finally {
                //工作线程数增加
                ctl.decrementAndGet();
            }
        }

        /**
         * 从队列中获取任务
         *
         * @return
         */
        private Runnable getTask() {
            for (; ; ) {
                try {
                    System.out.println("workQueue size:" + workQueue.size());
                    return workQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //测试
    public static void main(String[] args) {
        MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor(2, 2,
                new ArrayBlockingQueue<Runnable>(10));
        for (int i = 0; i < 10; i++) {
            int taskNum = i;
            myThreadPoolExecutor.execute(() -> {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务编号:" + taskNum);
            });
        }
    }
}

Таким образом завершается класс, реализующий основной процесс пула потоков.

18. Что делать, если выполнение одномашинного пула потоков отключено?


Мы можем выполнять управление транзакциями для задач, которые обрабатываются и блокируются в очереди, или настойчиво обрабатывать задачи в очереди блокировки, а при отключении питания или сбое системы и невозможности продолжения операции ее можно отменить с помощью средств журнала возврата正在处理провел успешную операцию. Затем повторно выполните всю очередь блокировки.

То есть: очередь на блокировку персистентная, обрабатывается контроль транзакций задачи, обрабатывается откат задачи после сбоя питания, и работа восстанавливается через лог, данные в очереди на блокировку перезагружаются после сервера. перезагружается.


Ссылаться на:

[1] «Искусство параллельного программирования на Java».

[2] «Практика программирования на Java».

[3]. По правде говоря, на этот раз вы точно сможете легко изучить пулы потоков.

[4]. Требования к собеседованию: анализ пула потоков Java

[5]. Интервьюер спрашивает: «Вы используете слишком много потоков в своем проекте?» Расскажите ему об этом случае!

[6] Брат Сяо Фу "Руководство по Java Face Sutra"

[7]. Принцип реализации пула потоков Java и его практика в бизнесе Meituan


Статья впервые публикуется в личном паблике三分恶, добро пожаловать, обратите внимание!