[Не ложиться спать допоздна] Пул нитей, полный галантерейных товаров...

задняя часть

Эта статья участвовала в мероприятии Haowen Convocation Order,Нажмите, чтобы просмотреть: Заявки на бэк-энд и фронт-энд двойные, призовой фонд в 20 000 юаней ждет вас, чтобы бросить вызов! "

img

Структура каталогов

1. Документы, написанные Дугом Ли в экспертной группе JCP JSR-166.

2. Определение 6 состояний потока в исходном коде JAVA8.

3. Основные параметры пула потоков и детализированный рабочий процесс (дополнение, выполнение, повторение потока....)

В-четвертых, количество потоков в пуле потоков, политика отклонения, детали конфигурации очереди блокировки.

5. Фактическая конфигурация пула потоков и расширенная функция пула потоков

img

Введение

Дуг Ли из JCP JSR-166Написано с помощью экспертов и выпущено в общественное достояние, например О, купите на Creative Commons.org/public…

ОдинExecutorService, который выполняет каждую отправленную задачу, используя, возможно, один из нескольких потоков в пуле, обычно используяExecutorsЗаводской метод его настройки. Пулы потоков решают две разные проблемы: они обычно обеспечивают повышенную производительность при выполнении большого количества асинхронных задач из-за сокращения накладных расходов на вызовы для каждой задачи и предоставляют способ связывания и управления ресурсами (включая потоки), который потребляет задачи во время выполнения. Коллекция. Каждый ThreadPoolExecutor также поддерживает некоторую базовую статистику, например количество выполненных задач. Чтобы быть полезным в широком диапазоне контекстов, класс предоставляет ряд настраиваемых параметров и крючков расширяемости. Однако программисты советуют использовать более удобныйExecutorsзаводской методExecutors.newCachedThreadPool(неограниченный пул потоков с автоматическим повторным использованием потоков),Executors.newFixedThreadPool(пул потоков фиксированного размера) иExecutors.newSingleThreadExecutor(один фоновый поток), эти методы могут иметь предварительно настроенные параметры. Самый распространенный вариант использования. В противном случае используйте следующие рекомендации при ручной настройке и настройке этого класса:

Размер ядра и максимальный размер пула

ThreadPoolExecutor будет основываться на corePoolSize (см.getCorePoolSizegetCorePoolSize(ВидетьgetMaximumPoolSize) для автоматического изменения размера пула (см. getPoolSize ). При отправке новой задачи в методе execute(Runnable),и когда запущено меньше потоков, чем потоков corePoolSize, создается новый поток для обработки запроса, даже если другие рабочие потоки простаивают.. Если количество запущенных потоков больше, чем corePoolSize, но меньше, чем maxPoolSize, тогдаСоздавать новый поток только тогда, когда очередь заполнена. Пул потоков фиксированного размера можно создать, установив одинаковые параметры corePoolSize и maxPoolSize. Установив максимальное значениеPoolSize в изначально неограниченное значение, такое как Integer.MAX_VALUE, пул может содержать любое количество одновременных задач.Как правило, размер ядра и максимальный размер пула задаются только при построении, но также можно использовать setCorePoolSize и setMaximumPoolSize изменяется динамически.

Строительство по требованию

По умолчанию даже основные потоки создаются и запускаются только при поступлении новых задач, но это можно динамически переопределить с помощью методов prestartCoreThread или prestartAllCoreThreads. Если вы создаете пул с непустой очередью, вы можете захотеть предварительно запустить потоки.

создать новую тему

Используйте ThreadFactory для создания новых потоков. Если не указано иное, используется Executors.defaultThreadFactory, который создает все потоки в одной и той же ThreadGroup с одинаковым приоритетом NORM_PRIORITY и статусом без демона. Предоставляя дополнительный ThreadFactory, можно изменить имя потока, группу потоков, приоритет, состояние демона и т. д. Если ThreadFactory не может создать поток при запросе, попросив newThread вернуть null, исполнитель продолжит работу, но не сможет выполнять какие-либо задачи. Поток должен иметь «modifyThread» RuntimePermission. Если рабочие потоки или другие потоки, использующие пул, не имеют этого разрешения, сервис может ухудшиться: изменения конфигурации могут не вступить в силу своевременно, а пулы отключения могут оставаться в состоянии, которое может завершиться, но не завершиться.

держать в живых время

Если количество потоков в текущем пуле превышает corePoolSize, избыточные потоки будут завершены, когда время простоя превысит keepAliveTime (см. getKeepAliveTime(TimeUnit)). Это позволяет снизить потребление ресурсов, когда пул активно не используется. Если позже пул станет более активным, будут построены новые потоки. Вы также можете использовать метод setKeepAliveTime(long, TimeUnit)динамическое изменениеэтот параметр. Использование Long.MAX_VALUE со значением TimeUnit.NANOSECONDS, равным Long.MAX_VALUE, фактически приводит к тому, что бездействующий поток никогда не завершается до завершения работы. По умолчанию политика поддержания активности применяется только при наличии большого количества потоков corePoolSize. Однако, пока значение keepAliveTime не равно нулю, методallowCoreThreadTimeOut(логическое значение) также может использоваться для применения этой политики тайм-аута.для основной резьбы.

очередь

Любая BlockingQueue может использоваться для передачи и хранения отправленных задач. Использование этой очереди взаимодействует с размером пула: Если запущено меньше потоков, чем потоков corePoolSize, исполнитель всегда предпочитает добавлять новые потоки, а не ставить их в очередь. Если запущены corePoolSize или несколько потоков, исполнитель всегда предпочитает ставить запросы в очередь, а не добавлять новые потоки. Если запрос не может быть помещен в очередь, будет создан новый поток, если поток не превысит максимальный размер пула, и в этом случае задача будет отклонена.

Существует три основных стратегии организации очереди:

Прямая передача. Для рабочих очередей хорошим выбором по умолчанию является SynchronousQueue, который может передавать задачи потокам без необходимости их дополнительного сохранения. Здесь, если нет немедленно доступного потока для выполнения задачи, попытка поставить ее в очередь будет неудачной, поэтому будет создан новый поток. Эта стратегия позволяет избежать блокировки при обработке наборов запросов, которые могут иметь внутренние зависимости.Для прямого переключения обычно требуется неограниченный максимальный размер пула, чтобы избежать отклонения вновь отправленных задач.. В свою очередь, когда в среднем команды продолжают поступать быстрее, чем они могут быть обработаны, это можетРезьба огромного роставозможность.

Неограниченная очередь. Использование неограниченной очереди (например, LinkedBlockingQueue без предопределенной емкости) будет держать новые задачи в ожидании в очереди, пока все потоки corePoolSize заняты. Поэтому будут созданы только потоки corePoolSize. (Таким образом, значение maxPoolSize не влияет.) Это может быть уместно, когда каждая задача полностью независима от других, поэтому задачи не влияют на выполнение друг друга. Например, на веб-сервере. Хотя эта очередьПолезно для устранения коротких пакетов запросов, но признает, что возможен неограниченный рост очереди работ, поскольку команды продолжают поступать в среднем быстрее, чем они могут быть обработаны.

Ограниченная очередь. Ограниченные очереди (такие как ArrayBlockingQueue ) помогают предотвратить исчерпание ресурсов при использовании с ограниченными максимальными размерами пула, но их сложнее настраивать и контролировать.Размер очереди и максимальный размер пула могут поставить друг друга под угрозу: использование больших очередей и небольших пулов минимизирует использование ЦП, ресурсов ОС и накладные расходы на переключение контекста, но приводит к искусственно заниженной пропускной способности. Если задачи часто блокируются (например, если они привязаны к вводу-выводу), система может запланировать больше потоков, чем вы разрешили бы в противном случае. Использование небольших очередей обычно требует большего размера пула, который загружает ЦП, но может привести к неприемлемым издержкам планирования, что также снижает пропускную способность.

задача отклонена

Отправка новой задачи в методе execute(Runnable) будет отклонена, когда исполнитель был остановлен, а также когда исполнитель использует ограниченные границы как максимальной емкости потока, так и рабочей очереди и насыщен. В любом случае метод выполнения вызывает RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor) в своем методе RejectedExecutionHandler. Предоставляются четыре предопределенных стратегии обработчика:

В ThreadPoolExecutor.AbortPolicy по умолчанию обработчик создает RejectedExecutionException во время выполнения при отклонении.

В ThreadPoolExecutor.CallerRunsPolicy вызовите execute, чтобы запустить задачу в собственном потоке. Это обеспечивает простой механизм управления обратной связью, который замедляет отправку новых задач.

В ThreadPoolExecutor.DiscardPolicy просто удалите задачи, которые не могут быть выполнены.

В ThreadPoolExecutor.DiscardOldestPolicy, если исполнитель не закрыт, задача в начале рабочей очереди будет отброшена, а выполнение будет повторено (что может снова привести к сбою, что приведет к его повторению).

Можно определить и использовать другие виды классов RejectedExecutionHandler. Это требует особой осторожности, особенно если стратегия проектирования будет работать только при определенной емкости или стратегии организации очередей.

метод крючка

Этот класс предоставляет защищенные переопределяемыеbeforeExecute(Thread, Runnable) и afterExecute(Runnable, Throwable), которые вызываются до и после выполнения каждой задачи. Их можно использовать для управления средой выполнения. Например, повторно инициализируйте ThreadLocals, соберите статистику или добавьте записи журнала. Кроме того, как только исполнитель полностью завершен, завершенный метод может быть завершен для выполнения любой специальной обработки, которую необходимо выполнить. Если перехватчик или метод обратного вызова выдает исключение, внутренний рабочий поток может завершиться с ошибкой и внезапно завершиться.

обслуживание очереди

Метод getQueue() позволяет получить доступ к рабочей очереди дляМониторинг и отладка. Настоятельно не рекомендуется использовать этот метод для каких-либо других целей. При отмене большого количества задач в очереди можно использовать два предоставленных метода remove(Runnable) и purge, чтобы помочь высвободить хранилище. завершенный Эта программа больше не упоминается, и оставшиеся потоки не будут автоматически отключены из пула. Если вы хотите гарантировать, что пулы, на которые нет ссылок, будут восстановлены, даже если пользователь забудет вызвать завершение работы, вы должны установить соответствующее время поддержания активности, используя нижнюю границу нулевых потоков ядра и/или задав параметр allowCoreThreadTimeOut(boolean), чтобы организовать неиспользуемые потоки в конечном итоге. die allowCoreThreadTimeOut(boolean) .

Расширенный пример. Большинство расширений этого класса переопределяют один или несколько защищенных методов перехватчика. Например, вот подкласс, который добавляет простую функциональность паузы/возобновления:

   class PausableThreadPoolExecutor extends ThreadPoolExecutor {
     private boolean isPaused;
     private ReentrantLock pauseLock = new ReentrantLock();
     private Condition unpaused = pauseLock.newCondition();

     public PausableThreadPoolExecutor(...) { super(...); }

     protected void beforeExecute(Thread t, Runnable r) {
       super.beforeExecute(t, r);
       pauseLock.lock();
       try {
         while (isPaused) unpaused.await();
       } catch (InterruptedException ie) {
         t.interrupt();
       } finally {
         pauseLock.unlock();
       }
     }

     public void pause() {
       pauseLock.lock();
       try {
         isPaused = true;
       } finally {
         pauseLock.unlock();
       }
     }

     public void resume() {
       pauseLock.lock();
       try {
         isPaused = false;
         unpaused.signalAll();
       } finally {
         pauseLock.unlock();
       }
     }
   }


img

Если вы хотите досконально разобраться в пуле потоков, сначала разберитесь с потоком

Следующее основано на введении JDK1.8:

Выдержка из исходного фрагмента: некоторые основные определения

    private volatile String name; // 线程的名字
    // 线程的优先级,默认为5,可自行设置,越大代表可以获得的时间片几率越高
    private int       priority; 
​
    /* 是否是守护线程,守护线程在JVM结束时自动销毁 */
    private boolean     daemon = false;
​
    /* 将要运行的目标. */
    private Runnable target;
​
    /* 线程组-就是给线程分组,挺简单,初始化会被分配,与线程池无直接联系 */
    private ThreadGroup group;
    /* 此线程的上下文ClassLoader */
    private ClassLoader contextClassLoader;
​
    /* The inherited AccessControlContext of this thread */
    private AccessControlContext inheritedAccessControlContext;
​
    /* 用于命名是哪个线程的编号 */
    private static int threadInitNumber;
    private static synchronized int nextThreadNum() {
        return threadInitNumber++;
    }
​
    /* 与此线程有关的ThreadLocal值。该映射由ThreadLocal类维护 */
    ThreadLocal.ThreadLocalMap threadLocals = null;
​
    /*
     *与此线程有关的InheritableThreadLocal值。该映射由InheritreadLableThocal类维护.
     */
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
​
    /*
    此线程请求的堆栈大小,如果创建者未指定堆栈大小,则为0。
    VM可以根据此数字执行*喜欢的任何事情;一些虚拟机将忽略它.
     */
    private long stackSize;
​
    /*
     * Thread ID
     */
    private long tid;
​
    /* 用于生成线程ID */
    private static long threadSeqNumber;
​
    /* Java thread status 
     */
    private volatile int threadStatus = 0;

Обратите внимание на несколько важных методов:

1. есть одинstartметод, этот метод вызывает операционную систему и использует операционную систему для вызова нашего метода запуска.

private native void start0();

\2. interruputметод, это всего лишь флаг и не сломается сразу

interrupted() — это статический метод: внутренняя реализация — это вызов isInterrupted() текущего потока, исбросит статус прерывания текущего потока

isInterrupted() — это метод экземпляра, который представляет собой isInterrupted() потока, представленного объектом, вызывающим метод,Не сбрасывает прерванное состояние текущего потока

\3. joinИнтервью Часто задаваемые вопросы: На самом деле, черезждать, чтобы заблокироватьПотоки, такие как t1.join(), блокируют t1 на неопределенное время и продолжают выполнять следующие методы.

\4.getAllStackTraces Получите информацию о стеке всех потоков, которую можно использовать для расширения мониторинга.

Можно посмотреть другие методы.

Поговорим о статусе темы:

/**
 尚未启动的线程的线程状态
 */
NEW,
​
/**
 可运行线程的线程状态。状态为可运行的线程正在Java虚拟机中执行,
 但是可能正在等待来自操作系统的其他资源,例如处理器。
 */
RUNNABLE,
​
/**
 线程的线程状态被阻塞,等待监视器锁定。处于阻塞状态的线程正在等待监视
 器锁定输入同步块/方法或调用Object.wait后重新输入同步块/方法。
 区别就是有个while
 */
 //  synchronized(this)
//  {
//    while (flag)
//    {
//      obj.wait();
//    }
//  }
BLOCKED,
​
/**
 *等待线程的线程状态。由于调用以下其中一种方法,线程处于等待状态:
 Object.wait无超时
 Thread.join没有超时
 LockSupport.park 等待状态
 正在等待另一个线程执行特定操作。例如,在某个对象上调用
 Object.wait()的线程正在等待另一个线程调用 Object.notify()
 或该对象上的Object.notifyAll()名为 Thread.join的线程正在等待指定
 的线程终止。
 */
WAITING,
​
/**
 具有指定等待时间的等待线程的线程状态。线程由于以指定的正等待时间调用以下
 方法之一而处于定时等待状态:
 Thread.sleep,
 Object.wait(long)
 Thread.join(long)
 LockSupport.parkNanos
 LockSupport.parkUntil
 */
TIMED_WAITING,
​
/**
 终止线程的线程状态。*线程已完成执行
 */
TERMINATED;

Поток почти понят, давайте посмотрим на пул потоков!

Пул потоков

Взгляните на UML-диаграмму пула потоков.

img

Анализируем сверху вниз:

/ ** 
*在将来的某个时间执行给定命令。由 Executor实现决定,命令可以在新线程
池或调用线程中执行。 @param命令可运行任务,如果无法接受此任务,
则@throws RejectedExecutionException 
如果命令为null,则@throws NullPointerException 
 * 
/
void execute(Runnable command);

Проще говоря, это планирование потоков для выполнения задач, а пользователям нужно толькоПредоставьте исполняемый объект, передать текущую ** логику задачи исполнителю **** (**Executor)

ExecutorServiceВ интерфейс добавлены некоторые возможности: (1) Расширить возможность выполнения задач, причем дополнение может быть одним или пакетомАсинхронная задача генерирует Futureметод; (2) обеспечиваетУправление пулом потоковметоды, такие как остановка пула потоков. Это легко увидеть на диаграмме UML выше.

public interface ExecutorService extends Executor {
// 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。
boolean awaitTermination(long timeout, TimeUnit unit);
// 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
// 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
// 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。
<T> T invokeAny(Collection<? extends Callable<T>> tasks);
// 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
// 如果此执行程序已关闭,则返回 true。
boolean isShutdown();
// 如果关闭后所有任务都已完成,则返回 true。
boolean isTerminated();
// 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
void shutdown();
// 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
List<Runnable> shutdownNow();
// 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
<T> Future<T> submit(Callable<T> task);
// 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
Future<?> submit(Runnable task);
// 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
<T> Future<T> submit(Runnable task, T result);
}

AbstractExecutorServiceЭто верхний абстрактный класс, который расширяется здесьFuture, просто чтобы получить результат асинхронного выполнения.Например, в Netty мы обрабатываем сообщения через двусвязный список, и нам нужно обрабатывать сообщения слой за слоем, поэтому здесь также используется Future для получения результатов обработки сообщений.

Класс реализации самого низкого уровняThreadPoolExecutorЧтобы выполнить самую сложную часть операции, ThreadPoolExecutor будет с одной стороныподдерживать собственный жизненный цикл, с другой стороныУправление потоками и задачами, так что они хорошо сочетаются для выполнения параллельных задач.

жизненный цикл пула потоков

Жизненный цикл пула потоковТо есть то, что пул потоков испытывает во время выполнения.состояние пула потоков.线程池内部使用一个变量维护两个值:Рабочий статус(состояние выполнения) иколичество потоков(счетчик рабочих). В конкретной реализации пул потоков поддерживает два ключевых параметра: рабочее состояние (runState) и количество потоков (workerCount).собрать, как показано в следующем коде:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 32 -3 
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 1 << 29 - 1 = 2^29 -1 
​
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS; // -2^29 = ‭11100000000000000000000000000000‬
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 0 =  00000000000000000000000000000000‬
private static final int STOP       =  1 << COUNT_BITS; // 2^29 = ‭00100000000000000000000000000000‬
private static final int TIDYING    =  2 << COUNT_BITS; // 2*2^29 = ‭01000000000000000000000000000000‬
private static final int TERMINATED =  3 << COUNT_BITS; // 3*2^29 = ‭01100000000000000000000000000000‬
​
// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; } // 线程池运行状态
private static int workerCountOf(int c)  { return c & CAPACITY; } // 线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }

Тип ctl AtomicInteger – это поле, которое управляет рабочим состоянием пула потоков и количеством допустимых потоков в пуле потоков. Оно содержит две части информации: рабочее состояние пула потоков (runState) и количество допустимых потоков. потоки в пуле потоков ( workerCount),Старшие 3 бита сохраняют runState,Нижние 29 бит содержат workerCount., две переменные не мешают друг другу. Использование одной переменной для хранения двух значений позволяет избежать несоответствий при принятии связанных решений.Нет необходимости занимать ресурсы блокировки, чтобы поддерживать согласованность двух. Изучив исходный код пула потоков, вы также обнаружите, что часто необходимо одновременно оценивать состояние выполнения пула потоков и количество потоков. Пул потоков также предоставляет пользователям несколько методов для получения текущего статуса выполнения и количества потоков в пуле потоков. Здесь используется метод битовой операции, который намного быстрее базовой операции.

Двоичные данные состояния пула потоков приведены в приведенном выше коде, который описан ниже.

  • RUNNING: можетПринимать новые отправленные задачи, а также может обрабатывать задачи в очередях блокировки.
  • SHUTDOWN: Неполноценный,Больше не принимать новые отправленные задачи, но можно продолжитьОбрабатывать сохраненные задачи в очередях блокировки.
  • STOP : Не могу принять новые задачи и не обрабатывать ихЗадачи в очереди прерывают поток, обрабатывающий задачу.
  • TIDYING: Все задачи выполненыпрекращение, workerCount (количество эффективных потоков) равен 0.
  • TERMINATED: существуетПосле выполнения метода terminated()войти в это состояние.

Процесс выполнения пула потоков

Основное внимание в этой статье.

Во-первых, все задачи планируютсяexecuteметод выполнен, эта часть выполненной работы: проверьте сейчасТекущий статус пула потоков, количество запущенных потоков и стратегия выполнения, который определяет процесс, который будет выполняться следующим, равенНепосредственно применять для выполнения потока, илиБуферизировать в очередь на выполнение, илисразу отказываться от задачи

img

Давайте взглянем непосредственно на исходный код, который более интуитивно понятен и впечатляет, а сам код не сложен.

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) { // 如果小于核心线程数
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // offer就是如果队列未满就添加到队列
    if (isRunning(c) && workQueue.offer(command)) { 
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果队列也满了,就直接起一个线程,失败走拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

 

Давайте взглянем на часть кода, связанного с надстройками, и удалим некоторые условные суждения.

 private boolean addWorker(Runnable firstTask, boolean core) {
     if (compareAndIncrementWorkerCount(c))
             break retry;  // 增加线程数,跳出循环
        try {
            w = new Worker(firstTask); //this.thread = getThreadFactory().newThread(this);
            final Thread t = w.thread; // 这里通过线程工厂new一个线程
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock(); // 独占锁-
                try {
                    int rs = runStateOf(ctl.get());// 获取线程池状态
                    if (rs < SHUTDOWN || // 线程池在运行或者shutdown
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//  添加任务到阻塞队列
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start(); // 启动线程
                    workerStarted = true;
                }
            }
        return workerStarted;

Как вы можете видеть выше, t.start() включает планирование системных потоков, а затем следует методу run.

  public void run() {
      runWorker(this);
  }

Как вы можете видеть, runworker(this) выполняется следующим, и это та задача w, которая была только что добавлена.

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 以独占的方式释放资源
        boolean completedAbruptly = true;
        try {
            // 如果task!= null,就getTask获取一个任务
            while (task != null || (task = getTask()) != null) {
                w.lock(); // 1.以独占额方式获得资源,忽略异常
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                // 2.可扩展:用于重新初始化 threadlocals 或者执行日志记录。
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();// 任务执行
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
             // 线程回收
            processWorkerExit(w, completedAbruptly);
        }
    }

Анализировать getTask

     boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();

Проще говоря, если основной поток может тайм-аут = true или задано текущее количество потоков > количество основных потоков, задача будет получена в течение ограниченного времени, иначе задача будет заблокирована.

Логика на самом деле очень простая, а некоторые вещи еще нужно тщательно проанализировать: например, в коде

Первая точка

1.w.lock()
2.public void lock()        { acquire(1); }
3.public final void acquire(int arg) { // class AbstractQueuedSynchronizer
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
  }

Вы можете видеть, что он вызывается прямо здесьЭксклюзивная блокировка AQS - честная блокировкаМетод реализации, и в потоке повторного использования processWorkerExit этот метод используетЭксклюзивная блокировка AQS — нечестная блокировка

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
        // 默认使用非公平锁  new NonfairSync() 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 锁内实现移除任务,同时也移除了Thread引用
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        // 尝试中断线程,如果线程池正在关闭,则关闭线程池
        tryTerminate();
​
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) { // 如果线程池没有停止
            if (!completedAbruptly) { // 没有异常结束
                // 线程池最小空闲数,允许core thread超时就是0,否则就是corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果min == 0但是队列不为空要保证有1个线程来执行队列中的任务
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                 // // 线程数不为空
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
             // 1.线程异常退出
            // 2.线程池为空,但是队列中还有任务没执行,看addWoker方法对这种情况的处理
            addWorker(null, false);
        }
    }

Кратко проанализируем процесс переработки нитей:

  1. метод блокировки один разприобрел эксклюзивный замокЭто указывает на текущий потокзадача в процессесередина.

  2. Если задача выполняется, тоне должен прерывать поток.

  3. Если нить сейчасне монопольный замокгосударство, то естьсостояние бездействия, указывая на то, что он не обрабатывает задачи, тоМожно прервать нить.

  4. Пул потоков выполняетсяshutdownметод илиtryTerminateметод будет вызыватьсяinterruptIdleWorkersпуть кПрервать незанятые потоки, метод interruptIdleWorkers будет использоватьtryLockметод судитьПул потоковсерединаЯвляется ли поток бездействующим; если поток простаивает, тоМожет быть безопасно переработан.

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            // 这里可以看到只要线程数!=0,线程就可以被回收
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 这里看到进行了trylock判断
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        // 进行线程中断标识
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

Например: когда A thread lock(), он вызоветtryAcquire() берет блокировку исключительно и добавляет состояние+1. После этого другие потоки будут терпеть неудачу при попытке tryAcquire() до тех пор, пока поток A неразблокировать () в состояние = 0(то есть снять блокировку), другие потоки имеют возможность получить блокировку. Конечно, прежде чем снимать блокировку, поток A может неоднократно ее захватывать (состояние будет накапливаться), что является концепцией повторного входа. Но обратите внимание на то, сколько раз вам нужно его отпустить, чтобы убедиться, что состояние может вернуться в нулевое состояние.

тогдаCountDownLatchНапример, задача делится на N подпотоков для выполнения, и состояние также инициализируется равным N (обратите внимание, что N должно соответствовать количеству потоков). N подпотоков выполняются параллельно.После выполнения каждого подпотока countDown() выполняется один раз, и состояние уменьшает CAS на 1. Подожди покаПосле выполнения всех дочерних потоков (т. е. состояние = 0), **unpark()** основной вызывающий поток, а затем основной вызывающий поток вернется из функции await() и продолжит остальную часть действия.

Вообще говоря, настраиваемые синхронизаторы представляют собой либо эксклюзивные, либо общие методы, и им нужно реализовать только один из tryAcquire-tryRelease и tryAcquireShared-tryReleaseShared. Но AQS также поддерживает пользовательские синхронизаторы для реализации как эксклюзивных, так и общих методов, таких как ReentrantReadWriteLock.

отказ от задания

в пуле потоковКоличество потоков достигает максимумаPoolSizeКогда необходимо отклонить задачу, примените стратегию отклонения задачи, чтобы защитить пул потоков.

Политика отказа — это общедоступный интерфейс, что означает, что мы можем настроить расширение, и его дизайн выглядит следующим образом:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

Давайте рассмотрим несколько стратегий отказа, предоставляемых JDK:

img

Общее использование бизнес-потока: вызовите поток, который отправляет задачу в обработку (при условии, что все задачи выполнены)

ThreadPoolExecutor.CallerRunsPolicy

public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }
​
        /**
          调用者线程中执行任务r
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

деловая практика

Сценарий 1: быстро реагировать на запросы пользователей B/S

С точки зрения пользовательского опыта, чем быстрее ответ, тем лучше.Если страница не может обновляться в течение длительного времени, пользователь может отказаться от просмотра продукта. А агрегирование функций, ориентированных на пользователя, обычно очень сложное, с каскадными вызовами от вызова к вызову,Многоуровневый каскадВ других случаях студенты, изучающие бизнес-разработки, часто выбирают простой способ использования пулов потоков дляВызовы инкапсулируются в задачи для параллельного выполнения,сократитьобщийВремя отклика. Кроме того, также рассматривается использование пулов потоков. Самое главное в этом сценарии — получить максимальную скорость ответа для удовлетворения пользователей, поэтому не следует устанавливать очереди для буферизации параллельных задач.Увеличить ****corePoolSizeиmaxPoolSizeидти как можно дальшеСоздавайте несколько потоков для быстрого выполнения задач.

Сценарий 2: Быстрая обработка пакетных задач

Необходимо быстро выполнять большое количество автономных вычислительных задач. Например,Статистика для отчета, нам нужно рассчитать, какие товары в каждом магазине по стране имеют определенный атрибут для анализа последующих маркетинговых стратегий, затем нам нужно запросить все товары во всех магазинах страны, и записать товары с определенным атрибутом, и затем быстро создать отчет

эта сценанужно выполнить много заданий, мы также хотим, чтобы задача выполнялась как можно быстрее. В этом случае также следует использовать стратегию многопоточности,Параллельные вычисления. Однако отличие от сценария с приоритетом скорости ответа в том, что количество задач в этом типе сценария огромно, и его не нужно выполнять мгновенно.приоритет пропускной способностиПроблема. так и должно бытьНастройка очередей для буферизации параллельных задач,настроить правильноизcorePoolSizeчтобы установить количество потоков для обработки задачи. Здесь установитеСлишком много потоковнаверное все ещеВызывает частое переключение контекста потокапроблема, это также замедлит скорость обработки задач,уменьшить пропускную способность.

Сценарий 3: параметр очереди слишком длинный

так какНастройка очереди слишком длинная,Недопустимое максимальное количество потоков., когда количество запросов увеличивается, в очереди скапливается большое количество задач, время выполнения задачи слишком велико, и в конечном итоге большое количество нижестоящих служб вызывает сбои тайм-аута

Итак, как настроить конкретные значения?

img

Видно, что эти формулы расчета отклоняются от реальных бизнес-сценариев. Существует большая разница между интенсивным вводом-выводом и интенсивным использованием ЦП, но все они связаны с количеством ядер ЦП.Задачи с интенсивным вводом-выводом часто требуют, чтобы мы сделали параметры пула потоков динамическими, и все пулы потоков также очень дружелюбный.несколько публичных методов, чтобы мы могли динамически настроить пул потоковКоличество ядер потокаиМаксимальное количество потоковиразмер очереди блокировки.

В дополнение к ним, мы упоминали ранееполитика отказаиПредварительная обработкаиПост-задачная обработкаВсе они могут использоваться как расширение пула потоков. С помощью этих конфигураций мы можем реализовать динамическую настройку параметров пула потоков, выполнение задач, загрузку очереди, мониторинг, ведение журнала и многое другое.

Вот расширение предварительной/постобработки задачи для мониторинга:

public class TimingThreadPool extends ThreadPoolExecutor {
​
    public TimingThreadPool() {
        super(1, 1, 0L, TimeUnit.SECONDS, null);
    }
    private static final Logger logger = LoggerFactory.getLogger(TimingThreadPool.class);
    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();
​
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        logger.info(String.format("Thread %s: start %s", t, r));
        startTime.set(System.nanoTime());
    }
​
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            logger.info(String.format("Thread %s: end %s, time=%dns",
                    t, r, taskTime));
        } finally {
            super.afterExecute(r, t);
        }
    }
​
    @Override
    protected void terminated() {
        try {
            logger.info(String.format("Terminated: avg time=%dns",
                    totalTime.get() / numTasks.get()));
        } finally {
            super.terminated();
        }
    }
}

дальнейшее чтение

Обратите внимание, не потеряйтесь

Ну все,выше все содержание этой статьи.Люди которые здесь видят все таланты. Каждую неделю я буду обновлять несколько статей, связанных с интервью и общими стеками технологий интернет-компаний первой линии.Я очень благодарен талантам за то, что они это читают.Если эта статья хорошо написана, и если вы чувствуете, что в ней что-то есть, ставьте лайк 👍 Подписывайтесь ❤️ Делитесь 👥 Очень полезно для теплого человека! ! !

Если в этом блоге есть какие-либо ошибки, пожалуйста, критикуйте и советуйте, это очень ценится!