серия многопоточности Java: ThreadPoolExecutor

Java задняя часть GitHub Алибаба

Пользовательский пул потоков ThreadPoolExecutor

Картинка в начале (картинка взята изРуководство по разработке Java для Alibaba (подробная версия)), с последующим редактированием

Хорошо, давайте начнем редактирование. Вы можете увидеть тему этого сообщения в блоге на картинке, пользовательский пул потоков ThreadPoolExecutor.

содержание

  1. Введен конструктор ThreadPoolexecutor
  2. Ядро количество потоков corePoolSize
  3. Максимальное количество потоков maxPoolSize
  4. время выживания потока keepAliveTime
  5. Единица времени выживания потока
  6. Фабрика threadFactory, создающая потоки
  7. очередь
  8. политика отказа
  9. Расширение пула потоков

Введение в конструктор ThreadPoolExecutor

Прежде чем вводить метод ношения пула потоков, мы должны сначала ввести класс ThreadPoolExecutor, потому что большинство методов фабрики Executors возвращают объекты ThreadPoolExecutor, давайте сначала посмотрим на его конструктор.

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {...}

Введение параметра

параметр тип значение
corePoolSize int количество основных потоков
maximumPoolSize int максимальное количество потоков
keepAliveTime long время выживания
unit TimeUnit единица времени
workQueue BlockingQueue Очередь для хранения потоков
threadFactory ThreadFactory Фабрика для создания потоков
handler RejectedExecutionHandler избыточные обработчики потоков (политика отклонения)

Количество основных потоков corePoolSize

Этот параметр указывает базовое количество потоков в пуле потоков, то есть количество основных потоков.

Максимальное количество потоков maxPoolSize

Этот параметр представляет собой максимальное количество потоков, которые можно создать в пуле потоков.Когда используется ограниченная очередь и задачи, хранящиеся в очереди, заполнены, пул потоков создаст новый поток (максимальное значение не будет превышать значение устанавливается этим параметром). должны знать о том,Этот параметр недействителен при использовании неограниченных очередей.

время выживания потока keepAliveTime

Это время, в течение которого поток может существовать, когда он простаивает.По истечении этого времени поток будет уничтожен.

Единица времени выживания потока

Единица времени выживания потока, там наносекунды (нс), МИКРОСЕКОНДЫ (микросекунды), МИЛЛИСЕКУНДЫ (мс), СЕКУНДЫ (с), МИНУТЫ (мин), ЧАСЫ (hr), ДНИ (дни). Код TimeUnit выглядит следующим образом

public enum TimeUnit {
    NANOSECONDS {...},

    MICROSECONDS {...},

    MILLISECONDS {...},

    SECONDS {...},

    MINUTES {...},

    HOURS {...},

    DAYS {...};
}

Фабрика threadFactory, создающая потоки

Фабрика, создающая поток, обычно представляет собой DefaultThreadFactory, возвращаемую методом Executors.defaultThreadFactory().Конечно, можно использовать и другие имена, чтобы задать более осмысленные имена.

Класс DefaultThreadFactory выглядит следующим образом.

/**
 * The default thread factory
 */
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

очередь

Хранится делится на ограниченный и неограниченные очереди очередей для блокировки очереди задач, ожидающих выполнения. Там синхронноусую, ArrayBlockingQueue, LinkedBlockingcyue, HellowQueue, PriorityBlockingQueue, LinkedTransferceue, Задержка работы, LinkedBlockingDeque. Ниже описано ограниченные и неограниченные две общие очередь. Инжир блокировки в следующий класс

ограниченная очередь

При использовании ограниченной очереди, если необходимо добавить новую задачу, если фактическое количество потоков в пуле потоков меньше, чем corePoolSize (количество основных потоков), потоки будут созданы первыми. в пуле потоков больше, чем количество corePoolSize (количество основных потоков), задача будет добавлена ​​в очередь.Если очередь заполнена, будет создан новый поток, исходя из того, что количество средних сайтов не больше, чем maxPoolSize (максимальное количество потоков).Если количество потоков больше, чем maxPoolSize (максимальное количество потоков), будет выполнена политика отклонения.

неограниченная очередь

При использовании неограниченной очереди максимальный размер пула (максимальное количество потоков) и политика отклонения не сработают, поскольку очередь не ограничена, поэтому очередь не может быть заполнена. По сравнению с ограниченной очередью, когда добавляется новая задача, она будет входить в очередь и ждать. Но это также имеет некоторые проблемы, такие как скорость выполнения потока медленнее, чем скорость отправки задачи, что приведет к быстрому росту неограниченной очереди, пока системные ресурсы не будут исчерпаны.

политика отказа

Когда используется ограниченная очередь, и количество потоков достигает максимума после заполнения очереди задачами, вступает в действие стратегия отклонения. ThreadPoolExecutor по умолчанию использует политику отклонения AbortPolicy. Диаграмма класса RejectedExecutionHandler выглядит следующим образом.

Давайте посмотрим, как вызвать ThreadPoolExecutor RejectedExecutionHandler, вы можете просмотреть метод прямого выполнения

public class ThreadPoolExecutor extends AbstractExecutorService {
    
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();

            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }else if (!addWorker(command, false))
                //拒绝线程
                reject(command);
        }
}

Видно, что после ряда операций будет вызван метод reject, если условия не выполняются, то давайте рассмотрим метод reject.

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

Вы можете видеть, что вызывается метод rejectExecution интерфейса RejectedExecutionHandler. Что ж, теперь давайте взглянем на несколько политик отказа, предоставляемых jdk.

Тестовый код для политики отклонения находится здесь

Примечание. В будущем я напишу анализ исходного кода ThreadPoolExecutor, в котором будут представлены различные процессы ThreadPoolExecutor.

AbortPolicy

Из приведенного ниже кода видно, что информация об исключении выдается напрямую, но пул потоков по-прежнему может нормально работать.

public static class AbortPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

образец кода

класс потока

public class Task implements Runnable{

   private int id ;

   public Task(int id){
      this.id = id;
   }

   public int getId() {
      return id;
   }
   public void setId(int id) {
      this.id = id;
   }

   @Override
   public void run() {
      //
      System.out.println(LocalTime.now()+" 当前线程id和名称为:" + this.id);
      try {
         Thread.sleep(1000);
      } catch (Exception e) {
         e.printStackTrace();
      }
   }


   public String toString(){
      return "当前线程的内容为:{ id : " + this.id + "}";
   }

}

тестовый код

public class TestAbortPolicy {

    public static void main(String[] args) {
        //定义了1个核心线程数,最大线程数1个,队列长度2个
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1,
                1,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                new ThreadPoolExecutor.AbortPolicy());


        //直接提交4个线程
        executor.submit(new Task(1));
        executor.submit(new Task(2));
        executor.submit(new Task(3));
        //提交第四个抛异常
        executor.submit(new Task(4));

    }
}

Результаты

当前线程id和名称为:1
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1540e19d rejected from java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 1, active threads = 1, queued tasks = 2, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at com.learnConcurrency.executor.customThreadPool.testRejectedExecutionHandler.TestAbortPolicy.main(TestAbortPolicy.java:25)
当前线程id和名称为:2
当前线程id和名称为:3

Вы можете видеть, что добавление четвертого потока вызывает исключение

CallerRunsPolicy

Сначала определите, закрыт ли пул потоков, если нет, выполните поток напрямую. Закрыть ничего не делает.

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

Код почти такой же, как и выше, если вы хотите его проверить, вы можете проверить TestCallerRunsPolicy на github, результаты выполнения следующие

14:58:19.462 当前线程id和名称为:4
14:58:19.462 当前线程id和名称为:1
14:58:20.464 当前线程id和名称为:5
14:58:20.464 当前线程id和名称为:2
14:58:21.464 当前线程id和名称为:3
14:58:22.464 当前线程id和名称为:6

DiscardPolicy

Мы видим, что нет никакого кода, это тоже отклоненная потоковая задача отбрасывается без какого-либо лечения.

public static class DiscardPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

DiscardOldestPolicy

Во-первых, определите, закрыт ли пул потоков.Если он не закрыт, отбросьте самый старый запрос и попробуйте снова отправить текущую задачу. Закрыть ничего не делает.

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
   
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

Код почти такой же, как и выше, если вы хотите его проверить, вы можете проверить TestDiscardOldestPolicy на github, результат выполнения выглядит следующим образом.

15:02:28.484 当前线程id和名称为:1
15:02:29.486 当前线程id和名称为:5
15:02:30.487 当前线程id和名称为:6

Вы можете видеть, что потоки 2, 3 и 4 были заменены.

Пользовательская политика отказа

Просто реализуйте интерфейс RejectedExecutionHandle следующим образом: MyRejected

public class MyRejected implements RejectedExecutionHandler{

   @Override
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      System.out.println("自定义处理:开始记录日志");
      System.out.println(r.toString());
      System.out.println("自定义处理:记录日志完成");
   }

}

тестовый код

public class TestCustomeRejectedPolicy {

    public static void main(String[] args) {
        //定义了1个核心线程数,最大线程数1个,队列长度2个
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1,
                1,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                new MyRejected());


        executor.execute(new Task(1));
        executor.execute(new Task(2));
        executor.execute(new Task(3));
        executor.execute(new Task(4));
        executor.execute(new Task(5));
        executor.execute(new Task(6));


        executor.shutdown();
    }
}

выходной результат

自定义处理:开始记录日志
当前线程的内容为:{ id : 4}
自定义处理:记录日志完成
自定义处理:开始记录日志
当前线程的内容为:{ id : 5}
自定义处理:记录日志完成
自定义处理:开始记录日志
当前线程的内容为:{ id : 6}
自定义处理:记录日志完成
15:12:39.267 当前线程id和名称为:1
15:12:40.268 当前线程id和名称为:2
15:12:41.268 当前线程id和名称为:3

Process finished with exit code 0

Здесь, если у вас есть тщательное наблюдение, вам может быть любопытно, зачем использовать метод Execute вместо отправки?

В настоящее время, поскольку после использования метода submit входящий поток будет инкапсулирован в RunnableFuture, а MyRejected, который я написал, вызвал метод toString, а класс Task переопределил метод toString, но инкапсулированный в RunnableFuture, будет введен следующий контент.

自定义处理:开始记录日志
java.util.concurrent.FutureTask@1540e19d
自定义处理:记录日志完成
自定义处理:开始记录日志
java.util.concurrent.FutureTask@677327b6
自定义处理:记录日志完成
自定义处理:开始记录日志
java.util.concurrent.FutureTask@14ae5a5
自定义处理:记录日志完成
15:18:17.262 当前线程id和名称为:1
15:18:18.263 当前线程id和名称为:2
15:18:19.264 当前线程id和名称为:3

Process finished with exit code 0

Расширение пула потоков

Три метода в классе ThreadPoolExecutor являются пустыми методами, которые можно переписать для отслеживания потоков посредством наследования. Переопределяя методы beforeExecute и afterExecute, вы можете добавить ведение журнала, синхронизацию, мониторинг и многое другое. Завершенный метод вызывается при закрытии потока, и здесь могут выполняться такие операции, как уведомления и журналы.

//任务执行前
protected void beforeExecute(Thread t, Runnable r) { }
//任务执行后
protected void afterExecute(Runnable r, Throwable t) { }
//线程池关闭
protected void terminated() { }

образец кода

public class Main {

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new MyThreadPoolExecutor(
                2,              //coreSize
                4,              //MaxSize
                60,          //60
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(4));

        for (int i = 0; i < 8; i++) {
            int finalI = i + 1;
            pool.submit(() -> {
                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        pool.shutdown();
    }

    static class MyThreadPoolExecutor extends ThreadPoolExecutor{
        private final AtomicInteger tastNum = new AtomicInteger();
        private final ThreadLocal<Long> startTime = new ThreadLocal<>();

        public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            startTime.set(System.nanoTime());
            System.out.println(LocalTime.now()+" 执行之前-任务:"+r.toString());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            long endTime = System.nanoTime();
            long time = endTime - startTime.get();
            tastNum.incrementAndGet();
            System.out.println(LocalTime.now()+" 执行之后-任务:"+r.toString()+",花费时间(纳秒):"+time);
            super.afterExecute(r, t);
        }

        @Override
        protected void terminated() {
            System.out.println("线程关闭,总共执行线程数:"+tastNum.get());
            super.terminated();
        }
    }

}

Результаты

15:43:23.329 执行之前-任务:java.util.concurrent.FutureTask@469dad33
15:43:23.329 执行之前-任务:java.util.concurrent.FutureTask@1446b68c
15:43:23.329 执行之前-任务:java.util.concurrent.FutureTask@5eefc31e
15:43:23.329 执行之前-任务:java.util.concurrent.FutureTask@33606b2
15:43:23.513 执行之后-任务:java.util.concurrent.FutureTask@33606b2,花费时间(纳秒):216399556
15:43:23.513 执行之前-任务:java.util.concurrent.FutureTask@236e71ad
15:43:23.601 执行之后-任务:java.util.concurrent.FutureTask@1446b68c,花费时间(纳秒):304505594
15:43:23.601 执行之前-任务:java.util.concurrent.FutureTask@107920dc
15:43:23.733 执行之后-任务:java.util.concurrent.FutureTask@5eefc31e,花费时间(纳秒):436283680
15:43:23.733 执行之前-任务:java.util.concurrent.FutureTask@502826b3
15:43:23.808 执行之后-任务:java.util.concurrent.FutureTask@469dad33,花费时间(纳秒):512242583
15:43:23.808 执行之前-任务:java.util.concurrent.FutureTask@96741ab
15:43:23.924 执行之后-任务:java.util.concurrent.FutureTask@107920dc,花费时间(纳秒):322900976
15:43:24.059 执行之后-任务:java.util.concurrent.FutureTask@236e71ad,花费时间(纳秒):546324680
15:43:24.498 执行之后-任务:java.util.concurrent.FutureTask@502826b3,花费时间(纳秒):765309335
15:43:24.594 执行之后-任务:java.util.concurrent.FutureTask@96741ab,花费时间(纳秒):785868205
线程关闭,总共执行线程数:8

расположение кода

Адрес GitHub

адрес здесь

Я думаю, что хорошо указать звезду

использованная литература

[1] Искусство параллельного программирования на Java

[2] Практика параллельного программирования на Java