Интервьюер: Скажите, сколько стратегий отказа от пула потоков вы знаете?

Java

предисловие

Пул потоков, я полагаю, что многие люди использовали его, и те, кто не использовал его, также изучили его. Однако считается, что политика отклонения пулов потоков известна гораздо меньшему количеству людей.

Четыре стратегии отклонения пула потоков

Когда очередь кэша задач пула потоков заполнена и количество потоков в пуле потоков достигает максимального размера пула, если еще есть поступающие задачи, будет принята стратегия отклонения задачи.Обычно существуют следующие четыре стратегии:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务

Политика отклонения пула потоков по умолчанию

Поскольку есть четыре стратегии отказа на выбор, какова стратегия отказа по умолчанию для пула потоков? Глядя на исходный код класса java.util.concurrent.ThreadPoolExecutor, мы видим:

/**
 * The default rejected execution handler
 */
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

Политика отклонения по умолчанию для пула потоков — AbortPolicy, то есть задача отбрасывается и создается исключение RejectedExecutionException. Мы можем проверить это с помощью кода, который выглядит следующим образом:

public class ThreadPoolTest {

    public static void main(String[] args) {

        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
        ThreadFactory factory = r -> new Thread(r, "test-thread-pool");
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.SECONDS, queue, factory);
        while (true) {
            executor.submit(() -> {
                try {
                    System.out.println(queue.size());
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }

}

Вот пул потоков по умолчанию, политика отклонения не задана, а максимальная очередь потоков установлена ​​на 100. Запустите код:

Результат соответствует ожидаемому, что также доказывает, что политикой отклонения по умолчанию для пула потоков является ThreadPoolExecutor.AbortPolicy: прервать задачу и создать исключение RejectedExecutionException.

Установить политику отклонения пула потоков

Если мы хотим установить другие стратегии отклонения пула потоков в соответствии с реальными бизнес-сценариями, мы можем установить их через перегруженный конструктор ThreadPoolExecutor:

Я считаю, что в текущей разработке все используют Spring.На самом деле, мы также можем создать пул потоков через org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor, предоставленный Spring. следующее:

@Configuration
public class TaskExecutorConfig implements AsyncConfigurer {
    /**
     * Set the ThreadPoolExecutor's core pool size.
     */
    private static final int CORE_POOL_SIZE = 5;
    /**
     * Set the ThreadPoolExecutor's maximum pool size.
     */
    private static final int MAX_POOL_SIZE = 5;
    /**
     * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
     */
    private static final int QUEUE_CAPACITY = 1000;

    /**
     * 通过重写getAsyncExecutor方法,制定默认的任务执行由该方法产生
     * <p>
     * 配置类实现AsyncConfigurer接口并重写getAsyncExcutor方法,并返回一个ThreadPoolTaskExevutor
     * 这样我们就获得了一个基于线程池的TaskExecutor
     */
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
        taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
        taskExecutor.initialize();
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        return taskExecutor;
    }
}

Вы можете установить политику отклонения через setRejectedExecutionHandler ThreadPoolTaskExecutor.

Анализ сценариев политики отказа

AbortPolicy

ThreadPoolExecutor.AbortPolicy: прерывает задачу и генерирует исключение RejectedExecutionException.

A handler for rejected tasks that throws a {@code RejectedExecutionException}.

Это политика отклонения по умолчанию для пула потоков.Когда задача больше не может быть отправлена, выдается исключение, и состояние выполнения программы сообщается вовремя. Если это относительно важный бизнес, рекомендуется использовать эту стратегию отклонения, чтобы, когда система не может поддерживать больший объем параллелизма, исключение можно было обнаружить вовремя.

DiscardPolicy

ThreadPoolExecutor.DiscardPolicy: отбрасывает задачи, но не генерирует исключения. Если очередь потоков заполнена, последующие отправленные задачи будут автоматически отброшены.

A handler for rejected tasks that silently discards therejected task.

Использование этой стратегии может помешать нам обнаружить аномальные состояния системы. Рекомендуется, чтобы некоторые несущественные предприятия приняли эту стратегию. Например, я использую эту стратегию отклонения, чтобы подсчитать количество прочтений моего блога.

DiscardOldestPolicy

ThreadPoolExecutor.DiscardOldestPolicy: отменить задачу в начале очереди, а затем повторно отправить отклоненную задачу.

A handler for rejected tasks that discards the oldest unhandled request and then retries {@code execute}, unless the executor is shut down, in which case the task is discarded.

Эта стратегия отказа — своего рода стратегия отказа, которая любит новое и ненавидит старое. Следует тщательно взвесить, следует ли принять такую ​​стратегию отказа, исходя из того, позволяет ли реальный бизнес отказаться от старых задач.

CallerRunsPolicy

ThreadPoolExecutor.CallerRunsPolicy: задача обрабатывается вызывающим потоком.

A handler for rejected tasks that runs the rejected task directly in the calling thread of the {@code execute} method, unless the executor has been shut down, in which case the task is discarded.

Если задача отклонена, вызывающий поток (поток, отправивший задачу) выполняет задачу напрямую, мы можем проверить это с помощью кода:

Измените предыдущий код следующим образом:

public static void main(String[] args) {

    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10);
    ThreadFactory factory = r -> new Thread(r, "test-thread-pool");
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5,
                                                         0L, TimeUnit.SECONDS, queue, factory, new ThreadPoolExecutor.CallerRunsPolicy());
    for (int i = 0; i < 1000; i++) {
        executor.submit(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + ":执行任务");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

Измените максимальное значение очереди на 10 и распечатайте имя потока. Результат выполнения следующий:

Из результатов видно, что основной поток main также выполняет задачу, что показывает, что эта политика отклонения непосредственно выполняется вызывающим потоком (потоком, который отправил задачу) для отброшенной задачи.

Суммировать

В этой статье представлены и демонстрируются четыре стратегии отклонения пула потоков. Выбор стратегии зависит от реального бизнес-сценария.