Требования к собеседованию: анализ пула потоков Java

Java

предисловие

Владение пулом потоков является основным требованием для бэкенд-программистов, и я полагаю, что почти всех спросят о пуле потоков во время собеседования. Я собрал в Интернете несколько классических вопросов для интервью с пулами потоков и использовал их в качестве отправной точки, чтобы рассказать о своем понимании пулов потоков. Если есть какое-то недопонимание, очень надеюсь, что вы на него укажете, а потом давайте разбирать и изучать вместе.

гитхаб-адрес

GitHub.com/Я бы хотел 123/Java…

Классические вопросы для интервью

  • Вопрос интервью 1: Позвольте мне поговорить о пуле потоков Java, как работает функция каждого параметра?
  • Вопрос интервью 2: Согласно внутреннему механизму пула потоков, при отправке новой задачи, какие исключения следует учитывать.
  • Вопрос интервью 3: Какие виды рабочих очередей существуют в пуле потоков?
  • Вопрос интервью 4: Будут ли пулы потоков с неограниченными очередями вызывать всплески памяти?
  • Интервью Вопрос 5: Расскажите мне о нескольких распространенных пулах потоков и сценариях использования?

концепция пула потоков

Пул потоков:Проще говоря, это пул для управления потоками.

  • Это помогает нам управлять потоками и избегать увеличения потребления ресурсов при создании и уничтожении потоков.. Поскольку поток на самом деле является объектом, для создания объекта ему необходимо пройти через процесс загрузки класса, уничтожить объект и пройти через процесс сборки мусора GC, что требует дополнительных ресурсов.
  • Улучшить отзывчивость.Если задача прибывает, это должно быть намного медленнее, чем получение потока из пула потоков и повторное создание потока для выполнения.
  • повторное использование.После того, как поток израсходован, он помещается обратно в пул, что позволяет добиться эффекта повторного использования и экономии ресурсов.

Создание пула потоков

Пулы потоков могут быть созданы с помощью ThreadPoolExecutor, давайте взглянем на его конструктор:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
   BlockingQueue<Runnable> workQueue,
   ThreadFactory threadFactory,
   RejectedExecutionHandler handler) 

Роль нескольких основных параметров:

  • corePoolSize:Максимальное количество основных потоков в пуле потоков
  • максимальный размер пула:Максимальное количество потоков в пуле потоков
  • держать в живых:Размер времени простоя неосновных потоков в пуле потоков
  • единица измерения:Единица времени выживания потока в режиме простоя
  • рабочая очередь:Блокирующая очередь для хранения задач
  • нитьФабрика:Используется для установки фабрики для создания потоков.Вы можете задавать осмысленные имена для созданных потоков, что удобно для устранения неполадок.
  • обработчик:Существует четыре основных типа событий стратегии насыщения в линейных городах.

выполнение задачи

Процесс выполнения пула потоков, соответствующий методу execute():

  • Когда задача отправлена ​​и количество основных потоков, оставшихся в пуле потоков, меньше, чем количество потоков corePoolSize, пул потоков создаст основной поток для обработки отправленной задачи.
  • Если количество основных потоков в пуле потоков заполнено, то есть число потоков равно corePoolSize, вновь отправленная задача будет помещена в очередь задач workQueue для постановки в очередь на выполнение.
  • Когда количество выживших потоков в пуле потоков равно corePoolSize, а очередь задач workQueue заполнена, определить, достигает ли количество потоков максимальное значениеPoolSize, то есть заполнено ли максимальное количество потоков, и если нет, создать неосновной поток для выполнения отправленной задачи.
  • Если текущее количество потоков достигает максимального размера пула и появляются новые задачи, политика отклонения будет использоваться напрямую.

Четыре стратегии отказа

  • AbortPolicy (выдает исключение, по умолчанию)
  • DiscardPolicy (отбрасывать задачи напрямую)
  • DiscardOldestPolicy (отбросить самую старую задачу в очереди и продолжить отправку текущей задачи в пул потоков)
  • CallerRunsPolicy (передается потоку, в котором находится вызов пула потоков для обработки)

Чтобы наглядно описать выполнение пула потоков, я использую аналогию:

  • Основная нить уподобляется штатным сотрудникам компании.
  • Непрофильные потоки уподобляются аутсорсинговым сотрудникам
  • Очереди блокировки сравниваются с пулами запросов
  • Отправка задачи похожа на запрос
  • Когда продукт выдвигает требование, формальный сотрудник (основной поток) первым принимает требование (выполняет задачу).
  • Если все штатные сотрудники востребованы, то есть количество основных потоков заполнено), продукт сначала поместит спрос в пул запросов (очередь блокировки).
  • Если пул спроса (очередь блокировки) также заполнен, но продукт в это время продолжает повышать спрос, что делать? Затем, пожалуйста, отдайте это на аутсорсинг (неосновные потоки).
  • Если все сотрудники (с максимальным числом потоков также заполнены) работают по требованию, выполните политику отклонения.
  • Если аутсорсинговый сотрудник выполняет требование, он покидает компанию после периода (keepAliveTime) простоя.

Хорошо, поехали.Вопрос интервью 1-> Пул потоков Java, как работает функция каждого параметра?Это было решено? Я думаю, что этот вопрос, ответ:CorePoolSize, maxPoolSize и другие параметры конструктора пула потоков и могут четко описать процесс выполнения пула потоков.Вот об этом.

Обработка исключений пула потоков

При использовании пула потоков для обработки задач код задачи может вызвать исключение RuntimeException. После создания исключения пул потоков может перехватить его или создать новый поток для замены ненормального потока. Мы можем не заметить, что задача запущена. ненормально, поэтому нам нужно учитывать исключения пула потоков.

При отправке новой задачи, как обрабатывать исключения?

Давайте сначала посмотрим на кусок кода:

       ExecutorService threadPool = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            threadPool.submit(() -> {
                System.out.println("current thread name" + Thread.currentThread().getName());
                Object object = null;
                System.out.print("result## "+object.toString());
            });
        }

Очевидно, что в этом коде будут исключения, давайте посмотрим на результаты выполнения

Хотя результат не выводится, исключение не генерируется, поэтому мы не можем понять, что в задаче возникло исключение, поэтому нам нужно добавить try/catch. Как показано ниже:

ОК, обработка исключений потока,Мы можем напрямую попробовать... поймать захват.

Процесс выполнения пула потоков exec.submit(runnable)

Выше есть ненормальный метод отправки через отладку (Я предлагаю вам также перейти к отладке, чтобы посмотреть Каждый метод на картинке, где я прервал.), основная блок-схема выполнения аномального метода отправки:

  //构造feature对象
  /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
     public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
       public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    //线程池执行
     public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
               int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
    //捕获异常
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }

Благодаря приведенному выше анализу,Задача, выполняемая submit, может получить выброшенное исключение через метод get объекта Future, а затем обработать его.Давайте рассмотрим демонстрацию, чтобы увидеть, как метод get объекта Future обрабатывает исключения, как показано ниже:

Два других решения для обработки исключений пула потоков

В дополнение к вышеперечисленному1. Отловить исключение в коде задачи try/catch, 2. Получить выброшенное исключение через метод get объекта Future, а затем обработать егоВ дополнение к двум вариантам, есть также два вышеуказанных варианта:

3. Установите UncaughtExceptionHandler для рабочего потока и обработайте исключение в методе uncaughtException.

Давайте посмотрим непосредственно на правильную осанку, достигнутую вот так:

ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler(
                    (t1, e) -> {
                        System.out.println(t1.getName() + "线程抛出的异常"+e);
                    });
            return t;
           });
        threadPool.execute(()->{
            Object object = null;
            System.out.print("result## " + object.toString());
        });

результат операции:

4. Перепишите метод afterExecute класса ThreadPoolExecutor для обработки переданной ссылки на исключение.

Вот демонстрация документации jdk:

class ExtendedExecutor extends ThreadPoolExecutor {
    // 这可是jdk文档里面给的例子。。
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Object result = ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        }
        if (t != null)
            System.out.println(t);
    }
}}

Итак, когда вас спрашивают об обработке исключений пула потоков, как ответить?

.

Очередь работы пула потоков

Какие рабочие очереди существуют в пуле потоков?

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • DelayQueue
  • PriorityBlockingQueue
  • SynchronousQueue

ArrayBlockingQueue

ArrayBlockingQueue (ограниченная очередь) — это ограниченная блокирующая очередь, реализованная с помощью массива, отсортированного по принципу FIFO.

LinkedBlockingQueue

LinkedBlockingQueue (может быть установлена ​​очередь емкости) — это очередь блокировки, основанная на структуре связанного списка, сортировка задач в соответствии с FIFO, и емкость может быть установлена ​​опционально.Если не задано, это будет неограниченная очередь блокировки с максимальной длиной Integer .MAX_VALUE, а пропускная способность обычно высока В ArrayBlockingQuene; пул потоков newFixedThreadPool использует эту очередь

DelayQueue

DelayQueue (отложенная очередь) — это очередь отложенного выполнения задачи на определенный период времени. Сортировать по указанному времени выполнения от меньшего к большему, иначе сортировать по порядку постановки в очередь. Пул потоков newScheduledThreadPool использует эту очередь.

PriorityBlockingQueue

PriorityBlockingQueue — неограниченная очередь блокировки с приоритетом;

SynchronousQueue

SynchronousQueue (синхронная очередь) — блокирующая очередь, в которой не хранятся элементы.Каждая операция вставки должна ждать, пока другой поток не вызовет операцию удаления, иначе операция вставки всегда находится в состоянии блокировки, а пропускная способность обычно выше, чем у LinkedBlockingQuene. Пул потоков newCachedThreadPool использует эту очередь.

По вопросам интервью:Какие рабочие очереди существуют в пуле потоков?Я думаю,Ответьте на приведенные выше типы ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue и т. д., укажите их характеристики и используйте общие пулы потоков соответствующих очередей (например, пул потоков newFixedThreadPool с использованием LinkedBlockingQueue) для расширения и уточнения,Вот и все.

Несколько часто используемых пулов потоков

  • newFixedThreadPool (пул потоков с фиксированным количеством потоков)
  • newCachedThreadPool (пул кэшируемых потоков)
  • newSingleThreadExecutor (однопоточный пул потоков)
  • newScheduledThreadPool (пул потоков с временным и периодическим выполнением)

newFixedThreadPool

  public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

Особенности пула потоков:

  • Количество основных потоков совпадает с максимальным количеством потоков.
  • Нет так называемого non-idle time, т.е. keepAliveTime равно 0
  • Очередь блокировки — это неограниченная очередь LinkedBlockingQueue

Рабочий механизм:

  • Отправить задачу
  • Если количество потоков меньше основного потока, создайте основной поток для выполнения задачи.
  • Если количество потоков равно количеству основных потоков, добавьте задачу в очередь блокировки LinkedBlockingQueue.
  • Если поток завершает выполнение задачи, перейдите в очередь блокировки, чтобы получить задачу и продолжить выполнение.

пример кода

   ExecutorService executor = Executors.newFixedThreadPool(10);
                    for (int i = 0; i < Integer.MAX_VALUE; i++) {
                        executor.execute(()->{
                            try {
                                Thread.sleep(10000);
                            } catch (InterruptedException e) {
                                //do nothing
                            }
            });

IDE указывает параметры JVM: -Xmx8m -Xms8m :

Запуск приведенного выше кода вызовет OOM:

следовательно,Вопрос из интервью: будут ли пулы потоков с неограниченными очередями вызывать всплески памяти?

Отвечать: Да, newFixedThreadPool использует неограниченную очередь блокировки LinkedBlockingQueue.Если поток получает задачу, время выполнения задачи относительно велико (например, в приведенной выше демонстрации установлено значение 10 секунд), что приведет к накоплению очереди задач. все больше и больше, что приводит к машинной памяти. Использование продолжает расти,В конечном итоге приводит к OOM.

сцены, которые будут использоваться

FixedThreadPool подходит для обработки ресурсоемких задач, гарантируя, что при длительном использовании ЦП рабочими потоками выделяется как можно меньше потоков, то есть подходит для выполнения долгосрочных задач.

newCachedThreadPool

   public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

Особенности пула потоков:

  • Количество основных потоков равно 0
  • Максимальное количество потоков — Integer.MAX_VALUE.
  • Очередь блокировки — SynchronousQueue.
  • Время простоя неосновных потоков составляет 60 секунд.

Когда скорость отправки задач больше, чем скорость обработки задач, каждый раз при отправке задачи обязательно создается поток. В крайних случаях создается слишком много потоков, истощающих ресурсы ЦП и памяти. Поскольку потоки, бездействующие в течение 60 секунд, завершаются, CachedThreadPool, который остается бездействующим в течение длительного времени, не будет потреблять никаких ресурсов.

Рабочий механизм

  • Отправить задачу
  • Поскольку основных потоков нет, задачи добавляются непосредственно в очередь SynchronousQueue.
  • Определите, есть ли простаивающий поток, и если да, вынесите задачу на выполнение.
  • Если нет незанятого потока, создайте новый поток для выполнения.
  • Поток, выполнивший задачу, еще может существовать 60 секунд, если он получает задачу в течение этого периода, он может продолжать жить, в противном случае он уничтожается.

пример кода

  ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName()+"正在执行");
            });
        }

результат операции:

сцены, которые будут использоваться

Используется для одновременного выполнения большого количества краткосрочных небольших задач.

newSingleThreadExecutor

  public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

Функции пула потоков

  • Количество основных потоков равно 1.
  • Максимальное количество потоков также равно 1.
  • Очередь блокировки — LinkedBlockingQueue.
  • KeepAliveTime равно 0

Рабочий механизм

  • Отправить задачу
  • Есть ли поток в пуле потоков, если нет, создайте новый поток для выполнения задачи
  • Если да, добавьте задачу в очередь блокировки
  • Единственный текущий поток, берет задачи из очереди, выполняет одну, а потом продолжает выборку, круглосуточно работает один человек (один поток).

пример кода

  ExecutorService executor = Executors.newSingleThreadExecutor();
                for (int i = 0; i < 5; i++) {
                    executor.execute(() -> {
                        System.out.println(Thread.currentThread().getName()+"正在执行");
                    });
        }

результат операции:

сцены, которые будут использоваться

Он подходит для сценариев, в которых задачи выполняются последовательно, по одной задаче за раз.

newScheduledThreadPool

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

Функции пула потоков

  • Максимальное количество потоков — Integer.MAX_VALUE.
  • Очередь блокировки — DelayedWorkQueue.
  • KeepAliveTime равно 0
  • scheduleAtFixedRate(): выполнять периодически с определенной скоростью
  • scheduleWithFixedDelay(): выполнить после определенной задержки

Рабочий механизм

  • добавить задачу
  • Потоки в пуле потоков берут задачи из DelayQueue.
  • Поток получает задачу, время которой больше или равно текущему времени из DelayQueue.
  • После выполнения измените время этой задачи на время, которое будет выполнено в следующий раз.
  • Эта задача возвращается в очередь DelayQueue.

пример кода

    /**
    创建一个给定初始延迟的间隔性的任务,之后的下次执行时间是上一次任务从执行到结束所需要的时间+* 给定的间隔时间
    */
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.scheduleWithFixedDelay(()->{
            System.out.println("current Time" + System.currentTimeMillis());
            System.out.println(Thread.currentThread().getName()+"正在执行");
        }, 1, 3, TimeUnit.SECONDS);

результат операции:

    /**
    创建一个给定初始延迟的间隔性的任务,之后的每次任务执行时间为 初始延迟 + N * delay(间隔) 
    */
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
            scheduledExecutorService.scheduleAtFixedRate(()->{
            System.out.println("current Time" + System.currentTimeMillis());
            System.out.println(Thread.currentThread().getName()+"正在执行");
        }, 1, 3, TimeUnit.SECONDS);;

сцены, которые будут использоваться

Сценарии, в которых задачи выполняются периодически, и сценарии, в которых необходимо ограничить количество потоков.

Вернемся к вопросам интервью:Расскажите о нескольких распространенных пулах потоков и сценариях использования?

Ответьте на эти четыре классических пула потоков: newFixedThreadPool, newSingleThreadExecutor, newCachedThreadPool, newScheduledThreadPool, характеристики пула подпотоков, механизм работы, сценарии использования описываются отдельно, а затем анализируются возможные проблемы, такие как проблема парения памяти newFixedThreadPoolПросто

состояние пула потоков

Пул потоков имеет следующие состояния: RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED.

   //线程池状态
   private static final int RUNNING    = -1 << COUNT_BITS;
   private static final int SHUTDOWN   =  0 << COUNT_BITS;
   private static final int STOP       =  1 << COUNT_BITS;
   private static final int TIDYING    =  2 << COUNT_BITS;
   private static final int TERMINATED =  3 << COUNT_BITS;

Диаграмма переключения состояний пула потоков:

RUNNING

  • Пул потоков в этом состоянии будет получать новые задачи и обрабатывать задачи в очереди блокировки;
  • Вызовите метод shutdown() пула потоков, чтобы переключиться в состояние SHUTDOWN;
  • Вызовите метод shutdownNow() пула потоков, чтобы переключиться в состояние STOP;

SHUTDOWN

  • Пул потоков в этом состоянии не будет получать новые задачи, но будет обрабатывать задачи в очереди блокировки;
  • Очередь пуста, и задачи, выполняемые в пуле потоков, также пусты, переходя в состояние TIDYING;

STOP

  • Потоки в этом состоянии не будут получать новые задачи, не будут обрабатывать задачи в очереди блокировки и будут прерывать запущенные задачи;
  • Задача, выполняемая в пуле потоков, пуста и переходит в состояние TIDYING;

TIDYING

  • Это состояние указывает на то, что все задачи были завершены, а количество записанных задач равно 0.
  • После завершения () войдите в состояние TERMINATED

TERMINATED

  • Это состояние указывает на то, что пул потоков полностью завершен.

Ссылка и спасибо

Личный публичный аккаунт