Использование, расширение и оптимизация пулов потоков для параллельного программирования

Java задняя часть алгоритм Google

предисловие

Метод многопоточного проектирования программного обеспечения действительно может максимально увеличить вычислительную мощность современных многоядерных процессоров и повысить пропускную способность и производительность производственных систем. Однако, если система создает большое количество потоков одновременно, накладные расходы, вызванные частым переключением контекста между потоками, замедлят работу всей системы. В тяжелых случаях это может даже вызвать исчерпание памяти, чтобы вызвать исключения OOM. Поэтому в реальной производственной среде необходимо контролировать количество потоков, а слепое создание большого количества новых автомобилей вредно для системы.

Итак, как мы можем максимально использовать производительность процессора, но при этом сохранить стабильность системы? Один из способов сделать это — использовать пул потоков.

Короче говоря, после использования пула потоков создание потока обрабатывает получение бездействующих потоков из пула потоков, а закрытие потока становится возвратом потока в пул. То есть повторное использование потоков улучшено.

И JDK предоставляет мне готовые инструменты для пула потоков после 1.5, давайте научимся их использовать сегодня.

  1. Какие пулы потоков может создавать фабрика пулов потоков Executors?
  2. Как вручную создать пул потоков
  3. Как масштабировать пул потоков
  4. Как оптимизировать информацию об исключениях пула потоков
  5. Как спроектировать количество потоков в пуле потоков

1. Какие пулы потоков может создавать фабрика пулов потоков Executors?

Начнем с простейшего примера использования пула потоков:

  static class MyTask implements Runnable {

    @Override
    public void run() {
      System.out
          .println(System.currentTimeMillis() + ": Thread ID :" + Thread.currentThread().getId());
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  public static void main(String[] args) {
    MyTask myTask = new MyTask();
    ExecutorService service1 = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 10; i++) {
      service1.submit(myTask);
    }
    service1.shutdown();
  }

результат операции:

运行结果

Мы создали экземпляр пула потоков и установили номер потока по умолчанию 5 и отправили 10 задач в пул потоков.Распечатайте текущие миллисекунды и идентификатор потока, из результатов мы видим 5 одинаковых идентификаторов в результатах.Поток печатает миллисекунды.

Это самый простой пример.

Далее поговорим о других методах создания потоков.

1. Фиксированный пул потоковExecutorService service1 = Executors.newFixedThreadPool(5); Этот метод возвращает пул потоков с фиксированным количеством потоков. Количество потоков в этом пуле потоков всегда одинаково. При отправке новой задачи, если в пуле потоков есть свободный поток, она будет выполнена немедленно. Если нет, новая задача будет временно сохранена в очереди задач (максимальное число неограниченной очереди по умолчанию), thread is idle , задача в очереди задач обрабатывается.

2. Одноэлементный пул потоковExecutorService service3 = Executors.newSingleThreadExecutor(); Этот метод возвращает пул потоков только с одним потоком. Если в пул потоков отправлено более одной задачи, задача будет сохранена в очереди задач (максимальное число неограниченной очереди по умолчанию), а задачи в очереди будут выполняться в порядке «первым поступил», «первым- out, когда поток простаивает.

3. Кэш-пул потоковExecutorService service2 = Executors.newCachedThreadPool(); Этот метод возвращает пул потоков, который может регулировать количество потоков в соответствии с реальной ситуацией. Количество потоков в пуле потоков неизвестно, но если есть незанятые потоки, которые можно повторно использовать, повторно используемые потоки будут использоваться в первую очередь. Все потоки работают.При отправке новой задачи создается новый поток для ее обработки. Все потоки вернутся в пул потоков для повторного использования после выполнения текущей задачи.

4. Пул потоков вызовов задачExecutorService service4 = Executors.newScheduledThreadPool(2); Этот метод также возвращает объект ScheduledThreadPoolExecutor, пул потоков может указывать количество потоков.

Использование первых трех потоков ничем не отличается, ключевым является четвертый.Хотя существует множество сред планирования задач потоков, мы все же можем изучить пул потоков. Как это использовать? Вот пример:

class A {

  public static void main(String[] args) {
    ScheduledThreadPoolExecutor service4 = (ScheduledThreadPoolExecutor) Executors
        .newScheduledThreadPool(2);

    // 如果前面的任务没有完成,则调度也不会启动
    service4.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        try {
          // 如果任务执行时间大于间隔时间,那么就以执行时间为准(防止任务出现堆叠)。
          Thread.sleep(10000);
          System.out.println(System.currentTimeMillis() / 1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }// initialDelay(初始延迟) 表示第一次延时时间 ; period 表示间隔时间
    }, 0, 2, TimeUnit.SECONDS);


    service4.scheduleWithFixedDelay(new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(5000);
          System.out.println(System.currentTimeMillis() / 1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }// initialDelay(初始延迟) 表示延时时间;delay + 任务执行时间 = 等于间隔时间 period
    }, 0, 2, TimeUnit.SECONDS);

    // 在给定时间,对任务进行一次调度
    service4.schedule(new Runnable() {
      @Override
      public void run() {
        System.out.println("5 秒之后执行 schedule");
      }
    }, 5, TimeUnit.SECONDS);
  }
  }

}

Приведенный выше код создает планирование планирования задачи планирования блокнота и вызывает три метода соответственно. Необходимо сосредоточиться на объяснении методов планируемых методов SCHEDULETFIXEDDELAY. Функции этих двух методов очень похожи. Бывший алгоритм временного интервала основан на Время указанного периода и время выполнения задач, а последнее - это указанное время выполнения задач + время выполнения задач. Если студенты заинтересованы, вы можете запустить вышеуказанный код, чтобы увидеть. То же самое можно увидеть.

Итак, JDK инкапсулирует для нас 4 метода создания пулов потоков.Однако обратите внимание, что поскольку эти методы сильно инкапсулированы, при их неправильном использовании не будет возможности устранить проблемы.Поэтому я предлагаю программистам вручную создать пулы потоков. пул потоков, и предпосылкой ручного создания является высокая степень понимания настроек параметров пула потоков. Итак, давайте посмотрим, как вручную создать пул потоков.

2. Как вручную создать пул потоков

Ниже приведен шаблон для ручного создания пула потоков:

  /**
   * 默认5条线程(默认数量,即最少数量),
   * 最大20线程(指定了线程池中的最大线程数量),
   * 空闲时间0秒(当线程池梳理超过核心数量时,多余的空闲时间的存活时间,即超过核心线程数量的空闲线程,在多长时间内,会被销毁),
   * 等待队列长度1024,
   * 线程名称[MXR-Task-%d],方便回溯,
   * 拒绝策略:当任务队列已满,抛出RejectedExecutionException
   * 异常。
   */
  private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 20, 0L,
      TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024)
      , new ThreadFactoryBuilder().setNameFormat("My-Task-%d").build()
      , new AbortPolicy()
  );

Мы видели,ThreadPoolExecutorТо есть у пула потоков есть 7 параметров, давайте посмотрим вместе:

  1. corePoolSizeКоличество основных потоков в пуле потоков
  2. maximumPoolSizeМаксимальное количество нитей
  3. keepAliveTimeПростое время (когда резьбовой пул превысил количество ядер, избыток простого времени выживает время, то есть нить холостого хода количества основных нитей, как долго, будет уничтожена)
  4. unitединица времени
  5. workQueueКогда основной поток заполнен работой, очередь, в которой нужно сохранить задачу,
  6. threadFactoryФабрика для создания потоков
  7. handlerПолитика отклонения при заполнении очереди

Мы не будем говорить о первых нескольких параметрах, это очень просто, в основном последние несколько параметров, очередь, фабрика потоков и политика отклонения.

Давайте сначала посмотрим на очередь, Пул потоков предоставляет 4 очереди по умолчанию.

  1. Неограниченная очередь: размер по умолчанию int max, поэтому может исчерпать системную память, вызвать OOM, очень опасно.
  2. Очередь с прямой отправкой: емкость отсутствует, она не будет сохранена, а новые потоки создаются напрямую, поэтому необходимо задать большое количество пулов потоков. В противном случае реализовать стратегию отказа легко, а также очень опасно.
  3. Ограниченная очередь: Если ядро ​​заполнено, оно будет храниться в очереди. Если ядро ​​заполнено и очередь заполнена, потоки будут создаваться до достижения максимального размера пула. Если очередь заполнена и максимальное количество потоков будет достигнуто, политика отказа будет выполнена.
  4. Очередь с приоритетом: выполнение задач в соответствии с приоритетом. Размер также можно установить.

Арендодатель использовал неограниченную очередь в своем собственном проекте, но установите размер задачи до 1024. Если у вас есть много задач, рекомендуется разделить в несколько резьбовых пулов. Не кладите яйца в одну корзину.

Отказаться смотреть на политику, какова стратегия отказа? Когда очередь заполнена, способы обработки этих задач все еще отправляются. Существует четыре стратегии JDK по умолчанию.

  1. AbortPolicy : создает исключение напрямую, препятствуя правильной работе системы.
  2. CallerRunsPolicy : Пока пул потоков не закрыт, эта политика запускает текущую отброшенную задачу непосредственно в потоке вызывающей стороны. Очевидно, что это на самом деле не приведет к падению задачи, однако производительность потока, отправляющего задачу, скорее всего, резко упадет.
  3. DiscardOldestPolicy: эта политика отбрасывает самый старый запрос, то есть задачу, которая должна быть выполнена, и пытается снова отправить текущую задачу.
  4. DiscardPolicy: эта политика автоматически отбрасывает задачи, которые не могут быть обработаны без какой-либо обработки.Если задачи могут быть потеряны, я думаю, что это лучшее решение.

Конечно, если вас не устраивает политика отклонения, предоставляемая JDK, вы можете реализовать ее самостоятельно, просто реализуя интерфейс RejectedExecutionHandler и переписывая метод rejectExecution.

Наконец, нить фабрика, нить пула резьбы по никам завод все для создания, и потока по умолчанию слишком просто, мы посмотрим на то, как завод потоков по умолчанию для создания потоков:

/**
     * The default thread factory
     */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

Его можно рассматривать как пул- + имя идентификатора потока пула потоков + -thread- + номер потока. Установка потоков, не являющихся демонами. Приоритет по умолчанию.

Что, если мы захотим изменить имя? Да, реализовать интерфейс ThreadFactory и переопределить метод newThread. Но уже есть искусственные колеса, такие как фабрика ThreadFactoryBuilder, предоставленная Google guaua, используемая в нашем примере. Вы можете настроить имя потока, будь то защищенный, приоритет, обработка исключений и т. д. с помощью мощных функций.

3. Как расширить пул потоков

Итак, можем ли мы расширить функциональность пула потоков? Например, записывайте время выполнения задач потока. На самом деле пул потоков JDK зарезервировал для нас интерфейсы.Среди основных методов пула потоков есть два пустых метода, которые зарезервированы для нас. Существует также метод, который вызывается при выходе из пула потоков. Давайте посмотрим на пример:

/**
 * 如何扩展线程池,重写 beforeExecute, afterExecute, terminated 方法,这三个方法默认是空的。
 *
 * 可以监控每个线程任务执行的开始和结束时间,或者自定义一些增强。
 *
 * 在 Worker 的 runWork 方法中,会调用这些方法
 */
public class ExtendThreadPoolDemo {

  static class MyTask implements Runnable {

    String name;

    public MyTask(String name) {
      this.name = name;
    }

    @Override
    public void run() {
      System.out
          .println("正在执行:Thread ID:" + Thread.currentThread().getId() + ", Task Name = " + name);
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }


  public static void main(String[] args) throws InterruptedException {
    ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>()) {
      @Override
      protected void beforeExecute(Thread t, Runnable r) {
        System.out.println("准备执行:" + ((MyTask) r).name);
      }

      @Override
      protected void afterExecute(Runnable r, Throwable t) {
        System.out.println("执行完成: " + ((MyTask) r).name);
      }

      @Override
      protected void terminated() {
        System.out.println("线程池退出");
      }
    };

    for (int i = 0; i < 5; i++) {
      MyTask myTask = new MyTask("TASK-GEYM-" + i);
      es.execute(myTask);
      Thread.sleep(10);

    }

    es.shutdown();
  }

}

Мы переопределяем метод beforeExecute, который вызывается перед выполнением задачи, и метод afterExecute, вызываемый после выполнения задачи. Существует также завершаемый метод, который вызывается при выходе из пула потоков. Каков результат выполнения?

Как видите, методы before и after вызываются до и после выполнения каждой задачи. Эквивалент выполнения среза. Завершенный метод вызывается после вызова метода выключения.

4. Как оптимизировать информацию об исключениях пула потоков

Как оптимизировать информацию об исключениях пула потоков? Прежде чем говорить об этой проблеме, поговорим о баге, который непросто найти:

Посмотрите на код:

  public static void main(String[] args) throws ExecutionException, InterruptedException {

    ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0L,
        TimeUnit.MILLISECONDS, new SynchronousQueue<>());

    for (int i = 0; i < 5; i++) {
      executor.submit(new DivTask(100, i));
    }


  }


  static class DivTask implements Runnable {
    int a, b;

    public DivTask(int a, int b) {
      this.a = a;
      this.b = b;
    }

    @Override
    public void run() {
      double re = a / b;
      System.out.println(re);
    }
  }

Результаты:

Примечание: Есть только 4 результата, один из которых проглочен и не содержит никакой информации. Зачем? Если вы внимательно посмотрите на код, то обнаружите, что при выполнении 100/0 обязательно будет сообщено об ошибке, но сообщения об ошибке нет, а это головная боль, почему? На самом деле, если вы используете метод execute, будет напечатано сообщение об ошибке, а когда вы используете метод submit без вызова его метода get, исключение будет проглочено, потому что, если возникает исключение, оно возвращается как возвращаемое значение. .

Как это сделать? Можно, конечно, использовать метод execute, но можно и по-другому: переписать метод submit, арендодатель написал пример, давайте посмотрим:

  static class TraceThreadPoolExecutor extends ThreadPoolExecutor {

    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue) {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void execute(Runnable command) {
//      super.execute(command);
      super.execute(wrap(command, clientTrace(), Thread.currentThread().getName()));
    }

    @Override
    public Future<?> submit(Runnable task) {
//      return super.submit(task);
      return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
    }

    private Exception clientTrace() {
      return new Exception("Client stack trace");
    }


    private Runnable wrap(final Runnable task, final Exception clientStack,
        String clientThreaName) {
      return new Runnable() {
        @Override
        public void run() {
          try {
            task.run();
          } catch (Exception e) {
            e.printStackTrace();
            clientStack.printStackTrace();
            throw e;
          }
        }
      };
    }
  }

Мы переписали метод submit для инкапсуляции информации об исключении.Если возникнет исключение, будет напечатана информация о стеке. Давайте посмотрим, каков результат использования переписанного пула потоков?

По результатам мы ясно видим причину ошибки: на ноль! И информация о стеке понятна и удобна. Оптимизирована стратегия пула потоков по умолчанию.

5. Как спроектировать количество потоков в пуле потоков

Размер пула потоков оказывает определенное влияние на производительность системы.Слишком большое или слишком маленькое количество потоков не может обеспечить оптимальную производительность системы, но определение размера пула потоков не должно быть очень сложным. точный. Поскольку, пока избегают максимальных и минимальных условий, влияние размера пула потоков на производительность не будет слишком большим.Вообще говоря, при определении размера необходимо учитывать количество процессоров, объем памяти и другие факторы. В книге «Практика» приводится эмпирическая формула для оценки размера пула потоков:

Формула все еще немного сложна.Короче говоря, если вы выполняете операцию с интенсивным использованием ЦП, то количество потоков и количество ядер ЦП должны быть одинаковыми, чтобы избежать множества бесполезных переключений контекстов потоков.Если вы выполняете операцию ввода-вывода- интенсивный, нужно много ждать, тогда количество потоков можно поставить больше, например ядер ЦП умножить на 2.

Что касается того, как получить количество ядер ЦП, Java предоставляет метод:

Runtime.getRuntime(). AvailableProcessors();

Возвращает количество ядер процессора.

Суммировать

Ну вот, у нас есть понимание, как использовать пул потоков, здесь арендодатель рекомендует создать пул потоков вручную, чтобы иметь точное представление о параметрах в пуле потоков, и отлаживать или настраивать система, хорошо, когда хорошо. Например, установить соответствующее количество основных потоков, максимальное количество потоков, политику отклонения, фабрику потоков, размер и тип очереди и т. д. Это также может быть пользовательский поток фабрики потоков семейства G. .

В следующей статье мы углубимся в исходный код, чтобы увидеть, как реализован пул потоков JDK. Поэтому сначала ознакомьтесь с использованием пулов потоков! ! !

удачи! ! !