Как изящно использовать и понимать пулы потоков

интервью Java задняя часть Spring
Как изящно использовать и понимать пулы потоков

предисловие

Детская обувь, которая обычно подвергается многопоточной разработке, должна иметь более или менее знания о пулах потоков.

Важность видимого пула потоков.

Проще говоря, использование пулов потоков преследует следующие цели:

  • Потоки — дефицитные ресурсы, и их нельзя создавать часто.
  • Развязка; создание потоков полностью отделено от выполнения, что удобно для обслуживания.
  • Его следует поместить в пул, который может быть повторно использован другими задачами.

Принцип пула потоков

Когда дело доходит до пулов потоков, вы будете думать о технологии пулов.Основная идея состоит в том, чтобы поместить драгоценные ресурсы в пул, извлекать их из него каждый раз, когда вы его используете, и возвращать его в пул, чтобы другие могли использовать его после того, как вы используете. это значение.

Как это реализовано в Java?

Связанные API были запущены после JDK 1.5.Обычные способы создания пулов потоков следующие:

  • Executors.newCachedThreadPool(): бесконечный пул потоков.
  • Executors.newFixedThreadPool(nThreads): создание пула потоков фиксированного размера.
  • Executors.newSingleThreadExecutor(): создать пул потоков для одного потока.

На самом деле, если вы посмотрите на исходный код, созданный этими тремя способами, вы обнаружите:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

на самом деле использоватьThreadPoolExecutorкласс реализован.

Итак, давайте сосредоточимся наThreadPoolExecutorкак играть.

Первый — это API, который создает поток:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 

Роль этих основных параметров:

  • corePoolSize— базовый размер пула потоков.
  • maximumPoolSizeМаксимальный размер потока для пула потоков.
  • keepAliveTimeиunitЭто время выживания после простоя потока.
  • workQueueБлокирующая очередь для хранения задач.
  • handlerСтратегия насыщения, когда и очередь, и максимальный пул потоков заполнены.

Разобравшись с этими параметрами, давайте взглянем на реальное приложение.

Обычно мы используем:

threadPool.execute(new Job());

Таким образом, задача отправляется в пул потоков, поэтому основная логикаexecute()функция.

Перед конкретным анализом давайте разберемся в состояниях, определенных в пуле потоков, Эти состояния тесно связаны с выполнением потока:

  • RUNNINGЕстественно, это состояние выполнения, что означает, что задачи в очереди выполнения задач могут быть приняты.
  • SHUTDOWNотносится к вызовуshutdown()метод, больше не принимают новые задачи, но задачи в очереди должны быть выполнены.
  • STOPотносится к вызовуshutdownNow()метод, больше не принимать новые задачи, в то же время отбрасывать все задачи в очереди блокировки и прерывать все выполняемые задачи.
  • TIDYINGВсе задачи выполнены, при вызовеshutdown()/shutdownNow()попытается обновить до этого состояния.
  • TERMINATEDЗавершенное состояние при выполненииterminated()Позже он будет обновлен до этого статуса.

Графически представлен как:

тогда посмотри наexecute()Как обрабатываются методы:

  1. Получить состояние текущего пула потоков.
  2. Создайте новый поток для запуска, когда текущее количество потоков меньше, чем coreSize.
  3. Если текущий поток выполняется и запись в очередь блокировки выполняется успешно.
  4. Дважды проверьте и снова получите состояние потока; если состояние потока изменится (незапущенное состояние), вам нужно удалить задачу из очереди блокировки и попытаться определить, полностью ли выполнен поток. Также применяйте политику отказа.
  5. Если текущий пул потоков пуст, создается и выполняется новый поток.
  6. Если решение на третьем шаге не выполняется, попробуйте создать новый поток и, если это не удастся, выполните политику отклонения.

Вот картинка из "Chat Concurrency", описывающая этот процесс:

Как настроить потоки

После завершения процесса давайте посмотрим, как должны быть настроены основные параметры, упомянутые выше?

Одно можно сказать наверняка: чем больше пул потоков, тем лучше.

Обычно нам нужно определить по характеру выполнения этой партии задач.

  • Задачи с интенсивным вводом-выводом: поскольку потоки не выполняются все время, вы можете настроить максимально возможное количество потоков, например, количество ЦП * 2.
  • На задачи с интенсивным использованием ЦП (много сложных операций) должно выделяться меньшее количество потоков, таких как размер количества ЦП.

Конечно, это эмпирические значения, и лучший способ — протестировать наилучшую конфигурацию в соответствии с реальной ситуацией.

Изящное отключение пулов потоков

Есть запущенные задачи и естественно закрывающиеся задачи.Из пяти упомянутых выше состояний видно, как закрыть пул потоков.

На самом деле есть только два путиshutdown()/shutdownNow().

Но у них есть важные отличия:

  • shutdown()Перестаньте принимать новые задачи после выполнения, и задачи в очереди будут выполнены.
  • shutdownNow()Он также перестает принимать новые задачи, но прерывает все задачи и изменяет состояние пула потоков на остановку.

Оба метода прерывают поток, и пользователь может решить, реагировать ли на прерывание.

shutdownNow()Чтобы быть более простым и грубым, вы можете выбрать различные методы в соответствии с реальной сценой.

Обычно я закрываю пул потоков следующим образом:

        long start = System.currentTimeMillis();
        for (int i = 0; i <= 5; i++) {
            pool.execute(new Job());
        }

        pool.shutdown();

        while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
            LOGGER.info("线程还在执行。。。");
        }
        long end = System.currentTimeMillis();
        LOGGER.info("一共处理了【{}】", (end - start));

pool.awaitTermination(1, TimeUnit.SECONDS)Будет проверять каждую секунду, завершено ли выполнение (статусTERMINATED), при выходе из цикла while указывает на то, что пул потоков полностью завершен.

SpringBoot использует пул потоков

В 2018 году популярен SpringBoot, давайте посмотрим, как настроить и использовать пулы потоков в SpringBoot.

Поскольку используется SpringBoot, естественно использовать характеристики Spring, поэтому Spring нужен, чтобы помочь нам управлять пулом потоков:

@Configuration
public class TreadPoolConfig {


    /**
     * 消费队列线程
     * @return
     */
    @Bean(value = "consumerQueueThreadPool")
    public ExecutorService buildConsumerQueueThreadPool(){
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("consumer-queue-thread-%d").build();

        ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5),namedThreadFactory,new ThreadPoolExecutor.AbortPolicy());

        return pool ;
    }



}

при его использовании:

    @Resource(name = "consumerQueueThreadPool")
    private ExecutorService consumerQueueThreadPool;


    @Override
    public void execute() {

        //消费队列
        for (int i = 0; i < 5; i++) {
            consumerQueueThreadPool.execute(new ConsumerQueueThread());
        }

    }

На самом деле это достаточно просто, то есть создается bean-компонент пула потоков, и его можно брать прямо из Spring при его использовании.

Мониторинг пула потоков

Говоря о SpringBoot, вы также можете использовать его компонент привода для мониторинга пула потоков.

Потоки — дефицитный ресурс в любом случае, мониторинг пула потоков может знать состояние и эффективность выполнения вашей собственной задачи.

Не буду вдаваться в детали актуатора, если интересно, можете посмотреть.это, подробно описал, как предоставить конечные точки мониторинга.

Фактически, сам ThreadPool уже предоставляет множество API для получения статуса потока:

Многие методы узнают свое значение, глядя на имя.Нам нужно только предоставить эту информацию конечной точке мониторинга SpringBoot, и мы можем просмотреть текущий статус пула потоков на странице визуализации.

Даже мы можем расширить несколько функций пула потоков, чтобы настроить логику мониторинга:

Глядя на эти имена и определения, вы знаете, что это делается подклассами.

Пользовательская логика может выполняться до, после и в состоянии завершения потока.

изоляция пула потоков

Пулы потоков кажутся красивыми, но они также приносят некоторые проблемы.

Если многие из наших предприятий зависят от одного и того же пула потоков, когда одно из предприятий использует все потоки по разным неконтролируемым причинам, пул потоков полностью занят.

Таким образом, другие предприятия не смогут нормально работать, что является огромным ударом по системе.

Например, наш Tomcat принимает пул потоков запросов, предполагая, что некоторые из них отвечают очень медленно, а ресурсы потоков не могут быть переработаны и освобождены; пул потоков медленно заполняется, и в худшем случае все приложение не может предоставлять услуги.

Итак, нам нужно поместить пул потоковкарантин.

Обычной практикой является разделение по видам деятельности:

Например, задача размещения заказа использует один пул потоков, а задача получения данных — другой пул потоков. Таким образом, даже если у одной из них возникнет проблема и она исчерпает пул потоков, это не повлияет на работу других задач.

гистрикс изоляция

такая потребностьHystrixпомог нам достичь этого.

Hystrix — это отказоустойчивый подключаемый модуль с открытым исходным кодом, обладающий такими функциями, как изоляция зависимостей и отказоустойчивый переход на более раннюю версию системы.

Давайте взглянемHystrixПростое приложение:

Во-первых, необходимо определить два пула потоков, которые используются для выполнения заказов и обработки пользователей соответственно.

/**
 * Function:订单服务
 *
 * @author crossoverJie
 *         Date: 2018/7/28 16:43
 * @since JDK 1.8
 */
public class CommandOrder extends HystrixCommand<String> {

    private final static Logger LOGGER = LoggerFactory.getLogger(CommandOrder.class);

    private String orderName;

    public CommandOrder(String orderName) {


        super(Setter.withGroupKey(
                //服务分组
                HystrixCommandGroupKey.Factory.asKey("OrderGroup"))
                //线程分组
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("OrderPool"))

                //线程池配置
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                        .withCoreSize(10)
                        .withKeepAliveTimeMinutes(5)
                        .withMaxQueueSize(10)
                        .withQueueSizeRejectionThreshold(10000))

                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD))
        )
        ;
        this.orderName = orderName;
    }


    @Override
    public String run() throws Exception {

        LOGGER.info("orderName=[{}]", orderName);

        TimeUnit.MILLISECONDS.sleep(100);
        return "OrderName=" + orderName;
    }


}


/**
 * Function:用户服务
 *
 * @author crossoverJie
 *         Date: 2018/7/28 16:43
 * @since JDK 1.8
 */
public class CommandUser extends HystrixCommand<String> {

    private final static Logger LOGGER = LoggerFactory.getLogger(CommandUser.class);

    private String userName;

    public CommandUser(String userName) {


        super(Setter.withGroupKey(
                //服务分组
                HystrixCommandGroupKey.Factory.asKey("UserGroup"))
                //线程分组
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("UserPool"))

                //线程池配置
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                        .withCoreSize(10)
                        .withKeepAliveTimeMinutes(5)
                        .withMaxQueueSize(10)
                        .withQueueSizeRejectionThreshold(10000))

                //线程池隔离
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD))
        )
        ;
        this.userName = userName;
    }


    @Override
    public String run() throws Exception {

        LOGGER.info("userName=[{}]", userName);

        TimeUnit.MILLISECONDS.sleep(100);
        return "userName=" + userName;
    }


}

apiОн очень лаконичен и прост для понимания, подробности можно найти в официальной документации.

Затем запускается симуляция:

    public static void main(String[] args) throws Exception {
        CommandOrder commandPhone = new CommandOrder("手机");
        CommandOrder command = new CommandOrder("电视");


        //阻塞方式执行
        String execute = commandPhone.execute();
        LOGGER.info("execute=[{}]", execute);

        //异步非阻塞方式
        Future<String> queue = command.queue();
        String value = queue.get(200, TimeUnit.MILLISECONDS);
        LOGGER.info("value=[{}]", value);


        CommandUser commandUser = new CommandUser("张三");
        String name = commandUser.execute();
        LOGGER.info("name=[{}]", name);
    }

результат операции:

Видно, что две задачи разделены на два пула потоков для запуска, и они не мешают друг другу.

Получение результатов задачи задачи поддерживает синхронные блокирующие и асинхронные неблокирующие методы, которые можно выбрать самостоятельно.

Принцип его реализации на самом деле легко догадаться:

Используйте карту для хранения пулов потоков, соответствующих различным службам.

Это также может быть доказано конструктором прямо сейчас:

Следует отметить еще один момент:

Пользовательская команда не является одноэлементной. Каждое выполнение требует нового экземпляра, иначе будет сообщеноThis instance can only be executed once. Please instantiate a new instance.аномальный.

Суммировать

Технология пулинга действительно широко используется в мирное время, и ее освоение может значительно повысить эффективность.

Исходный код hystrix в конце статьи:

GitHub.com/crossover J я…

Наконец, вставлено небольшое объявление:

Java-InterviewПочти 8K звезда до сих пор.

На этот раз поставьте небольшую цель: стремитесь к влиянию1W star.

Спасибо за вашу поддержку и комплименты.

Добро пожаловать, чтобы обратить внимание на публичный аккаунт, чтобы общаться вместе: