Личный технический блог:www.zhenganwen.top
Создать и запустить поток
Любой, кто знаком с Java, может легко написать следующий код:
public static class MyThread extends Thread {
@Override
public void run() {
System.out.println("MyThread is running...");
}
}
public static void main(String[] args) {
Thread t = new MyThread();
t.start();
}
жизненный цикл нити
Это основной вопрос, который часто задают в интервью.Вы обязательно должны ответить, что у потоков есть только пять состояний, а именно: новое состояние, состояние готовности, состояние выполнения, состояние блокировки и состояние завершения.
состояние готовности и состояние выполнения
Из-за алгоритма распределения временных интервалов планировщика неизвестно, как долго будет выполняться каждый выполняющийся поток, поэтому поток может переключаться между работающим и выполняющимся потоком.Поток в состоянии блокировки должен сначала войти в состояние готовности, прежде чем он сможет войти в состояние выполнения..
Состояние выполнения и состояние блокировки
Текущий поток активно вызываетThread.sleep()
,obj.wait()
,thread.join()
войдетTIMED-WAITING
илиWAITING
состояние и активно отказываться от права на выполнение ЦП. еслиTIMED-WAITING
, затем через определенный период времени он активно вернется и войдет в состояние Runnable, чтобы дождаться выделения временных интервалов.
thread.join()
Нижний уровень заключается в том, что текущий поток постоянно опрашиваетthread
Выжить или нет, если жив, то постоянноwait(0)
.
Если выполняющийся поток сталкивается с критическим разделом во время выполнения (synchronized
декорированный метод или кодовый блок) и блокировка, которую необходимо получить, занята другими потоками, то он будет активно приостанавливать себя и входитьBLOCKED
государство.
Заблокированное состояние и состояние готовности
Если поток, удерживающий блокировку, выйдет из критической секции, потоки, ожидающие блокировки, будут пробуждены и перейдут в состояние готовности, но только поток, захвативший блокировку, перейдет в состояние выполнения, а другие потоки, не захватившие блокировку все равно войдет в состояние блокировки.
Если поток вызываетobj
изnotify/notifyAll
метод, то когда поток выходит из критической секции (вызовwait/notify
должен сначала пройтиsynchronized
получает блокировку объекта), пробуждение ожидаетobj.wait
Поток выше перейдет из заблокированного состояния в состояние готовностиobj
изmonitor
, и хватайте толькоmonitor
Поток начнется сobj.wait
return, и поток, который его не захватил, все равно будет блокироватьсяobj.wait
начальство
состояние окончания
Поток в состоянии выполнения завершил выполнениеrun
метод или поток в заблокированном состоянииinterrupt
войдет в завершенное состояние, а затем будет уничтожен.
запустить анализ исходного кода
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {}
}
}
private native void start0();
start
Метод в основном делает три вещи:
- Добавить текущий объект потока в группу потоков, к которой он принадлежит (группы потоков будут представлены позже)
- перечислить
start0
,Этоnative
в предыдущей статье «Как реализован поток Java? «В статье упоминается, что планирование потоков будет передано LWP, и запуск новых потоков здесь также относится к этой категории. Таким образом, мы можем предположить, что этот вызов JNI (Java Native Interface) создаст новый поток (LWP) и выполнит объект потока.run
метод - объект потока
started
статус установлен наtrue
Указывает, что он был активирован. Как сказал учитель, узнавая о нитях, нитьstart
Его можно вызвать только один раз, а повторные вызовы сообщат об ошибке через эту переменную.
Зачем вводить Runnable
Принцип единой ответственности
мы пройдемThread
Чтобы смоделировать такой сценарий: Банковский многооконный вызов. таким образом, думая, что уже естьThread
Почему мы должны представитьRunnable
В первую очередь нам нужен оконный поток для имитации процесса звонка на номер (окно звонит на номер, а клиент соответствующего номера переходит в соответствующее окно для обработки дела):
public class TicketWindow extends Thread {
public static final Random RANDOM = new Random(System.currentTimeMillis());
private static final int MAX = 20;
private int counter;
private String windowName;
public TicketWindow(String windowName) {
super(windowName);
counter = 0;
this.windowName = windowName;
}
@Override
public void run() {
System.out.println(windowName + " start working...");
while (counter < MAX){
System.out.println(windowName + ": It's the turn to number " + counter++);
//simulate handle the business
try {
Thread.sleep(RANDOM.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Затем напишите вызывающий клиент для имитации одновременного вызова четырех окон:
public class WindowThreadClient {
public static void main(String[] args) {
Stream.of("Window-1","Window-2","Window-3","Window-4").forEach(
windowName -> new TicketWindow(windowName).start()
);
}
}
Вы обнаружите, что один и тот же номер вызывается четыре раза, а это явно не то, что нам нужно. При нормальных обстоятельствах четыре окна должны совместно использовать одну вызывающую систему. Окно отвечает только за обработку дел, а вызовы должны передаваться вызывающей системе. Это типичный принцип единой ответственности в ООП.
Мы связываем нить и выполняемую задачу, поэтому возникает неловкая ситуация, описанная выше. Работа потока заключается в выполнении задачи, у него есть свое состояние времени выполнения, мы не должны ставить состояние задачи на выполнение (как в этом примереcounter
,windowName
), чтобы связать потоки вместе, а бизнес-логика должна извлекаться отдельно как логическая исполнительная единица, которую можно передать потоку, когда он должен быть выполнен. Так что естьRunnable
интерфейс:
public interface Runnable {
public abstract void run();
}
Следовательно, мы можем преобразовать предыдущий многооконный номер вызова:
public class TicketWindowRunnable implements Runnable {
public static final Random RANDOM = new Random(System.currentTimeMillis());
private static final int MAX = 20;
private int counter = 0;
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " start working...");
while (counter < MAX){
System.out.println(Thread.currentThread().getName()+ ": It's the turn to number " + counter++);
//simulate handle the business
try {
Thread.sleep(RANDOM.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Тестовый класс:
public class WindowThreadClient {
public static void main(String[] args) {
TicketWindowRunnable ticketWindow = new TicketWindowRunnable();
Stream.of("Window-1", "Window-2", "Window-3", "Window-4").forEach(
windowName -> new Thread(ticketWindow, windowName).start()
);
}
}
Таким образом, вы обнаружите, что повторных вызовов нет. Но эта программа не является потокобезопасной, потому что несколько потоков изменяются одновременно.windowRunnable
серединаcounter
переменные, поскольку в этом разделе в основном рассматриваютсяRunnable
роль, поэтому пока не будет обсуждаться.
Паттерны стратегии и функциональное программирование
будетThread
серединаrun
Еще одно преимущество представления через интерфейс заключается в том, что он дружелюбен к шаблону стратегии и функциональному программированию.
Во-первых, давайте кратко представим режим стратегии.Предположим, что нам сейчас нужно рассчитать НДФЛ работника, поэтому мы написали следующий класс инструментов, который можно вызвать, передав базовую зарплату и премию.calculate
Чтобы найти налог к уплате:
public class TaxCalculator {
private double salary;
private double bonus;
public TaxCalculator(double base, double bonus) {
this.salary = base;
this.bonus = bonus;
}
public double calculate() {
return salary * 0.03 + bonus * 0.1;
}
}
Что плохого в том, чтобы так писать? Мы выписали расчет налога к уплате:salary * 0.03 + bonus * 0.1
, а налоговая ставка непостоянна, и клиентам свойственно предлагать изменения спроса! Должны ли мы вручную изменять эту часть кода каждый раз, когда меняются требования?
В этот раз помогает шаблон стратегии: когда ввод наших требований неизменен, а вывод нужно скорректировать в соответствии с разными стратегиями, мы можем извлечь эту часть логики в интерфейс:
public interface TaxCalculateStrategy {
public double calculate(double salary, double bonus);
}
Реализация конкретной стратегии:
public class SimpleTaxCalculateStrategy implements TaxCalculateStrategy {
@Override
public double calculate(double salary, double bonus) {
return salary * 0.03 + bonus * 0.1;
}
}
А бизнес-код просто вызывает интерфейс:
public class TaxCalculator {
private double salary;
private double bonus;
private TaxCalculateStrategy taxCalculateStrategy;
public TaxCalculator(double base, double bonus, TaxCalculateStrategy taxCalculateStrategy) {
this.salary = base;
this.bonus = bonus;
this.taxCalculateStrategy = taxCalculateStrategy;
}
public double calculate() {
return taxCalculateStrategy.calculate(salary, bonus);
}
}
будетThread
логическая исполнительная единица вrun
Извлечь в интерфейсRunnable
Он имеет тот же эффект. Поскольку в реальном бизнесе мы не можем предсказать задачи, которые должны быть переданы в поток для выполнения, после извлечения их в интерфейс это дает большую гибкость нашему приложению.
Кроме того, в JDK1.8 были введены функциональное программирование и лямбда-выражения, и шаблон стратегии также очень удобен для этой функции. Тем не менее с помощью приведенного выше примера, если правило расчета становится(salary + bonus) * 1.5
, возможно, нам нужно добавить новый класс политики:
public class AnotherTaxCalculatorStrategy implements TaxCalculateStrategy {
@Override
public double calculate(double salary, double bonus) {
return (salary + bonus) * 1.5;
}
}
После того, как JDK добавит синтаксический сахар для внутренних классов, можно использовать анонимные внутренние классы, чтобы сэкономить накладные расходы на создание новых классов:
public class TaxCalculateTest {
public static void main(String[] args) {
TaxCalculator taxCalaculator = new TaxCalculator(5000,1500, new TaxCalculateStrategy(){
@Override
public double calculate(double salary, double bonus) {
return (salary + bonus) * 1.5;
}
});
}
}
Но после добавления функционального программирования в JDK он может быть более кратким и ясным:
public class TaxCalculateTest {
public static void main(String[] args) {
TaxCalculator taxCalaculator = new TaxCalculator(5000, 1500, (salary, bonus) -> (salary + bonus) * 1.5);
}
}
Эта пара имеет только один абстрактный методrun
изRunnable
То же самое относится и к интерфейсам.
Конструирование объектов Thread, некоторые вещи, о которых вы могли не знать
ПроверятьThread
метод построения, восходящий кinit
Способ (слегка нарезанный):
Thread parent = currentThread();
if (g == null) {
if (g == null) {
g = parent.getThreadGroup();
}
}
this.group = g;
this.daemon = parent.isDaemon();
this.priority = parent.getPriority();
this.target = target;
setPriority(priority);
this.stackSize = stackSize;
tid = nextThreadID();
-
g
текущий объектThreadGroup
,2~8
заключается в установке группы потоков, к которой принадлежит текущий объект, если вnew Thread
явно не указан, то по умолчанию будет родительский поток (в настоящее время выполняющийсяnew Thread
Thread) группа потоков устанавливается в свою собственную группу потоков. -
9~10
Line, наследует от родительского потока два состояния: является ли поток демоном и каков приоритет. Конечно, вnew Thread
затем может пройтиthread.setDeamon
илиthread.setPriority
чтобы настроить -
12
хорошо, если прошелnew Thread(Runnable target)
способ создать поток, а затем получить входящийRunnable target
, вызывается при запуске потокаrun
будет выполняться не пустойtarget
изrun
метод. Теоретически существует три способа создания потока:- выполнить
Runnable
интерфейсMyRunnable
,пройти черезnew Thread(myRunnable)
воплощать в жизньMyRunnable
серединаrun
- наследовать
Thread
и переписатьrun
,пройти черезnew MyThread()
выполнить перезаписьrun
- наследовать
Thread
и переписатьrun
, все еще можно передать конструкторуRunnable
Экземпляр класса реализации:new MyThread(myRunnable)
, а только выполняетMyThread
переписан вrun
, не повлияетmyRunnable
любое воздействие. В этом способе создания тредов много неясностей, за исключением того, что интервьюер может вас смутить, использовать этот способ не рекомендуется.
- выполнить
-
Установите приоритет потока, всего существует 10 уровней приоритета, соответствующих значению
[0,9]
, чем больше значение, тем выше приоритет. Однако этот параметр зависит от платформы, что означает, что он может быть действительным в некоторых операционных системах и недействительным в некоторых операционных системах, поскольку потоки Java напрямую отображаются на потоки ядра, поэтому конкретное планирование по-прежнему зависит от операционной системы. . -
Установите размер стека. Этот размер относится к размеру памяти стека, а не к максимальному количеству кадров стека, которое может содержать стек.Вызов и возврат каждого метода соответствуют процессу передачи и извлечения кадра стека из стека виртуальной машины потока в стек. стек, как описано в следующем разделе. Этот параметр будет введен в . Чтобы узнать о стеке виртуальных машин, обратитесь к Главе 2 книги «Углубленное понимание виртуальной машины Java (второе издание)».
-
установить нить
ID
, — уникальный идентификатор потока. Например, когда смещенная блокировка смещена в сторону потока, она будет отображаться в заголовке объекта.Mark Word
Идентификатор потока хранится в (предвзятые блокировки можно найти в главе 5 «Искусство параллельного программирования» и «Углубленное понимание виртуальных машин Java»).пройти через
nextThreadID
найдетstatic synchronized
метод атомарного получения серийного номера потокаthreadSeqNumber
Увеличенное значение:public static void main(String[] args) { new Thread(() -> { System.out.println(Thread.currentThread().getId()); //11 }).start(); }
Почему
main
Идентификатор первого потока, созданного в JVM, равен 11 (это означает, что он является 11-м потоком, созданным после запуска JVM)? Это потому, что JVM выполняетmain
запустит первый поток процесса JVM (называемыйmain
thread) и запустит некоторые потоки демона, такие как потоки GC.
Многопоточность и структура памяти JVM
Структура памяти JVM
Здесь следует отметить, что каждый поток имеет собственный стек виртуальной машины. Стеки всех потоков хранятся в области стека виртуальной машины области данных времени выполнения JVM.
структура памяти кадра стека
параметр stackSize
Thread
обеспечивает настраиваемыйstackSize
Перегруженный конструктор:
public Thread(ThreadGroup group,
Runnable target,
String name,
long stackSize)
Официальная документация описывает этот параметр следующим образом:
The stack size is the approximate number of bytes of address space that the virtual machine is to allocate for this thread's stack. The effect of the stackSize parameter, if any, is highly platform dependent.
вы можете указатьstackSize
Параметр приблизительно указывает размер памяти стека виртуальной машины (Уведомление: это размер памяти, то есть количество байтов, а не максимальное количество кадров стека, которое может быть размещено в стеке, и этот размер относится к размеру стека потока, а не к размеру всей виртуальной машины. область стека). И этот параметр имеет высокую степень зависимости от платформы, то есть на каждой операционной системе один и тот же параметр показывает разные эффекты.
On some platforms, specifying a higher value for the
stackSize
parameter may allow a thread to achieve greater recursion depth before throwing aStackOverflowError
. Similarly, specifying a lower value may allow a greater number of threads to exist concurrently without throwing anOutOfMemoryError
(or other internal error). The details of the relationship between the value of thestackSize
parameter and the maximum recursion depth and concurrency level are platform-dependent. On some platforms, the value of the stackSize parameter may have no effect whatsoever.
На некоторых платформах дляstackSize
Указание большего значения позволяет потоку достичь большей глубины рекурсии, прежде чем генерировать исключение переполнения стека (поскольку размер кадра стека метода известен во время компиляции. Взяв в качестве примера таблицу локальных переменных, только переменные базового типа имеютlong
а такжеdouble
Он занимает 8 байт, а остальные обрабатываются как 4 байта.Ссылочный тип занимает 4 байта или 8 байтов в зависимости от того, является ли виртуальная машина 32-разрядной или 64-разрядной. В этом случае, чем больше стек, тем больше максимальное количество кадров стека, которое может вместить стек, то есть тем больше глубина рекурсии). Аналогично, задав меньшееstackSize
Это может позволить большему количеству потоков сосуществовать и избежать исключений OOM (некоторые читатели могут XOR, почему нелегко генерировать исключения OOM, когда стек мал? Разве стек не должен быть меньше, памяти недостаточно, и OOM проще На самом деле в однопоточной среде без OOM может произойти только переполнение стека, потому что размер кадра стека, соответствующий каждому методу, может быть известен компилятору.При запуске потока часть памяти будет разделена из площадь стека виртуальной машины соответствует размеру стека. Поэтому, если будет отправлено слишком много кадров стека или слишком много кадров стека для отправки, это приведет только к тому, что стек больше не будет вмещать кадр стека и приведет к переполнению стека. когда OOM будет отброшен?Для области стека виртуальной машины, если не хватит памяти для разделения в качестве стековой памяти нового потока, OOM будет отброшен.Нетрудно понять, что осталась ограниченная память процесса после удаления памяти кучи, области методов и памяти, необходимой для самой JVM. Стек виртуальной машины ограничен, чем меньше выделено каждому стеку, тем больше потоков может сосуществовать естественным образом). Наконец, на некоторых платформах, независимо отstackSize
Независимо от того, что он установлен, вероятно, ничего не сделает.
The virtual machine is free to treat the
stackSize
parameter as a suggestion. If the specified value is unreasonably low for the platform, the virtual machine may instead use some platform-specific minimum value; if the specified value is unreasonably high, the virtual machine may instead use some platform-specific maximum. Likewise, the virtual machine is free to round the specified value up or down as it sees fit (or to ignore it completely).
Виртуальные возможности будутstackSize
Как предложение, есть еще определенное мнение в настройке размера стека. Если заданное значение слишком мало, виртуальная машина установит размер стека на минимальный размер стека, соответствующий платформе; соответственно, если заданное значение слишком велико, будет установлен максимальный размер стека, соответствующий платформе. Кроме того, виртуальная машина может округлить заданное значение вверх или вниз, чтобы установить подходящий размер стека (даже виртуальная машина игнорирует его).
Due to the platform-dependent nature of the behavior of this constructor, extreme care should be exercised in its use. The thread stack size necessary to perform a given computation will likely vary from one JRE implementation to another. In light of this variation, careful tuning of the stack size parameter may be required, and the tuning may need to be repeated for each JRE implementation on which an application is to run.
Из-за того, что этот конструктор зависит от платформы, его необходимо использовать с особой осторожностью. Правила вычисления фактического размера стека потоков будут вести себя по-разному в зависимости от реализации JVM. Учитывая это изменение, может потребоваться тщательная настройка параметра размера стека, и, возможно, потребуется настроить его по-разному для разных реализаций JVM, используемых приложением.
Implementation note: Java platform implementers are encouraged to document their implementation's behavior with respect to the
stackSize
parameter.
Глубина вызова метода при переполнении стека, если stackSize не указан:
public class StackSizeTest {
public static int counter = 0;
public static void main(String[] args) {
new Thread(() -> {
try {
count();
} catch (StackOverflowError e) {
System.out.println(counter); // result -> 35473
}
}).start();
}
public static void count() {
counter++;
count();
}
}
Укажите размер стека как 10 КБ.
Явно указатьstackSize
После этого существенно влияет размер стека потоков, а глубина вызова увеличивается на исходную.35473
стал296
:
public class StackSizeTest {
public static int counter = 0;
public static void main(String[] args) {
new Thread(null,() -> {
try {
count();
} catch (StackOverflowError e) {
System.out.println(counter);
}
},"test-stack-size",10 * 1024).start(); //stackSize -> 10KB result -> 296
}
public static void count() {
counter++;
count();
}
}
Измените размер кадра стека, изменив размер локальных переменных
Чтобы изменить размер кадра стека, это можно сделать, добавив локальные переменные. следующее, добавив несколькоlong
Переменные (одна занимает 8 байт), по сравнению с предыдущим тестом значительно уменьшена глубина вызова метода:
public class StackSizeTest {
public static int counter = 0;
public static void main(String[] args) {
new Thread(null,() -> {
try {
count();
} catch (StackOverflowError e) {
System.out.println(counter);
}
},"test-stack-size",10 * 1024).start(); //stackSize -> 10KB result -> 65
}
public static void count() {
long a,b,c,d,e,f,g,h,j,k,l,m,n,o,p,q;
counter++;
count();
}
}
Потоки демона и сценарии их использования
пройти черезthread.setDaemon(true)
Вновь созданный поток может быть установлен как поток демона, который должен быть выполнен до запуска потока (thread.start
) настройка действительна.
- Характеристика потока демона состоит в том, что когда его родительский поток завершается, поток демона также будет уничтожен.
- JVM завершит работу только тогда, когда завершится последний поток, не являющийся демоном.
Обнаружение сердцебиения
В кластерной архитектуре обычно требуется механизм обнаружения пульса. Если приложение открывает поток, не являющийся демоном, для обнаружения сердцебиения, может случиться так, что основная программа приложения будет завершена, но поток обнаружения сердцебиения все еще работает.В это время JVM будет продолжать занимать систему, потому что все еще не потоки демона работают.процессор, ресурсы памяти, чего явно не должно быть.
Следующий код просто имитирует этот сценарий:
public class HeartCheck {
public static void main(String[] args) {
// worker thread
new Thread(()->{
// start the heart-check thread first
Thread heartCheck = new Thread(()->{
// do interval-automatic heart check and notify the parent thread when heart check has error
while (true) {
System.out.println("do heart check");
try {
Thread.sleep(100); //interval
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
heartCheck.setDaemon(true);
heartCheck.start();
// simulate work
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
Подробное объяснение метода соединения
Анализ исходного кода
Перейдите непосредственно к исходному коду:
public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
Если поток вызываетсяthread
изjoin()
, будут распределены поjoin(0)
, выполнить вышеуказанное10~12
ОК, пока текущий поток получает правильное выполнение ЦП, он будет опрашиватьthread
статус выполнения (isAlive
Являетсяnative
метод, но мы можем предположить, что его роль заключается в обнаруженииthread
Жив ли он, т.е. нетTerminated
статус), когда-то найденныйthread
Если он все еще жив, он освободит право на выполнение процессора (черезwait(0)
метод), дождитесь следующего раунда опроса, покаthread
в завершенное состояние, то текущий поток будетthread.join()
вернуть.
Обязательно четко различайте, звонитеthread.join()
Текущий поток заблокирован, он не будетthread
Нити не имеют никакого эффекта.
join
Предоставляет перегруженный метод ожидания с ограничением по времени (это классическая модель ожидания с тайм-аутом: возврат только в том случае, если условие выполнено или лимит времени ожидания превышен), что также позволяет избежать попадания текущего потока в ловушку вечной дилеммы ожидания. , который может ждать некоторое время. Автоматически возвращается после обнаружения того, что целевой поток не завершил выполнение.
join
Одним из наиболее интересных моментов является то, что если поток вызывает свой собственныйjoin
метод, то поток будет бесконечнымwait
вниз, потому что:Thread.currentThread().join()
Будет ждать, пока текущий поток завершит выполнение, и текущий поток вызывает текущий потокjoin
То есть, когда текущий поток завершает выполнение... пусть играет медленно сам~
присоединиться к сценариям использования
Выполняйте задания шаг за шагом
Например, может потребоваться обработка журналов поведения пользователей на веб-сайтах электронной коммерции посредством агрегирования, проверки, анализа, классификации и т. д. и, наконец, их сохранение в базе данных. И выполнение этих шагов должно быть пошаговой обработкой, затем шаг должен дождаться окончания предыдущего шага, чтобы получить результат и запуститься, тогда его можно использовать.join
Сделай это.
Следующий код просто имитирует этот сценарий:
public class StepByStep {
public static void main(String[] args) throws InterruptedException {
Thread step1 = new Thread(() -> {
System.out.println("start capture data...");
//simulate capture data
try {
Thread.sleep(1000);
System.out.println("capture done.");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
step1.start();
Thread step2 = new Thread(() -> {
try {
step1.join();
System.out.println("start screen out the data...");
Thread.sleep(1000);
System.out.println("screen out done.");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
step2.start();
Thread step3 = new Thread(() -> {
try {
step2.join();
System.out.println("start analyze the data...");
Thread.sleep(1000);
System.out.println("analyze done.");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
step3.start();
Thread step4 = new Thread(() -> {
try {
step3.join();
System.out.println("start classify the data");
Thread.sleep(1000);
System.out.println("classify done.");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
step4.start();
step4.join();
System.out.println("write into database");
}
}
Стоит отметить, что если вы вызываетеjoin
, который немедленно вернется:
public class StepByStep {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
});
t.join();
}
}
Модель разветвления/соединения
Иногда количество задач слишком велико и задачи являются разделимыми (между подзадачами нет зависимости), тогда мы могли бы также разделить задачу на несвязанные подзадачи (этот шаг называетсяFork
), назначить отдельный поток для каждой подзадачи, чтобы реализовать параллельное выполнение подзадач, повысить эффективность выполнения и, наконец, интегрировать результаты каждой подзадачи для окончательной обработки (основной поток может использоватьjoin
дождаться результатов выполнения каждого потока подзадачи, чтобы, наконец, сделать сводку). Предоставлено JDK8Stream
а такжеForkJoin
Рамки имеют эту модель рисунка.
Аномальное восприятие
мы можем пройтиjoin
Ограниченное по времени ожидание, обеспечиваемое перегруженным методом .
Подробное объяснение прерывания
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
Вот деталь,interrupt
Сначала устанавливается флаг прерывания потока, а затем он прерывается.
Ознакомьтесь с официальной документацией:
If this thread is blocked in an invocation of the
wait()
,wait(long)
, orwait(long, int)
methods of theObject
class, or of thejoin()
,join(long)
,join(long, int)
,sleep(long)
, orsleep(long, int)
, methods of this class, then its interrupt status will be cleared and it will receive anInterruptedException
.If none of the previous conditions hold then this thread's interrupt status will be set.
Interrupting a thread that is not alive need not have any effect.
Отсюда мы можем извлечь три части информации:
-
Timed-Waiting/Waiting
После того, как поток будет прерван, он сначала очистит свой флаг прерывания, а затем выброситInterruptedException
. Итак, прерванная нить входит - это работает (
Runnable/Running
) не будет прерван, но будет установлен его флаг прерывания, т.isInterrupted
вернусьtrue
- Вызывается в потоке в завершенном состоянии
interrupt
не будет иметь никакого эффекта.
isInterrupted
Tests whether this thread has been interrupted. The interrupted status of the thread is unaffected by this method.
A thread interruption ignored because a thread was not alive at the time of the interrupt will be reflected by this method returning false.
Проверьте, был ли поток прерван, вызов этого метода не изменит флаг прерывания потока. Вызывается в потоке в завершенном состоянииinterrupt
не приводит к возврату методаtrue
.
Таким образом, мы можем использоватьisInterrupted
Давайте проверим 3 вывода, сделанные выше:
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
t1.interrupt();
System.out.println(t1.isInterrupted()); //true
Thread.sleep(1000);
System.out.println(t1.isInterrupted()); //false
}
Приведенный выше код находится вt1.interrupt
проверить сразу послеt1
бит флага прерывания из-заinterrupt
заключается в том, чтобы сначала установить бит флага прерывания, а затем прерывание, поэтому17
Выход строки обнаружил, что бит флага прерывания возвращаетсяtrue
;тогда18~19
сначала подождиt1
метаниеInterruptedException
Когда флаг сброшен, обнаруживается флаг прерывания и возвращается.false
Докажите вывод 1: выбросьтеInterruptedException
Первым сбрасывается бит флага прерывания.
static volatile boolean flag = true;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
while (flag) {
}
});
t1.start();
t1.interrupt();
System.out.println(t1.isInterrupted()); //true
flag = false;
t1.join();
System.out.println(t1.isInterrupted()); //false
}
interrupted
Текущий поток не прерывается, но его флаг прерывания установлен, поэтому первый10
возврат линииtrue
. посредством13
По выходу строки мы также можем сделать новый вывод: даПоток в завершенном состоянииперечислитьisInterrupted
всегда будет возвращатьсяfalse
.
interrupted
Это статический метод обнаружениятекущий потокпрерывается, но сisInterrupted
отличается, его вызов приведет к сбросу флага прерывания текущего потока иisInterrupted
является методом экземпляра. То есть, если два последовательных вызова сделаныThread.interrupted
, второй раз обязательно вернусьfalse
.
static volatile boolean flag = true;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
while (flag) {
}
System.out.println(Thread.currentThread().isInterrupted()); //true
System.out.println(Thread.interrupted()); //true
System.out.println(Thread.interrupted()); //false
});
t1.start();
t1.interrupt();
flag = false;
}
Как изящно завершить поток
stop
Thread
Есть устаревший методstop
, причина устаревания в том, что этот метод похож наlinux
серединаkill -9
Этот метод принудительно завершает поток немедленно, не давая ему шанса передохнуть, а это означает, что в наполовину выполненной программе внезапно больше нет текста, и если поток открывает ресурсы, такие как ввод-вывод и соединения с базой данных, они не будут быть освобождены вовремя.
Использование потоков демона и объединений
Поток демона завершается, когда завершается его родительский поток, поэтому мы можем косвенно завершить его, установив поток как поток демона, контролируя, когда завершается его родительский поток:
public class ThreadService {
private Thread executeThread;
private volatile boolean finished;
public void execute(Runnable task) {
executeThread =new Thread(() -> {
Thread t = new Thread(() -> {
task.run();
});
t.setDaemon(true);
t.start();
try {
t.join();
finished = true;
} catch (InterruptedException e) {
System.out.println("task execution was interrupted");
}
});
executeThread.start();
}
public void shutdown(long millis) {
long base = System.currentTimeMillis();
long now = 0;
while (!finished) {
now = System.currentTimeMillis() - base;
if (now >= millis) {
System.out.println("task execution time out, kill it now");
executeThread.interrupt();
break;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
System.out.println("was interrupted when shutdown");
}
}
finished = true;
}
}
В приведенном выше коде это можно сделать, указавshutdown
проходить вtask
срок реализации, требующий, чтобы он былmillis
Он выполняется в течение времени. Если оно превышает это время, оно рассматривается как исключение выполнения задачи и завершается завершением его родительского потока. Если он выполняется нормально, вmillis
вернуть в течение времени, что также приведет к концу родительского потока,shutdown
голосованиеfinished
состояние, чтобы ощутить окончание выполнения задачи.
Использование общих переменных состояния
public class ThreadCloseGraceful implements Runnable{
private volatile boolean stop = false;
@Override
public void run() {
while (true) {
if (stop) {
break;
}
// to do here
}
}
public void shutdown() {
stop = true;
}
}
Смысл этого подхода в том, что общие переменные состояния должны быть объявлены какvolatile
, чтобы исполнительный поток мог его вовремя воспринятьshutdown
Заказ.
флаг прерывания опроса
Команда прерывания из внешнего мира воспринимается путем опроса бита флага прерывания потока.
public class ThreadCloseGraceful extends Thread{
@Override
public void run() {
while (true) {
if (Thread.interrupted()) {
break;
}
// to do here
}
}
public void shutdown() {
this.interrupt();
}
}
suspend/resume
resume/suspend
Основная причина отказа от поддержки заключается в том, чтоsuspend
Приостановка потока не освобождает общие ресурсы, которые он содержит, если поток удерживает одну или несколько блокировок и выполняетsuspend
, то все потоки, ожидающие снятия блокировки или этих блокировок, будут заблокированы на длительное время. если ты окажешьсяresume
Нить подвешенной нити тоже должна заранее приобрести эти замки, тогдаresume
Потоки также блокируются, что может привести кsuspend
Ни один поток не проснется, и эти потоки будут заблокированы навсегда.
Следовательно, в параллельном сценарии для критических секцийsuspend
а такжеresume
Потоки противостоят друг другу, независимо от того, кто первым войдет в критическую секцию, это приведет к тому, что эти два потока или даже несколько потоков попадут в тупик.
Подробное объяснение синхронизированного
synchronized
Гарантируется, что выполнение синхронизированного кода в многопоточной среде сериализуется.
Синхронизированное использование ключевых слов
- Если используется метод экземпляра, поток сначала получает метод при входе в метод (критическая секция).
this
объектmonitor
(то есть то, что мы обычно называем замком, термин — монитор),monitor
Одновременно он может удерживаться только одним потоком. Если получение не удается, он будет находиться в заблокированном состоянии (BLOCKED) до тех пор, пока блокировка не будет снята (поток, удерживающий блокировку, не выйдет из метода/критического раздела), и поток будет присоединиться к новому витку замков в стремлении - Если используется в статических методах, вам нужно получить текущий класс
Class
объектmonitor
, логика получения-снятия блокировки такая же, как и у методов экземпляра. - Используемые на блоках кода (блоки кода еще можно назвать критическими секциями), первые два — это семантика самого JDK, подразумевающая заблокированные объекты. Для кодовых блоков вам необходимо
synchronized
Явно укажите объект синхронизации после круглых скобок, и логика получения-снятия блокировки останется прежней.
Особенности синхронизированного ключевого слова
-
Если блокировка не может быть получена, поток, заблокированный на блокировке, будет разбужен, когда блокировка будет снята, что заставит поток переключиться из режима пользователя в режим ядра.Накладные расходы времени относительно велики, даже больше, чем фактические накладные расходы на выполнение кода критической секции. Поэтому, в принципе, для уменьшения
synchronized
Однако с обновлением JDK появились такие оптимизации, как спин-блокировки, адаптивные спины, устранение блокировок, огрубление блокировок, предвзятые блокировки и упрощенные блокировки (см. «Углубленное понимание виртуальной машины Java (второе издание)»). "Глава высокого параллелизма")synchronized
Стоимость на самом деле не такая уж и большая. -
Реентерабельный, если текущий поток уже содержит
monitor
, при повторном входе требуетmonitor
В критической секции вы можете войти напрямую, минуя этап получения блокировки. -
Поток может содержать несколько
monitor
.Уведомление, эта операция может легко привести к взаимоблокировке, следующий код имитирует этот сценарий:public class DeadLock { public static Object lock1 = new Object(); public static Object lock2 = new Object(); public static void main(String[] args) { IntStream.rangeClosed(0,19).forEach(i->{ if (i % 2 == 0) { new Thread(() -> m1()).start(); } else { new Thread(() -> m2()).start(); } }); } public static void m1() { synchronized (lock1) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lock2) { System.out.println(Thread.currentThread().getName()); } } } public static void m2() { synchronized (lock2) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lock1) { System.out.println(Thread.currentThread().getName()); } } } }
Приведенный выше код имеет высокую вероятность взаимоблокировки, но оперативной информации не будет. мы можем пройти
jps/jstack
Проверьте статус потока:C:\Users\zaw>jps 2864 5664 Jps 4072 Launcher 2172 DeadLock C:\Users\zaw>jstack 2172 "Thread-1" #12 prio=5 os_prio=0 tid=0x0000000018c71800 nid=0x8f0 waiting for monitor entry [0x00000000196cf000] java.lang.Thread.State: BLOCKED (on object monitor) at deadlock.DeadLock.m2(DeadLock.java:47) - waiting to lock <0x00000000d6081098> (a java.lang.Object) - locked <0x00000000d60810a8> (a java.lang.Object) at deadlock.DeadLock.lambda$null$1(DeadLock.java:21) at deadlock.DeadLock?Lambda$3/1989780873.run(Unknown Source) at java.lang.Thread.run(Thread.java:748) "Thread-0" #11 prio=5 os_prio=0 tid=0x0000000018c70800 nid=0x944 waiting for monitor entry [0x00000000195cf000] java.lang.Thread.State: BLOCKED (on object monitor) at deadlock.DeadLock.m1(DeadLock.java:34) - waiting to lock <0x00000000d60810a8> (a java.lang.Object) - locked <0x00000000d6081098> (a java.lang.Object) at deadlock.DeadLock.lambda$null$0(DeadLock.java:19) at deadlock.DeadLock?Lambda$2/999966131.run(Unknown Source) at java.lang.Thread.run(Thread.java:748)
Автор опускает статус других потоков, и после анализа причины взаимоблокировки пары потоков остальные 18 потоков аналогичны. Прежде всего
9
а также18
Две строки указывают на то, что два потока указывают на то, что поток заблокирован, поскольку он не может получить блокировку объекта.BLOCKED
государство.11~12
линия в деталяхThread-1
Ожидание получения адреса памяти в0x00000000d6081098
Блокировка объекта , уже удерживаемая по адресу памяти0x00000000d60810a8
замок объекта.20~21
Эта же линия указываетThread-0
ждать0x00000000d60810a8
объект, уже полученный0x00000000d6081098
Блокировка объекта. Видно, что все они ждут, пока другая сторона снимет блокировку без блокировки мозга, поэтому они попадают в тупик.существует
jstack
После перечисления состояния каждого потока JVM мы также анализируем взаимоблокировку для нас:Found one Java-level deadlock: ============================= "Thread-19": waiting to lock monitor 0x0000000018c5a398 (object 0x00000000d60810a8, a java.lang.Object), which is held by "Thread-1" "Thread-1": waiting to lock monitor 0x0000000018c58d98 (object 0x00000000d6081098, a java.lang.Object), which is held by "Thread-0" "Thread-0": waiting to lock monitor 0x0000000018c5a398 (object 0x00000000d60810a8, a java.lang.Object), which is held by "Thread-1"
Мы также можем использовать встроенный в JDK инструмент мониторинга производительности JVM JConsole для более интуитивного анализа состояния потока:
C:\Users\zaw>jps 2864 6148 Jps 4072 Launcher 2172 DeadLock C:\Users\zaw>jconsole 2172
В открывшемся окне инструмента будет задан вопрос, доверять ли небезопасному соединению, нажмите «Да», чтобы войти. После входа вы можете просмотреть статус каждого потока через панель потоков, нажать анализ взаимоблокировок, он проанализирует для нас, какие потоки в текущем процессе JVM заблокированы и почему:
Базовая реализация синхронизированного
Чтобы понять, почему потоки имеют механизмы захвата-освобождения блокировки при выполнении критических разделов (включая синхронизированные методы и синхронизированные блоки кода), нам нужно знать, какие инструкции JVM генерирует это ключевое слово после компиляции.
Сначала мы пишем синхронизированный метод и синхронизированный блок соответственно и тестируем их по отдельности.synchronized
Каков эффект на уровне байт-кода:
public class SynchronizedTest{
public synchronized void m1(){
System.out.println("sync method");
}
Object lock = new Object();
public void m2(){
synchronized(lock){
System.out.println("sync block");
}
}
}
затем используйтеjavac
Компилировать, так как скомпилированный файл байткода представляет собой бинарный поток байтов, нам неудобно его просматривать (JVM удобно просматривать), поэтому нам также нужно использоватьjavap
Преобразуйте его в понятный нам контент (см. формат файла класса в «Углубленное понимание виртуальной машины Java (второе издание)» для формата байт-кода). Чтобы позаботиться о читателях, которые не знакомы с этой частью , автор сделал удалить, только фокусsynchronized
Полученный эффект:
C:\Users\zaw>cd Desktop
C:\Users\zaw\Desktop>javac SynchronizedTest.java
C:\Users\zaw\Desktop>javap -verbose SynchronizedTest.class
public class SynchronizedTest
minor version: 0
major version: 52
flags: ACC_PUBLIC, ACC_SUPER
Constant pool:
# 这里省去了常量池部分
{
java.lang.Object lock;
descriptor: Ljava/lang/Object;
flags:
public synchronized void m1();
descriptor: ()V
flags: ACC_PUBLIC, ACC_SYNCHRONIZED
Code:
stack=2, locals=1, args_size=1
0: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream;
3: ldc #5 // String sync method
5: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
8: return
LineNumberTable:
line 4: 0
line 5: 8
public void m2();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=2, locals=3, args_size=1
0: aload_0
1: getfield #3 // Field lock:Ljava/lang/Object;
4: dup
5: astore_1
6: monitorenter
7: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream;
10: ldc #7 // String sync block
12: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
15: aload_1
16: monitorexit
17: goto 25
20: astore_2
21: aload_1
22: monitorexit
23: aload_2
24: athrow
25: return
Exception table:
from to target type
7 17 20 any
20 23 20 any
}
SourceFile: "SynchronizedTest.java"
Хотя приведенный выше код выглядит длинным, нам нужно сосредоточиться только на двух моментах:
- В сравнении
20
линия и33
Хорошо, вы найдете метод синхронизацииm1
чем асинхронные методыm2
изflags
один дополнительныйACC_SYNCHRONIZED
, поэтому, когда поток входит в метод синхронизации, если он обнаруживает методflags
ВключатьACC_SYNCHRONIZED
, то поток попытается получитьthis
или класса, в котором находится методClass
экземпляр (это зависит от того, является ли метод методом экземпляра или статическим методом), т.е. метод синхронизацииsynchronized
Семантика через флаги методаACC_SYNCHRONIZED
Для достижения этого процесс синхронизации является неявным (объект синхронизации указывается JVM, а снятие блокировки выполняется JVM). - посмотри снова
40~49
строку, находим, что она нам дает внутри блока synchronizedSystem.out.println("sync block")
Добавлен один до и послеmonitorenter
с однимmonitorexit
, что соответствует захвату-освобождению блокировки.Эта семантика синхронизации является явной.Объект синхронизации и критическая секция контролируются нами, что является более гибким, чем метод синхронизации.
Также стоит отметить, что выше49
Почему строка кода появляется сноваmonitorexit
? Это делается для того, чтобы в случае возникновения исключения во время выполнения блока синхронизированного кода блокировка, удерживаемая потоком, также могла быть снята до того, как возникнет исключение (без воздействия на другие потоки, ожидающие получения блокировки).
Как избежать тупика
После приведенного выше анализа понимание замков должно иметь более глубокое понимание. Итак, как избежать взаимоблокировки? Заблокированный поток не будет ни работать, ни продолжать использовать системные ресурсы, и наше приложение должно избегать этого.
- Избегайте потока, удерживающего несколько блокировок одновременно. Если поток пытается получить другие блокировки, когда он уже удерживает блокировку, то, как только получение завершится ошибкой, это неизбежно приведет к блокировке потока блокировкой, которую он уже имеет, а блокирование ресурсов синхронизации сопряжено с высокой степенью параллелизма и недружественно, и его следует избегать.
- Избегайте критических разделов, выполнение которых занимает слишком много времени. Если во время выполнения критической секции возникает исключение, обычно поток не может выйти из критической секции, не может вернуться вовремя и т. д. Это приведет к тому, что поток, удерживающий блокировку, будет откладывать освобождение блокировки.Если есть только много других потоков, ожидающих получения блокировки, эти потоки будут заблокированы в течение длительного времени. Мы стараемся, чтобы критический раздел был как можно меньше, и синхронизируем только тот код, в котором происходят гонки данных. В то же время, чтобы избежать попадания большого количества потоков в длительное ожидание из-за получения блокировки, следует использовать
Lock
изtryLock(millis)
Механизм ожидания тайм-аута: как только вы обнаружите, что время ожидания слишком велико, нет необходимости ждать все время, вы можете попытаться получить блокировку после выполнения других задач. Позже мы напишем блокировку, которая может автоматически возвращаться после ожидания тайм-аута для этой ситуации.
ждать/уведомлять и ждать набор
устарелsuspend/resume
После этого официально рекомендуется использоватьwait/notify
заменять. а такжеsuspend/resume
разное позиционирование,wait/notify
реализовано вObject
, метод, который могут вызывать все объекты. и вызывающий объектwait/notify
объектmonitor
.
Ниже приводится официальныйwait(millis)
Даны инструкции:
* This method causes the current thread (call it <var>T</var>) to
* place itself in the wait set for this object and then to relinquish
* any and all synchronization claims on this object. Thread <var>T</var>
* becomes disabled for thread scheduling purposes and lies dormant
* until one of four things happens: notify, notifyAll, interrupt, time out
называть объектobj
изwait
метод приведет к тому, что текущий поток выполнения будет помещенobj
в очереди(wait set
, гостиная нити) и освободите нить черезsynchronized
Все блокировки уже удерживаются, а затем освободить право выполнения ЦП на ожидание до тех пор, покаnotify/notifyAll
Уведомляется и вызывается другими потокамиinterrupt
Только когда прерывание или время ожидания превысит установленный предел времени, он перейдет в состояние готовности к борьбе за право выполнения ЦП.
Здесь следует отметить, что нить неnotify/notifyAll
Проснувшись, вы можете сразуwait
Возврат, после пробуждения, он только заставит поток войти в состояние готовности, чтобы бороться за право на выполнение ЦП, только для получения права на выполнение ЦП и получения всехwait
Блокировка может быть снята только сwait
return, иначе поток все еще будет блокироваться вwait
начальство.
использоватьwait/notify
, мы можем реализовать связь между потоками.
ждать/уведомлять классическую парадигму
официально даноwait/notify
Используется классическая парадигма:
synchronized (obj) {
while (<condition does not hold>)
obj.wait();
... // Perform action appropriate to condition
}
использоватьwhile
без использованияif
Причина в том, чтобы проснуться иwait
Возвращающийся поток должен постоянно проверять условия, с которыми он связан, поскольку его пробуждение может быть вызвано не тем, что другой поток целенаправленно пробуждает поток, чтобы уведомить поток.notify
случайное пробуждение,notifyAll
Одновременно захватить замок может только один из пробужденных и пробужденных потоков.wait
Возвращаемый поток имеет много неопределенности. Поскольку условия для каждого потока различны, необходимо провести опрос, чтобы определить, верны ли условия, прежде чемwhile
выходить.
Отсюда мы можем использоватьwait/notify
Реализуйте модель коммуникации производитель-потребитель:
public class ClassicForm {
private static String message;
private static Object lock = new Object();
public static void main(String[] args) {
Thread consumer = new Thread(() -> {
while(true){
synchronized (lock) {
while (message == null) { // wait for producing
try {
lock.wait();
} catch (InterruptedException e) {
System.out.println("consumer was broken");
return;
}
}
System.out.println("CONSUMER receive message : " + message);
message = null;
lock.notify();
}
}
});
Thread producer = new Thread(() -> {
synchronized (lock) {
for(int i = 0 ; i < 100 ; i++){
while (message != null) { // wait for consuming
try {
lock.wait();
} catch (InterruptedException e) {
System.out.println("producer was broken");
return;
}
}
message = "please the order, order-id is " + i;
lock.notify();
System.out.println("PRODUCER send the message : " + message);
}
}
});
consumer.start();
producer.start();
}
}
вы найдете здесьmessage
даже без добавленияvolatile
, потребитель может каждый раз точно получать изменения, внесенные производителем. Это сделаноsynchronized
изunlock
Инструкция и JMM (модель памяти Java) определяются совместно, и JMM будет подробно расширен позже.
ждать/уведомлять модель ожидания с ограничением по времени
Вышеприведенный код имеет очевидный недостаток, то есть, если производитель медленно создает сообщения, потребитель будет продолжатьwait
пока не появятся новые новости. Таким образом, ресурсы, занятые потоком-потребителем, не используются полностью. Можно ли установить ограничение времени ожидания потребителей? После того, как время ожидания превысит лимит,wait
Теперь сначала займитесь другими задачами, а затем прослушайте сообщения, созданные продюсером. Следующий код просто имитирует этот сценарий:
public class WaitTimeoutModel {
private static String message;
private static Object lock = new Object();
private static final long MAX_WAIT_LIMIT = 1000;
public static void main(String[] args) {
Thread consumer = new Thread(() -> {
synchronized (lock) {
while (true) {
long base = System.currentTimeMillis();
long now = 0;
while (message == null) {
now = System.currentTimeMillis() - base;
if (now >= MAX_WAIT_LIMIT) {
break; // exit wait
}
try {
lock.wait(MAX_WAIT_LIMIT);
} catch (InterruptedException e) {
System.out.println("consumer was broken");
}
}
if (message == null) {
System.out.println("CONSUMER exit wait, and do other things");
try { // simulate do other thing
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println("CONSUMER receive the message : " + message);
message = null;
}
}
}
});
Thread producer = new Thread(() -> {
// prepare message is very slowly
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// notify consumer
synchronized (lock) {
message = "please handle the order, order-id is 5454656465";
lock.notify();
System.out.println("PRODUCER send the message : " + message);
}
});
consumer.start();
producer.start();
}
}
Суть в том, что на основе классической парадигмы в процессе опроса переменных состояния добавляется оценка времени ожидания (раздел14~17
строка), если найден заданный срок (здесьMAX_WAIT_LIMIT
), затем перестаньте ждать и займитесь чем-нибудь другим (стр.25~30
линии), наоборот, если вwait(MAX_WAIT_LIMIT)
В этот период, из-за напоминания о пробуждении производителя, также выскочит опрос (производитель обычно будит потребителя после производства сообщения) и входит в первый32~33
линия для потребления сообщений. Но в любом случае это конец логической исполнительной единицы для потребителя. Поскольку потребитель обычно работает с монитором 24 часа в сутки (while(true)
), поэтому в конце каждого исполнительного блока будет сбрасыватьсяbase
а такжеnow
(п.11~12
Ряд).
Эффект операции следующий:
CONSUMER exit wait, and do other things
PRODUCER send the message : please handle the order, order-id is 5454656465
CONSUMER receive the message : please handle the order, order-id is 5454656465
CONSUMER exit wait, and do other things
CONSUMER exit wait, and do other things
CONSUMER exit wait, and do other things
...
Модель тайм-аута-ожидания широко используется в шаблонах параллельного проектирования и пакетах JUC, и ее необходимо хорошо понимать.
Существенная разница между ожиданием и сном (часто задаваемый вопрос в интервью)
-
wait
даObject
Перед вызовом необходимо получить метод экземпляра и блокировку объекта экземпляра.sleep
даThread
Статические методы можно вызывать напрямую -
sleep
блокировки, удерживаемые текущим потоком, не снимаются, иwait
освободит все блокировки, удерживаемые текущим потоком -
sleep
а такжеwait
приведет к входу потокаTIMED-WAITING
состояние освобождает права на выполнение ЦП, но вызываетsleep
Поток может автоматически вернуться после установленного срока, в то время какwait(millis)
По истечении тайм-аута вам необходимо получить блокировку объекта перед возвратом,wait(0)
Еще более необходимо дождаться пробуждения и получить замок перед возвращением.
Проблема приостановленной анимации, вызванная несколькими потребителями и несколькими производителями
Следующий код имитирует случай, когда два производителя и два потребителя работают одновременно в модели производитель-потребитель, что приводит к приостановке программы.
public class ProducerConsumer {
private String message;
public synchronized void produce() {
while (message != null) {
try {
this.wait();
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " was broken");
return;
}
}
message = "time is " + new Date(System.currentTimeMillis());
this.notify();
System.out.println(Thread.currentThread().getName() + " send the message : " + message);
}
public synchronized void consume() {
while (message == null) {
try {
this.wait();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " was broken");
return;
}
}
System.out.println(Thread.currentThread().getName() + " recv the message : " + message);
message = null;
this.notify();
}
public static void main(String[] args) {
ProducerConsumer pc = new ProducerConsumer();
Stream.of("p1", "p2").forEach(name -> {
new Thread(() -> {
while (true) {
pc.produce();
}
}, name).start();
});
Stream.of("c1", "c2").forEach(name -> {
new Thread(() -> {
while (true) {
pc.consume();
}
}, name).start();
});
}
}
Результат выглядит следующим образом:
p1 send the message : time is Fri Feb 01 14:06:26 CST 2019
c2 recv the message : time is Fri Feb 01 14:06:26 CST 2019
p2 send the message : time is Fri Feb 01 14:06:26 CST 2019
c2 recv the message : time is Fri Feb 01 14:06:26 CST 2019
p1 send the message : time is Fri Feb 01 14:06:27 CST 2019
# 至此,四个线程陷入永久wait
Автор тоже долгое время был другим.Когда Производитель создает сообщение, он уведомляет Потребителя о его потреблении, а после того, как последний его потребляет, он уведомляет Производителя, ожидающего производства.Нет проблем! как попасть вwait
Шерстяная ткань?
Это потому, что мы впали в инерционное мышление.Когда мы изучаем модель производитель-потребитель, мы всегда думаем, что производитель уведомит потребителя, когда сообщение будет произведено, и что потребитель уведомит производителя, когда потребление будет завершено. мы забылиnotify
Суть:notify
будет от объектаwait set
серединаслучайныйВыберите нить, чтобы проснуться. Давайте рационально проанализируем приведенный выше код:17
Oknotify
разбудит темуwait set
на потребительской ветке? Возможно, нет! Предположим, на мгновениеp1
схватил замок иp2,c1,c2
заблокированыwait
на, тогдаp1
Вызывается после создания сообщенияnotify
Можно ли проснутьсяp2
(В этом случае проснувшийсяp2
Находитьp1
Произведенные сообщения, которые не используются, все равно будут перехватываться.wait
, то все четыре потока захватываютсяwait
никакой другой поток, чтобы разбудить их. Точно так же потребитель может проснуться после того, как другой пользователь проглотит сообщение.wait
потребителей такое пробуждение делает бесполезную работу). это потому чтоnotify
изНеопределенность, так что приведенный выше код не следует процедуре производитель-потребитель, а последние четыре потока перехватываются вwait
И нет нити, чтобы их разбудить.
Но если первый17,34
Oknotify
изменить наnotifyAll
Никакого тупика не будет. Это потому чтоnotifyAll
разбудит все блоки, которые заблокированы на объектеwait
на нитке. следовательноp1
После создания сообщения, если вызовnotifyAll
,Такp2,c1,c2
пробудится и сразится за объектmonitor
, даже еслиp2
захваченный первым, он также войдет, потому что сообщение не потребляетсяwait
А затем отпустите блокировку и проснитесь в ожидании блокировкиc1,c2
,такp1
изnotifyAll
в конечном итоге приведет к тому, что один из потребителей изwait
Возврат, чтобы даже при наличии нескольких производителей и нескольких потребителей программа могла выполняться.
p2 send the message : time is Fri Feb 01 14:30:39 CST 2019
c1 recv the message : time is Fri Feb 01 14:30:39 CST 2019
p1 send the message : time is Fri Feb 01 14:30:39 CST 2019
c2 recv the message : time is Fri Feb 01 14:30:39 CST 2019
p2 send the message : time is Fri Feb 01 14:30:40 CST 2019
c1 recv the message : time is Fri Feb 01 14:30:40 CST 2019
p1 send the message : time is Fri Feb 01 14:30:41 CST 2019
c2 recv the message : time is Fri Feb 01 14:30:41 CST 2019
p2 send the message : time is Fri Feb 01 14:30:42 CST 2019
c1 recv the message : time is Fri Feb 01 14:30:42 CST 2019
...
Модель производитель-потребитель при многопоточности, чтобы использовать
notifyAll
Рукописный ввод BooleanLock
упомянутый вышеsynchronized
Серьезным недостатком является то, что если поток, удерживающий блокировку, не снимает блокировку с опозданием (время выполнения критической секции слишком велико), другие потоки, ожидающие блокировки, будут заблокированы до тех пор, пока блокировка не будет снята. Итак, можем ли мы реализовать такой механизм: установить ограничение времени для потока, ожидающего освобождения блокировки, если ограничение времени превышено, то блокировка не будет снята в течение полутора времени, поэтому поток может использовать это время простоя для выполнения других задач, а не блокирование и ничегонеделание все время.
Теперь мы можем использоватьwait/notify
Классическая парадигмальная реализацияsynchronized
Семантика, использующая модель ожидания тайм-аута для реализации семантики ожидания с ограничением по времени. Сначала определите интерфейс объекта синхронизации, а именноLock
:
public interface Lock {
void lock() throws InterruptedException;
void unlock();
void lock(long millis) throws InterruptedException, TimeoutException;
Collection<Thread> getBlockedThread();
int getBlockedCount();
}
Затем реализуйте простой, который использует логическую переменную для представления состояния синхронизации.BooleanLock
:
public class BooleanLock implements Lock {
private volatile boolean isSync = false; //represent whether the lock is held or not. true is held, false is not held
private Thread currentThread; //current thread which hold the lock
private Collection<Thread> waitQueue;
public BooleanLock() {
this.isSync = false;
this.currentThread = null;
this.waitQueue = new ArrayList<>();
}
@Override
public synchronized void lock() throws InterruptedException {
waitQueue.add(Thread.currentThread());
while (isSync) { // lock is held by other thread
this.wait();
}
// get the lock successfully
waitQueue.remove(Thread.currentThread());
currentThread = Thread.currentThread();
isSync = true; //indicate the lock is held
System.out.println(Thread.currentThread().getName() + " get the lock");
}
@Override
public void unlock() {
// check the operator is the thread which is holding the lock
if (Thread.currentThread() != currentThread) {
return;
}
synchronized (this) {
currentThread = null;
isSync = false;
this.notifyAll();
System.out.println(Thread.currentThread().getName() + " release the lock");
}
}
@Override
public synchronized void lock(long millis) throws InterruptedException, TimeoutException {
long base = System.currentTimeMillis();
long now = 0;
waitQueue.add(Thread.currentThread());
while (isSync) {
now = System.currentTimeMillis() - base;
if (now >= millis) {
throw new TimeoutException();
}
this.wait(millis);
}
waitQueue.remove(Thread.currentThread());
currentThread = Thread.currentThread();
isSync = true;
System.out.println(Thread.currentThread().getName() + " get the lock");
}
@Override
public Collection<Thread> getBlockedThread() {
return Collections.unmodifiableCollection(waitQueue);
}
@Override
public int getBlockedCount() {
return waitQueue.size();
}
}
контрольная работаsynchronized
Семантика:
public static void main(String[] args) {
BooleanLock lock = new BooleanLock();
Stream.of("t1", "t2", "t3", "t4", "t5").forEach(name -> {
new Thread(() -> {
try {
lock.lock();
Thread.sleep(50); // to do thing
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
printBlockedThread(lock);
lock.unlock();
}
}, name).start();
});
}
private static void printBlockedThread(BooleanLock lock) {
System.out.print("There are " + lock.getBlockedCount() + " threads waiting on the lock: ");
lock.getBlockedThread().forEach(thread -> System.out.print(thread.getName() + " "));
System.out.println();
}
результат операции:
t1 get the lock
There are 4 threads waiting on the lock: t4 t3 t2 t5
t1 release the lock
t5 get the lock
There are 3 threads waiting on the lock: t4 t3 t2
t5 release the lock
t4 get the lock
There are 2 threads waiting on the lock: t3 t2
t4 release the lock
t2 get the lock
There are 1 threads waiting on the lock: t3
t2 release the lock
t3 get the lock
There are 0 threads waiting on the lock:
t3 release the lock
должен быть в курсеunlock
должно быть написано вfinally
гарантирует, что блокировка будет снята, в то время какsynchronized
Исключение выдается при выполнении синхронизированного блока.JVM освободит удерживания текущего потока, когда исключение выдается через таблицу исключений (см. описание таблицы методов в главе «Углубленное понимание виртуальной машины Java (Второй Edition)" Структура файла класса). все блокировки.
Тестовый ограниченный по времени захват замков
Приведенный выше пример просто реализует то же самоеsynchronized
Та же функция, затем давайте протестируем функцию получения блокировки на ограниченное время, котораяsynchronized
невозможный.
public static void main(String[] args) {
BooleanLock lock = new BooleanLock();
Stream.of("t1", "t2", "t3", "t4", "t5").forEach(name -> {
new Thread(() -> {
try {
lock.lock(1000);
Thread.sleep(2000); // the task is very time-consuming
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " was interrupted");
} catch (TimeoutException e) {
System.out.println(Thread.currentThread().getName() + " get lock time out, so do other thing first and then get the lock again");
} finally {
lock.unlock();
}
}, name).start();
});
}
Результат выглядит следующим образом:
t1 get the lock
t2 get lock time out, so do other thing first and then get the lock again
t3 get lock time out, so do other thing first and then get the lock again
t4 get lock time out, so do other thing first and then get the lock again
t5 get lock time out, so do other thing first and then get the lock again
t1 release the lock
Добавьте хуки в свое приложение
При использовании некоторых фреймворков с открытым исходным кодом, таких какTomcat
, некоторые журналы все еще будут распечатываться при завершении работы, эти журналы обычно содержат информацию об освобождении ресурсов приложения. То есть мы нажимаемterminate
После того, как событие зафиксировано приложением, приложение не завершает работу напрямую, а сначала освобождает некоторые ценные ресурсы. Это делается путем установки функции ловушки, которая будет вызываться перед завершением основного потока приложения. вести перепискуAPI
даRuntime.getRuntime().addShutdownHook(thread)
.
Ниже я будуlinux
Вышеприведенное демонстрирует полезность функций ловушек.MyApp.java
Представляет мое приложение:
public class MyApp{
public static void main(String[] args){
Runtime.getRuntime().addShutdownHook(
new Thread(() -> {
//release resource here, like socket,connection etc
System.out.println("releasing resources...");
})
);
while(true){
// start a service
}
}
}
пройти черезaddShutdownHook
Установленная нить будет вmain
Вызывается, когда поток прерывается внешним миром, например, когда я работаюjava MyApp
прессованныйCTRL C
[root@izm5ecexclrsy1gmkl4bgdz ~]# javac MyApp.java
[root@izm5ecexclrsy1gmkl4bgdz ~]# java MyApp
^Creleasing resources...
Другой пример работает в фоновом режимеMyApp
,пройти черезkill pid
Завершить это:
[root@izm5ecexclrsy1gmkl4bgdz ~]# java MyApp &
[1] 14230
[root@izm5ecexclrsy1gmkl4bgdz ~]# jps
14240 Jps
14230 MyApp
[root@izm5ecexclrsy1gmkl4bgdz ~]# kill 14230
[root@izm5ecexclrsy1gmkl4bgdz ~]# releasing resources...
ноkill -9
Программа ловушки не будет запущена:
[root@izm5ecexclrsy1gmkl4bgdz ~]# java MyApp &
[1] 14264
[root@izm5ecexclrsy1gmkl4bgdz ~]# ps aux|grep java
root 14264 96.3 1.4 2460724 27344 pts/0 Sl 16:03 0:09 java MyApp
root 14275 0.0 0.0 112660 964 pts/0 R+ 16:03 0:00 grep --color=auto java
[root@izm5ecexclrsy1gmkl4bgdz ~]# kill -9 14264
[root@izm5ecexclrsy1gmkl4bgdz ~]# ps aux|grep java
root 14277 0.0 0.0 112660 964 pts/0 R+ 16:03 0:00 grep --color=auto java
[1]+ Killed java MyApp
получить информацию о стеке
Thread.currentThread().getStackTracke()
Получить всю информацию о кадре стека в стеке, когда текущий поток выполняет текущий метод, вернутьStackTraceElement[]
, элемент представляет кадр стека методов, который можно использовать для определения класса, к которому принадлежит метод, имени метода и количества строк, до которых выполняется метод.
public static void main(String[] args) {
m1();
}
public static void m1() {
m2();
}
private static void m2() {
m3();
}
private static void m3() {
Arrays.asList(Thread.currentThread().getStackTrace()).stream()
.filter(
//过滤掉native方法
stackTraceElement -> !stackTraceElement.isNativeMethod()
).forEach(
stackTraceElement -> {
System.out.println(stackTraceElement.getClassName() + ":" +
stackTraceElement.getMethodName() + "():" +
stackTraceElement.getLineNumber());
}
);
}
Поймать исключение во время выполнения метода запуска
потому чтоRunnable
интерфейсrun
Метод не объявлен для создания каких-либо исключений, поэтому при переопределенииrun
когда всеchecked exception
Все нужно решать вручную. но если броситьunchecked exception
Шерстяная ткань,1/0
Это классический пример, как нам его захватить?
пройти черезthread.setUncheckedExceptionHandler()
Чтобы иметь возможность сделать это:
public static final int A = 1;
public static final int B = 0;
public static void main(String[] args) {
Thread thread = new Thread(() -> {
int i = A / B;
});
thread.setUncaughtExceptionHandler((t, e) -> {
// t -> the ref of the thread, e -> exception
System.out.println(e.getMessage()); /// by zero
});
thread.start();
}
группа потоков
Группа потоков представляет собой набор потоков, группа потоков также может содержать другие группы потоков, а группа потоков может быть развернута в древовидной структуре.
-
При запуске JVM файл с именем
main
поток работаетmain
функция иmain
группа тем,main
Группа потока потокаmain
Группа тем:public static void main(String[] args) { System.out.println(Thread.currentThread().getName()); //main System.out.println(Thread.currentThread().getThreadGroup().getName()); //main }
-
При создании потока, если для потока явно не указана группа потоков, поток примет группу потоков своего родительского потока в качестве своей собственной группы потоков.
Если группа потоков создается без явного указания ее родительской группы потоков, группа потоков текущего потока будет использоваться в качестве его родительской группы потоков.
public static void main(String[] args) { Thread t1 = new Thread(() -> { // }); System.out.println(t1.getThreadGroup().getName()); //main ThreadGroup threadGroup = new ThreadGroup("MyThreadGroup"); Thread t2 = new Thread(threadGroup, () -> { // }); System.out.println(t2.getThreadGroup().getName()); //MyThreadGroup System.out.println(t2.getThreadGroup().getParent().getName()); //main }
-
threadGroup.list()
Метод может печатать информацию об уцелевших потоках в группе потоков, которую можно использовать дляdebug
ThreadGroup threadGroup = new ThreadGroup("MyThreadGroup"); new Thread(threadGroup, () -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(threadGroup, () -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); threadGroup.list(); java.lang.ThreadGroup[name=MyThreadGroup,maxpri=10] Thread[Thread-0,5,MyThreadGroup] Thread[Thread-1,5,MyThreadGroup]
БолееAPI
Вы можете проверить официальную документацию.
пользовательский пул потоков
Логика выполнения рабочего потока
Рабочий поток должен постоянно опрашивать очередь задач, чтобы увидеть, есть ли задача для выполнения, и выполнить ее, если она есть. Затем также предоставьте внешнему миру способ завершить текущий поток.stop
, который использует общие переменные состояния и используетvolatile
Декорация делает завершающие операции из внешнего мира немедленно видимыми для текущего рабочего потока.
public class Worker implements Runnable {
private volatile boolean stop;
private LinkedList<Runnable> taskQueue;
private Thread currentThread;
public Worker(LinkedList<Runnable> taskQueue) {
this.taskQueue = taskQueue;
}
@Override
public void run() {
currentThread = Thread.currentThread();
Runnable task = null;
OUTER:
while (!stop) {
synchronized (taskQueue) {
while (taskQueue.isEmpty()) {
try {
taskQueue.wait();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+" has been interrupted");
break OUTER;
}
}
task = taskQueue.removeFirst();
taskQueue.notifyAll();
}
if (task != null) {
task.run();
}
}
}
public void interrupt() {
if (currentThread != null) {
currentThread.interrupt();
}
}
public void stop() {
stop = true;
}
}
Пул потоков: используется для создания потоков и управления ими.
public class ThreadPool {
private static final int DEFAULT_THREAD_COUNT = 10;
private int threadCount;
private LinkedList<Worker> workQueue;
private LinkedList<Runnable> taskQueue;
public ThreadPool() {
this(DEFAULT_THREAD_COUNT);
}
public ThreadPool(int size) {
this.threadCount = size;
this.workQueue = new LinkedList<>();
this.taskQueue = new LinkedList<>();
init(size);
}
//创建并启动count个线程
private void init(int count) {
if (count <= 0) {
throw new IllegalArgumentException("thread pool size must greater than zero");
}
for (int i = 0; i < count; i++) {
Worker worker = new Worker(taskQueue);
Thread thread = new Thread(worker, "ThreadPool-" + i);
thread.start();
workQueue.add(worker);
}
}
public void execute(Runnable task) {
synchronized (taskQueue) {
taskQueue.add(task);
taskQueue.notifyAll();
}
}
public int getThreadCount() {
return threadCount;
}
public int getTaskCount() {
return taskQueue.size();
}
//对wait中的线程调用stop,他也无法轮询该变量而退出循环
//因此对于wait中的工作线程直接中断它,而正在执行的线程则等他自己轮询到stop而退出
public void shutdown() {
synchronized (taskQueue) {
for (Worker worker : workQueue) {
worker.stop();
worker.interrupt();
}
}
System.out.println("thread pool destroyed");
}
}
контрольная работа
public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
ThreadPool threadPool = new ThreadPool();
for (int i = 0; i < 40; i++) {
int number = i;
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + "start execute task-" + number);
try {
Thread.sleep(new Random(System.currentTimeMillis()).nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
Thread.sleep(5000);
threadPool.shutdown();
}
}
Добавьте политику отклонения в пул потоков
Очередь работ пула потоков не должна быть бесконечно большой.Если не обращать внимания, это может привести к OOM.Поэтому, когда количество задач в очереди задач достигает определенного числа, следует принять стратегию отклонения для представленные задачи.
Здесь следует использовать режим стратегии, интерфейс стратегии:
public interface RefusePolicy {
void refuse() throws Exception;
}
Стратегия выдачи исключения, когда количество простых задач слишком велико:
public class DiscardRefusePolicy implements RefusePolicy {
public class TaskExceededException extends Exception {
public TaskExceededException(String message) {
super(message);
}
}
@Override
public void refuse() throws TaskExceededException {
throw new TaskExceededException("task has exceeded the taskSize of thread poll");
}
}
Модернизацияexecute
метод:
private static final int DEFAULT_THREAD_COUNT = 10;
private static final RefusePolicy DEFAULT_REFUSE_POLICY = new DiscardRefusePolicy();
private static final int DEFAULT_TASK_SIZE = 200;
private int threadCount;
private LinkedList<Worker> workQueue;
private LinkedList<Runnable> taskQueue;
private int maxTaskSize;
private RefusePolicy refusePolicy;
public ThreadPool() {
this(DEFAULT_THREAD_COUNT, DEFAULT_TASK_SIZE, DEFAULT_REFUSE_POLICY);
}
public ThreadPool(int size, int maxTaskSize, RefusePolicy refusePolicy) {
this.threadCount = size;
this.maxTaskSize = maxTaskSize;
this.workQueue = new LinkedList<>();
this.taskQueue = new LinkedList<>();
this.refusePolicy = refusePolicy;
init(size);
}
public void execute(Runnable task) throws Exception {
synchronized (taskQueue) {
if (taskQueue.size() >= maxTaskSize) {
refusePolicy.refuse();
return;
}
taskQueue.add(task);
taskQueue.notifyAll();
}
}
контрольная работа
public static void main(String[] args) throws InterruptedException {
ThreadPool threadPool = new ThreadPool();
for (int i = 0; i < 300; i++) {
int number = i;
try {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "start execute task-" + number);
try {
Thread.sleep(new Random(System.currentTimeMillis()).nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
} catch (Exception e) {
System.out.println("task-" + i + " execution error : " + e.getMessage());
}
}
Thread.sleep(5000);
threadPool.shutdown();
}