Классические вопросы Java-интервью: тема пула потоков

интервью задняя часть
Классические вопросы Java-интервью: тема пула потоков

Резюме часто задаваемых вопросов в интервью по разработке Java, адрес GitHub:GitHub.com/В облаке 1…, продолжайте обновлять~, если это вам поможет, добро пожаловать, Звезда

1. Что такое пул потоков

Основная идея пула потоков — это пул объектов.При запуске программы открывается пространство памяти, в котором хранится множество (не мертвых) потоков.Планированием выполнения потоков в пуле занимается менеджер пула. При наличии задачи потока одна из них берется из пула, а объект потока возвращается в пул после завершения выполнения, что позволяет избежать снижения производительности, вызванного повторным созданием объектов потока, и сэкономить системные ресурсы.

2. Преимущества использования пулов потоков

  1. Количество создаваемых и уничтожаемых потоков уменьшается, и каждый рабочий поток можно повторно использовать для выполнения нескольких задач.
  2. Использование пула потоков позволяет эффективно контролировать максимальное количество одновременных потоков.Вы можете настроить количество рабочих потоков в пуле потоков в соответствии с емкостью системы, чтобы предотвратить истощение сервера из-за чрезмерного потребления памяти (каждому потоку требуется около 1 МБ памяти, чем больше потоков открывается, тем больше потребляется памяти и, наконец, вылетает).
  3. Некоторое простое управление потоками, такое как: отложенное выполнение, стратегия выполнения циклов по времени и т. д., может быть хорошо реализовано с использованием пулов потоков.

3. Основные компоненты пула потоков

Пул потоков состоит из следующих четырех основных компонентов:

  1. Диспетчер пула потоков (ThreadPool): используется для создания пулов потоков и управления ими, включая создание пулов потоков, уничтожение пулов потоков и добавление новых задач;
  2. WorkThread: поток в пуле потоков находится в состоянии ожидания, когда нет задачи, и может выполнять задачи циклически;
  3. Интерфейс задачи (Task): интерфейс, который каждая задача должна реализовать для рабочего потока, чтобы запланировать выполнение задачи, в основном определяет вход задачи, завершающую работу после выполнения задачи, статус выполнения задачи, и т.д.;
  4. Очередь задач (taskQueue): используется для хранения необработанных задач. Обеспечивает буферный механизм.

4. Класс ThreadPoolExecutor

Когда дело доходит до пулов потоков, мы должны сосредоточиться на классе java.uitl.concurrent.ThreadPoolExecutor, самом основном классе в пуле потоков ThreadPoolExecutor.Схема класса UML ThreadPoolExecutor, обычно используемая в пулах потоков в JDK, выглядит следующим образом:

Мы можем создать пул потоков через ThreadPoolExecutor

new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime, 
milliseconds,runnableTaskQueue, threadFactory,handler);

1. Для создания пула потоков требуется ввести несколько параметров

  • corePoolSize (базовый размер пула потоков): когда задача отправляется в пул потоков, пул потоков создает поток для выполнения задачи. Даже если другие бездействующие базовые потоки могут выполнять новые задачи, поток будет создан. Когда количество задач, которые необходимо выполнить, больше чем базовый размер пула потоков, он больше не будет выполняться. Если вызывается метод prestartAllCoreThreads пула потоков, пул потоков заранее создаст и запустит все основные потоки.
  • maxPoolSize (максимальный размер пула потоков): максимальное количество потоков, которое может быть создано пулом потоков. Если очередь заполнена, а количество уже созданных потоков меньше максимального количества потоков, пул потоков создаст новые потоки для выполнения задач. Стоит отметить, что этот параметр не действует, если используется неограниченная очередь задач.
  • runnableTaskQueue (очередь задач): блокирующая очередь для хранения задач, ожидающих выполнения.
  • ThreadFactory: используется для установки фабрики для создания потоков.Вы можете задать более осмысленное имя для каждого созданного потока с помощью фабрики потоков, что очень полезно при отладке и обнаружении проблем.
  • RejectedExecutionHandler (политика отклонения): когда очередь и пул потоков заполнены, что указывает на насыщение пула потоков, необходимо принять стратегию для обработки новых отправленных задач. Эта политика по умолчанию — AbortPolicy, что означает, что исключение создается, когда новая задача не может быть обработана. Ниже приведены четыре стратегии, предоставляемые JDK1.5. n AbortPolicy: прямое создание исключения.
  • keepAliveTime (живой поток сохраняет время): время, в течение которого рабочий поток пула потоков остается в живых после простоя. Следовательно, если задач много и время выполнения каждой задачи относительно короткое, это время можно увеличить, чтобы улучшить использование потоков.
  • TimeUnit (единица времени сохранения активности потока): Необязательные единицы измерения: дни (DAYS), часы (HOURS), минуты (MINUTES), миллисекунды (MILLISECONDS), микросекунды (MICROSECONDS, одна тысячная миллисекунды) и наносекунды (NANOSECONDS, одна тысячная микросекунды) ).

2. Отправить задачу в пул потоков

Мы можем отправлять задачи в пул потоков с помощью методов execute() или submit(), но они разные.

  • Метод execute() не имеет возвращаемого значения, поэтому невозможно судить об успешном выполнении задачи пулом потоков.
threadsPool.execute(new Runnable() {
    @Override
    public void run() {
    // TODO Auto-generated method stub
   }
});
  • Метод submit() возвращает будущее, затем мы можем использовать это будущее, чтобы определить, успешно ли выполнена задача, и получить возвращаемое значение с помощью метода get будущего.
try {
     Object s = future.get();
   } catch (InterruptedException e) {
   // 处理中断异常
   } catch (ExecutionException e) {
   // 处理无法执行任务异常
   } finally {
   // 关闭线程池
   executor.shutdown();
}

3. Закрытие пула потоков

Мы можем закрыть пул потоков с помощью метода shutdown() или shutdownNow(), но они также отличаются

  • Принцип отключения заключается в том, чтобы просто установить состояние пула потоков в состояние SHUTDOWN, а затем прервать все потоки, которые не выполняют задачи.
  • Принцип shutdownNow состоит в том, чтобы пройти рабочие потоки в пуле потоков, а затем вызвать метод прерывания потока один за другим, чтобы прервать поток, поэтому задача, которая не может ответить на прерывание, может никогда не быть завершена. shutdownNow сначала установит состояние пула потоков в STOP, затем попытается остановить все потоки, которые выполняют или приостанавливают задачи, и возвращает список задач, ожидающих выполнения.

4. Стратегия, выполняемая ThreadPoolExecutor

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         * 如果当前的线程数小于核心线程池的大小,根据现有的线程作为第一个Worker运行的线程,
         * 新建一个Worker,addWorker自动的检查当前线程池的状态和Worker的数量,
         * 防止线程池在不能添加线程的状态下添加线程
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *  如果线程入队成功,然后还是要进行double-check的,因为线程池在入队之后状态是可能会发生变化的
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         * 
         * 如果task不能入队(队列满了),这时候尝试增加一个新线程,如果增加失败那么当前的线程池状态变化了或者线程池已经满了
         * 然后拒绝task
         */
        int c = ctl.get();
        //当前的Worker的数量小于核心线程池大小时,新建一个Worker。
        if (workerCountOf(c) < corePoolSize) { 
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果当前CorePool内的线程大于等于CorePoolSize,那么将线程加入到BlockingQueue。
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))//recheck防止线程池状态的突变,如果突变,那么将reject线程,防止workQueue中增加新线程
                reject(command);
            else if (workerCountOf(recheck) == 0)//上下两个操作都有addWorker的操作,但是如果在workQueue.offer的时候Worker变为0,
              //那么将没有Worker执行新的task,所以增加一个Worker.   addWorker(null, false);
        }
        //如果workQueue满了,那么这时候可能还没到线程池的maxnum,所以尝试增加一个Worker
        else if (!addWorker(command, false))
            reject(command);//如果Worker数量到达上限,那么就拒绝此线程
    }

5. Три типа блокирующих очередей

BlockingQueue workQueue = null; workQueue = new ArrayBlockingQueue(5);//Очередь на основе массива "первым пришел - первым обслужен", ограничена workQueue = new LinkedBlockingQueue();//Очередь в порядке очереди на основе связанного списка, без ограничений workQueue = new SynchronousQueue();//Небуферизованная очередь ожидания, неограниченная

  1. Если количество потоков не достигает corePoolSize, создайте новый поток (основной поток) для выполнения задачи
  2. Когда количество потоков достигает corePools, задача перемещается в очередь и ожидает
  3. Очередь заполнена, новый поток (не основной поток) выполняет задачу
  4. Когда очередь заполнена и общее количество потоков достигает максимального размера пула, (RejectedExecutionHandler) будет сгенерировано исключение.

Новый поток -> количество достигнутых ядер -> очередь присоединения -> новый поток (не основной) -> достигнуто максимальное количество -> политика отклонения срабатывания

5. Четыре стратегии отказа

  1. AbortPolicy: прервать задачу и выдать RejectedExecutionException
 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
  1. DiscardPolicy: Отменить задачу без создания исключения.
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
  1. DisCardOldSetPolicy: отменить задачу в начале очереди, а затем отправить новую задачу.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
  1. CallerRunPolicy: задача обрабатывается вызывающим потоком (поток, отправивший задачу, основной поток).
 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

5. Java предоставляет 5 пулов потоков через Executors

В настоящее время исполнители предоставляют 5 различных конфигураций создания пула потоков:

  • newCachedThreadPool(), который используется для обработкибольшое количество короткого времениПул потоков рабочих задач имеет несколько отличительных характеристик: он пытается кэшировать потоки и повторно использовать их, а когда кэшированные потоки недоступны, создаются новые рабочие потоки; если поток простаивал более60секунд он завершается и удаляется из кеша; при длительном бездействии этот пул потоков не будет потреблять никаких ресурсов. Внутри он использует SynchronousQueue в качестве рабочей очереди.
  • newFixedThreadPool(int nThreads), повторно использует указанное количество (nThreads) потоков, за которыми используетсяНеограниченныйРабочая очередь с не более чем активными рабочими потоками nThreads в любой момент времени. Это означает, что если количество задач превышает количество активных очередей, он будет ожидать появления незанятого потока в рабочей очереди; если рабочий поток выйдет, будет создан новый рабочий поток, чтобы восполнить указанное количество nThreads. .
  • newSingleThreadExecutor(), отличающийся тем, что количество рабочих потоков ограничено 1, выполняющихНеограниченныйРабочая очередь, поэтому она гарантирует, что все задачи выполняются последовательно, максимум одна задача активна, и не позволяет пользователям изменять экземпляр пула потоков, поэтому можно избежать изменения количества потоков.
  • newSingleThreadScheduledExecutor() и newScheduledThreadPool(int corePoolSize), который создает ScheduledExecutorService, который может бытьвременная или периодическаяграфик работы, разницаОдин рабочий поток или несколько заданийнить.
  • newWorkStealingPool(int parallelism), это пул потоков, который часто упускается из виду.В Java 8 добавлен этот метод создания, который будет внутренне создаватьForkJoinPool,использоватьWork-StealingАлгоритмы, обрабатывающие задачи параллельно, без гарантии порядка обработки.

6. Настройки параметров пула потоков

Настройка параметров напрямую связана с нагрузкой на систему.Важными параметрами нагрузки на систему являются:

  • tasks, количество задач, подлежащих обработке в секунду (для системных требований)
  • threadtasks, количество задач, которые каждый поток может обработать на одну заметку (для самого потока)
  • время отклика, система допускает максимальное время отклика задачи, например, время отклика каждой задачи не должно превышать 2 секунды.

corePoolSize

В системе есть задачи, которые должны обрабатываться в секунду, поэтому каждый поток может обрабатывать задачи threadtasks на заметку. , требуемое количество потоков: tasks/threadtasks, то есть количество потоков tasks/threadtasks.

Предполагая, что количество задач в секунду в системе 100~1000, а каждый поток может обрабатывать 10 задач на одну банкноту, требуется от 100/10 до 1000/10, то есть 10~100 потоков. Тогда corePoolSize нужно поставить больше 10, а конкретное число лучше всего исходя из принципа 8020, т.к. количество задач в секунду в системе 100~1000, то есть в 80% случаев количество задач в секунду в системе меньше 1000*20%=200, тогда corePoolSize можно поставить 200/10=20.

queueCapacity

Длина очереди задач зависит от количества основных потоков и требований системы к времени отклика задачи. Длина очереди может быть установлена ​​равной количеству задач, обрабатываемых всеми основными потоками в секунду * время отклика каждой задачи = общее время отклика задач в секунду, т.е. (corePoolSize*threadtasks)время отклика: (2010)*2=400, то есть длину очереди можно установить равной 400.

maxPoolSize

Когда загрузка системы достигает максимального значения, количество потоков ядра уже не может вовремя обрабатывать все задачи, в это время необходимо увеличить количество потоков. 200 задач в секунду требуют 20 потоков, затем при достижении 1000 задач в секунду, (tasks - queueCapacity)/threadtasks равно (1000-400)/10, что равно 60 потокам, а maxPoolSize можно установить равным 60.

Если задана слишком большая длина очереди, время ответа задачи будет слишком большим, не пишите следующее:

LinkedBlockingQueue queue = new LinkedBlockingQueue();

На самом деле это установка длины очереди на Integer.MAX_VALUE, что приведет к тому, что количество потоков всегда будет равно corePoolSize и больше никогда не увеличится.Когда количество задач резко увеличивается, время ответа задачи также резко увеличивается.

keepAliveTime

Когда нагрузка снижается, количество потоков может быть уменьшено.Когда время простоя потока превышает keepAliveTime, ресурсы потока будут автоматически освобождены. По умолчанию пул потоков останавливает лишние потоки и сохраняет потоки не менее corePoolSize.

allowCoreThreadTimeout

По умолчанию основной поток не завершается, вы можете закрыть основной поток, установив для этого параметра значение true.

Вообще говоря, считается, что эмпирическое значение размера пула потоков должно быть установлено следующим образом: (где N — количество ЦП)

  • Если это приложение с интенсивным использованием ЦП, размер пула потоков устанавливается равным N+1.
  • Если это приложение с интенсивным вводом-выводом, размер пула потоков устанавливается равным 2N+1.

7. Пять состояний пула потоков

  1. Состояние инициализации пула потоков — RUNNING, который может получать новые задачи и обрабатывать добавленные задачи.
  2. Когда пул потоков находится в состоянии SHUTDOWN, он не получает новых задач, но может обрабатывать добавленные задачи. При вызове интерфейса shutdown() пула потоков пул потоков изменяется с RUNNING -> SHUTDOWN.
  3. Когда пул потоков находится в состоянии STOP, он не получает новых задач, не обрабатывает добавленные задачи и прерывает выполнение задач. При вызове интерфейса shutdownNow() пула потоков пул потоков изменяется с (RUNNING или SHUTDOWN) -> STOP.
  4. Когда все задачи будут завершены, «количество задач», записанное ctl, равно 0, и пул потоков перейдет в состояние TIDYING. Когда пул потоков перейдет в состояние TIDYING, будет выполнена функция-ловушка terminated(). в классе ThreadPoolExecutor пусто. Если пользователь хочет выполнить соответствующую обработку, когда пул потоков становится TIDYING, это может быть достигнуто путем перегрузки функции terminated().
  5. Когда пул потоков находится в состоянии SHUTDOWN, очередь блокировки пуста и задачи, выполняемые в пуле потоков, также пусты, это будет SHUTDOWN -> TIDYING.
  6. Когда пул потоков находится в состоянии STOP и задачи, выполняемые в пуле потоков, пусты, это будет STOP -> TIDYING.

Когда пул потоков полностью завершен, он переходит в состояние TERMINATED. Когда пул потоков находится в состоянии TIDYING, после выполнения функции terminated() он будет TIDYING -> TERMINATED.

8. Закройте пул потоков

Пул потоков предоставляет два метода закрытия пула потоков: shutdownDown() и shutdownNow().

shutDown()

Когда пул потоков вызывает этот метод, состояние пула потоков немедленно изменяется на состояние SHUTDOWN. На этом этапе вы не можете больше добавлять задачи в пул потоков, иначе будет выброшено исключение RejectedExecutionException. Однако в это время пул потоков не завершится немедленно и не завершится до тех пор, пока задачи, добавленные в пул потоков, не будут обработаны.

shutdownNow()

Согласно описанию документации JDK, общий смысл таков: когда этот метод выполняется, состояние пула потоков немедленно меняется на состояние STOP, и предпринимаются попытки остановить все выполняющиеся потоки и больше не обрабатывать задачи, все еще ожидающие в пуле. Конечно, он вернет эти невыполненные задачи.

Он пытается завершить поток, вызывая метод Thread.interrupt(), но, как мы все знаем, этот метод имеет ограниченный эффект, если в потоке нет таких приложений, как сон, ожидание, условие, временная блокировка и т. д. , метод прерывания () Текущий поток не может быть прерван. Таким образом, ShutdownNow() не означает, что пул потоков должен иметь возможность выйти немедленно, возможно, перед выходом ему придется дождаться завершения всех выполняемых задач.

9. Как установить количество потоков в различных сценариях

1. Как использовать пул потоков для предприятий с высокой степенью параллелизма и коротким временем выполнения задач?

Количество потоков в пуле потоков может быть установлено равным количеству ядер ЦП + 1, чтобы уменьшить переключение контекста потока.

2. Как использовать пул потоков для предприятий с низким уровнем параллелизма и длительным временем выполнения задач?

Это должно определить, где тратится время выполнения

  • Если рабочее время сосредоточено на операциях ввода-вывода в течение длительного времени, то есть на задачах с интенсивным вводом-выводом, поскольку операции ввода-вывода не занимают ЦП, поэтому не позволяйте всему ЦП бездействовать, вы можете соответствующим образом увеличить количество потоков в пул потоков (2 * количество ядер ЦП), что позволяет ЦП обрабатывать больше задач.

  • Если рабочее время сосредоточено на вычислительных операциях в течение длительного времени, то есть задачах, интенсивно использующих ЦП, это то же самое, что (1) количество ядер ЦП + 1, количество потоков в пуле потоков устанавливается равным меньше, а переключение контекстов потоков сокращается.

3. Как использовать пул потоков для предприятий с высокой степенью параллелизма и длительным временем выполнения бизнеса?

Ключом к решению такого типа задач является не пул потоков, а проектирование всей архитектуры.

10. Почему не рекомендуется использовать пул потоков JUC?

Этот способ обработки делает рабочие правила пула потоков более понятными и позволяет избежать риска исчерпания ресурсов.

1. newFixedThreadPool и newSingleThreadExecutor

Две основные проблемы выше заключаются в том, что накопленная очередь обработки запросов может потреблять очень большой объем памяти, даже OOM

2, новыйCachedThreadPool и новыйScheduledThreadPool

Две основные проблемы выше заключаются в том, что максимальное количество потоков равно Integer.MAX_VALUE, что может создать очень большое количество потоков, даже OOM.

11. Вопросы

1. Как реализовать отложенную смерть непрофильных потоков?

Блокируя очередь poll(), пусть поток заблокируется и подождет некоторое время, если задача не получена, поток умирает

2. Почему пул потоков может предотвратить освобождение потоков и выполнение различных задач в любое время?

for (;;) {
          
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

В бесконечном цикле рабочая очередь workQueue всегда будет принимать задачи:

  • Основной поток всегда будет застревать в методе workQueue.take(), позволяя потоку дождаться получения задачи, а затем вернуться.
  • Неосновной поток будет workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS). Если время ожидания не было получено, следующий цикл оценивает, compareAndDecrementWorkerCount вернет значение null, метод run() объекта Worker оценивает тело цикла как значение null, задача завершается, а затем система повторно использует потоки.

Блокируя очередь take(), пусть поток ждет, пока задача не будет получена.

3. Как освободить основную нить?

Установите для параметра allowCoreThreadTimeOut значение true. Поэкспериментируйте со следующим кодом

{
    // 允许释放核心线程,等待时间为100毫秒
    es.allowCoreThreadTimeOut(true);
    for(......){
        // 向线程池里添加任务,任务内容为打印当前线程池线程数
        Thread.currentThread().sleep(200);
    }
}

Количество потоков всегда будет 1. Если значение allowCoreThreadTimeOut равно false, количество потоков будет постепенно достигать насыщения, а затем все будут блокироваться и ждать вместе.

4. Может ли неосновной поток стать основным потоком?

Пул потоков не различает основные потоки и неосновные потоки, а настраивает его в соответствии с текущей емкостью и состоянием пула потоков, поэтому кажется, что есть основные потоки и неосновные потоки, но на самом деле это соответствует состоянию параллелизма. ожидается пулом потоков.

5. Как Runnable выполняется в пуле потоков?

Поток выполняет Worker, а Worker постоянно получает задачи из очереди блокировки для выполнения. .

1. newFixedThreadPool (фиксированное количество пулов потоков);

используемые сцены:

Базовый пул потоков fixedThreadPool равен максимальному пулу потоков, и можно гарантировать, что текущее число потоков будет стабильным. Можно избежать частого повторного использования и создания потоков. Следовательно, он подходит для обработки ресурсоемких задач, гарантируя, что при длительном использовании ЦП рабочими потоками выделяется как можно меньше потоков, то есть он подходит для долгосрочных задач.

Недостатками этого метода являются:

После достижения максимальной емкости пула потоков, если задача завершается и освобождает занятый поток, поток остается в состоянии ожидания и не умирает до тех пор, пока следующая задача снова не займет поток. Это может использовать неограниченную очередь для хранения задач в очереди.Когда большое количество задач превышает максимальную емкость пула потоков и требует обработки, очередь увеличивается по беспроводной сети, что приводит к быстрому истощению ресурсов сервера.

2. newCachedThreadPool (пул потоков для кешируемых потоков);

используемые сцены:

Самая большая особенность newCacehedThreadPool заключается в том, что количество потоков не является фиксированным. Пока бездействующий поток простаивает дольше, чем keepAliveTime, он будет перезапущен. Если есть новая задача, проверьте, не простаивает ли какой-либо поток, и если нет, создайте новую задачу напрямую. Поэтому он подходит для краткосрочных небольших задач с нефиксированным параллелизмом.

Недостатками этого метода являются:

В пуле потоков нет максимального количества потоков. Если одновременно отправляется большое количество задач, может быть создано слишком много потоков, и ресурсы будут исчерпаны.