предисловие
Лично я считаю, что многопоточность в java — самая сложная часть javaSe. Я чувствую, что изучал ее раньше, но я действительно не знаю, как начать с необходимости многопоточности. На самом деле, я до сих пор мало что знаю о многопоточность, сценарии применения потокового API, я не знаю запущенный процесс многопоточности и т. д. В этой статье будет использован метод пример + диаграмма + исходный код для анализа многопоточности java.
Длина статьи велика, и вы можете прочитать определенные главы. Рекомендуется, чтобы все многопоточные коды были набраны вручную. Никогда не верьте выводам, которые вы видите. Те, которые вы кодируете и запускаете, принадлежат вам.
Что такое многопоточность Java?
процесс и поток
процесс
- Когда программа запускается, она запускает процесс, например запуск qq, word
- Программа состоит из инструкций и данных.Инструкции должны выполняться, данные должны загружаться, инструкции загружаются и выполняются процессором, а данные загружаются в память.Когда инструкции выполняются, процессор может запланировать жесткий диск, сеть и другие устройства
нить
- Процесс можно разделить на несколько потоков
- Поток — это поток инструкций, наименьшая единица планирования ЦП, и ЦП выполняет инструкции одну за другой.
Параллелизм и параллелизм
Параллелизм: когда одноядерный ЦП запускает несколько потоков, квант времени переключается быстро. Потоки по очереди выполняют процессор
Параллельный: когда многоядерный процессор работает в многопоточном режиме, он действительно работает одновременно.
Java предоставляет богатый API для поддержки многопоточности.
Зачем использовать многопоточность?
Все, что может быть достигнуто с помощью многопоточности, может быть выполнено с помощью одного потока, и один поток работает хорошо.Почему Java вводит концепцию многопоточности?
Преимущества многопоточности:
-
Программы работают быстрее! быстрый! быстрый!
-
Максимально используйте ресурсы процессора. В настоящее время почти ни один онлайн-процессор не является одноядерным, что позволяет в полной мере использовать мощные возможности многоядерного процессора.
В чем сложность многопоточности?
Для одного потока существует только одна строка выполнения, процесс прост для понимания, а поток выполнения кода можно четко обрисовать в мозгу.
Многопоточность — это несколько строк, и, как правило, существует взаимодействие между несколькими строками, и требуется связь между несколькими строками.Как правило, трудности заключаются в следующем:
- Результат выполнения многопоточности не определен и зависит от планирования процессора.
- Проблемы безопасности многопоточности
- Ресурсы потока драгоценны, полагаясь на пул потоков для управления потоком, проблема настройки параметров пула потоков
- Многопоточное выполнение является динамическим, одновременным и сложным для отслеживания процесса.
- Нижний уровень многопоточности — это уровень операционной системы, а исходный код сложен.
Иногда я надеюсь, что смогу стать байт-челноком в сервере и разобраться во всех подноготных, прямо как непобедимый король разрушения (если вы не видели этот фильм, то можете посмотреть его, ваш мозг будет нараспашку).
Базовое использование многопоточности в Java
Определяйте задачи, создавайте и запускайте потоки
Задача: Тело выполнения потока. Это наша основная логика кода
определить задачи
- Наследовать класс Thread (можно сказать, что он объединяет задачи и потоки)
- Реализовать интерфейс Runnable (можно сказать, чтобы разделить задачи и потоки)
- Реализуйте интерфейс Callable (используйте FutureTask для выполнения задач)
Ограничения задач реализации Thread
- Логика задачи написана в методе run класса Thread, который имеет ограничение одиночного наследования.
- При создании многопоточности каждая задача не использует общие переменные-члены, если у них есть переменные-члены, и для обеспечения совместного использования необходимо добавить статические.
Runnable и Callable устраняют ограничения Thread
Но Runbale имеет следующие ограничения по сравнению с Callable
- Задача не имеет возвращаемого значения
- Задача не может генерировать исключение вызывающей стороне
Следующий код Несколько способов определения потоков
@Slf4j
class T extends Thread {
@Override
public void run() {
log.info("我是继承Thread的任务");
}
}
@Slf4j
class R implements Runnable {
@Override
public void run() {
log.info("我是实现Runnable的任务");
}
}
@Slf4j
class C implements Callable<String> {
@Override
public String call() throws Exception {
log.info("我是实现Callable的任务");
return "success";
}
}
Как создать нить
- Создавайте потоки напрямую через класс Thread
- Используйте пул потоков для создания потоков
Как начать нить
- Вызвать метод start() потока
// 启动继承Thread类的任务
new T().start();
// 启动继承Thread匿名内部类的任务 可用lambda优化
Thread t = new Thread(){
@Override
public void run() {
log.info("我是Thread匿名内部类的任务");
}
};
// 启动实现Runnable接口的任务
new Thread(new R()).start();
// 启动实现Runnable匿名实现类的任务
new Thread(new Runnable() {
@Override
public void run() {
log.info("我是Runnable匿名内部类的任务");
}
}).start();
// 启动实现Runnable的lambda简化后的任务
new Thread(() -> log.info("我是Runnable的lambda简化后的任务")).start();
// 启动实现了Callable接口的任务 结合FutureTask 可以获取线程执行的结果
FutureTask<String> target = new FutureTask<>(new C());
new Thread(target).start();
log.info(target.get());
Диаграмма классов вышеупомянутых классов, связанных с потоками, выглядит следующим образом.
переключатель контекста
В многоядерном процессоре многопоточность работает параллельно.Если потоков много, одно ядро будет планировать потоки одновременно, и будет концепция переключения контекста во время выполнения.
Когда ЦП выполняет задачу потока, он выделяет потоку квант времени, и переключение контекста происходит в следующих ситуациях.
- Квант процессорного времени потока истекает
- вывоз мусора
- Сам поток вызывает такие методы, как sleep, yield, wait, join, park, synchronized, lock и т. д.
Когда происходит переключение контекста, операционная система сохраняет состояние текущего потока и восстанавливает состояние другого потока.В JVM есть блок адреса памяти, называемый счетчиком программ, который используется для записи того, какая строка кода выполняется поток, который является частным для потока.
Когда идея нарушает точку, ее можно установить в режим потока, а режим отладки идеи может увидеть изменение кадра стека.
Предоставление темы - yield() и приоритет потока
Метод yield () заставит работающий поток переключиться в состояние готовности и повторно скремблировать для кванта времени ЦП. Получение кванта времени во время соревнования зависит от распределения ЦП.
код показывает, как показано ниже
// 方法的定义
public static native void yield();
Runnable r1 = () -> {
int count = 0;
for (;;){
log.info("---- 1>" + count++);
}
};
Runnable r2 = () -> {
int count = 0;
for (;;){
Thread.yield();
log.info(" ---- 2>" + count++);
}
};
Thread t1 = new Thread(r1,"t1");
Thread t2 = new Thread(r2,"t2");
t1.start();
t2.start();
// 运行结果
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129504
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129505
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129506
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129507
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129508
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129509
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129510
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129511
11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129512
11:49:15.798 [t2] INFO thread.TestYield - ---- 2>293
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129513
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129514
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129515
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129516
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129517
11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129518
Как показано в приведенных выше результатах, поток t2 выполняет yield() каждый раз, когда он выполняется, и поток 1 имеет значительно больше возможностей для выполнения, чем поток 2.
приоритет потока
Внутри потока используется число от 1 до 10 для настройки приоритета потока.Приоритет потока по умолчанию NORM_PRIORITY: 5
Когда процессор занят, поток с более высоким приоритетом получает больше квантов времени.
Когда процессор относительно простаивает, настройка приоритета практически бесполезна.
public final static int MIN_PRIORITY = 1;
public final static int NORM_PRIORITY = 5;
public final static int MAX_PRIORITY = 10;
// 方法的定义
public final void setPriority(int newPriority) {
}
Когда процессор занят
Runnable r1 = () -> {
int count = 0;
for (;;){
log.info("---- 1>" + count++);
}
};
Runnable r2 = () -> {
int count = 0;
for (;;){
log.info(" ---- 2>" + count++);
}
};
Thread t1 = new Thread(r1,"t1");
Thread t2 = new Thread(r2,"t2");
t1.setPriority(Thread.NORM_PRIORITY);
t2.setPriority(Thread.MAX_PRIORITY);
t1.start();
t2.start();
// 可能的运行结果
11:59:00.696 [t1] INFO thread.TestYieldPriority - ---- 1>44102
11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135903
11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135904
11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135905
11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135906
Когда процессор простаивает
Runnable r1 = () -> {
int count = 0;
for (int i = 0; i < 10; i++) {
log.info("---- 1>" + count++);
}
};
Runnable r2 = () -> {
int count = 0;
for (int i = 0; i < 10; i++) {
log.info(" ---- 2>" + count++);
}
};
Thread t1 = new Thread(r1,"t1");
Thread t2 = new Thread(r2,"t2");
t1.setPriority(Thread.MIN_PRIORITY);
t2.setPriority(Thread.MAX_PRIORITY);
t1.start();
t2.start();
// 可能的运行结果 线程1优先级低 却先运行完
12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>7
12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>8
12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>9
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>2
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>3
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>4
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>5
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>6
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>7
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>8
12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>9
Нить демона
По умолчанию процесс Java должен дождаться завершения выполнения всех потоков, прежде чем он завершится.Существует специальный поток, называемый потоком демона.Когда завершатся все потоки, не являющиеся демонами, он будет принудительно завершен, даже если он еще не закончил выполнение.
Все потоки по умолчанию являются потоками, не являющимися демонами.
Поток сборки мусора — это типичный поток демона.
// 方法的定义
public final void setDaemon(boolean on) {
}
Thread thread = new Thread(() -> {
while (true) {
}
});
// 具体的api。设为true表示未守护线程,当主线程结束后,守护线程也结束。
// 默认是false,当主线程结束后,thread继续运行,程序不停止
thread.setDaemon(true);
thread.start();
log.info("结束");
блокировка потока
Блокировку потока можно разделить на множество типов.Определение блокировки на уровне операционной системы и уровне java может различаться, но в широком смысле существуют следующие способы блокировки потоков.
- Блокировка BIO, даже если используется блокирующий поток ввода-вывода
- sleep (долгое время) позволить потоку спать и войти в состояние блокировки
- a.join() Поток, вызывающий этот метод, заблокирован, ожидая, пока поток a возобновит работу после выполнения
- Synchronized или ReentrantLock приводит к тому, что поток переходит в состояние блокировки без получения блокировки (подробно описано в главе о блокировке синхронизации).
- Вызов метода wait() после получения блокировки также переводит поток в заблокированное состояние (подробности см. в главе о блокировке синхронизации).
- LockSupport.park() переводит поток в заблокированное состояние (подробнее см. в главе о блокировке синхронизации)
sleep()
Перевод потока в спящий режим приведет к тому, что работающий поток перейдет в состояние блокировки. Когда время сна истекает, временной интервал, который повторно скремблируется для ЦП, продолжает работать.
// 方法的定义 native方法
public static native void sleep(long millis) throws InterruptedException;
try {
// 休眠2秒
// 该方法会抛出 InterruptedException异常 即休眠过程中可被中断,被中断后抛出异常
Thread.sleep(2000);
} catch (InterruptedException异常 e) {
}
try {
// 使用TimeUnit的api可替代 Thread.sleep
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
join()
join означает, что поток, вызывающий метод, переходит в заблокированное состояние и ожидает возобновления работы потока после завершения выполнения.
// 方法的定义 有重载
// 等待线程执行完才恢复运行
public final void join() throws InterruptedException {
}
// 指定join的时间。指定时间内 线程还未执行完 调用方线程不继续等待就恢复运行
public final synchronized void join(long millis)
throws InterruptedException{}
Thread t = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
r = 10;
});
t.start();
// 让主线程阻塞 等待t线程执行完才继续执行
// 去除该行,执行结果为0,加上该行 执行结果为10
t.join();
log.info("r:{}", r);
// 运行结果
13:09:13.892 [main] INFO thread.TestJoin - r:10
Прерывание-прерывание потока()
// 相关方法的定义
public void interrupt() {
}
public boolean isInterrupted() {
}
public static boolean interrupted() {
}
Флаг прерывания: прерывается ли поток, значение true означает прерывание, значение false означает, что нет
isInterrupted() Получает флаг прерывания потока, после его вызова флаг прерывания потока не изменяется.
Метод interrupt() используется для прерывания потока.
- Вы можете прерывать потоки, которые явно вызывают методы InterruptedException, такие как сон, ожидание, присоединение и т. д., но после прерывания флаг прерывания потока по-прежнему остается ложным.
- Прервать обычный поток, поток не будет прерван, но флаг прерывания потока равен true
interrupted() Получить флаг прерывания потока и очистить флаг прерывания после вызова.То есть, если получение истинно, флаг прерывания после вызова становится ложным (обычно не используется)
Пример прерывания: существует поток фонового мониторинга, который постоянно отслеживает, и когда внешний мир прерывает его, он прекращает свою работу. код показывает, как показано ниже
@Slf4j
class TwoPhaseTerminal{
// 监控线程
private Thread monitor;
public void start(){
monitor = new Thread(() ->{
// 不停的监控
while (true){
Thread thread = Thread.currentThread();
// 判断当前线程是否被打断
if (thread.isInterrupted()){
log.info("当前线程被打断,结束运行");
break;
}
try {
Thread.sleep(1000);
// 监控逻辑中被打断后,打断标记为true
log.info("监控");
} catch (InterruptedException e) {
// 睡眠时被打断时抛出异常 在该处捕获到 此时打断标记还是false
// 在调用一次中断 使得中断标记为true
thread.interrupt();
}
}
});
monitor.start();
}
public void stop(){
monitor.interrupt();
}
}
состояние потока
Выше было сказано об использовании некоторых базовых API, после вызова этих методов у потока будет соответствующее состояние.
Состояние потока можно разделить на пять состояний на уровне операционной системы и шесть состояний на уровне java API.
пять штатов
- Исходное состояние: состояние, когда создается объект потока.
- Состояние Runnable (состояние готовности): после вызова метода start() он переходит в состояние готовности, то есть готов к планированию и выполнению процессором.
- Состояние выполнения: поток получает квант времени процессора и выполняет логику метода run().
- Состояние блокировки: поток заблокирован, отказываясь от кванта времени ЦП, ожидая, пока разблокировка вернется в состояние готовности, чтобы конкурировать за квант времени
- Состояние завершения: состояние после завершения выполнения потока или возникновения исключения.
шесть штатов
Внутреннее состояние enum в классе Thread
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}
- Создан НОВЫЙ объект потока
- Поток Runnable входит в это состояние после вызова метода start(), который включает три ситуации.
- Состояние готовности: ожидание, пока ЦП выделит квант времени
- Состояние выполнения: введите метод Runnable для выполнения задачи.
- Состояние блокировки: состояние, когда BIO выполняет блокировку потоков ввода-вывода.
- Заблокировано Состояние блокировки, когда блокировка не получена (будет подробно описано в главе о блокировке синхронизации)
- WAITING Состояние после вызова wait(), join() и других методов
- Состояние TIMED_WAITING после вызова sleep(time), wait(time), join(time) и других методов
- TERMINATED Состояние после завершения выполнения потока или возникновения исключения.
Соответствие между шестью состояниями и методами потока
Сводка методов, связанных с потоком
В основном суммируйте основные методы в классе Thread.
имя метода | Это статично | Описание метода |
---|---|---|
start() | нет | Пусть поток запустится, войдет в состояние готовности и подождет, пока ЦП выделит квант времени. |
run() | нет | Перепишите метод интерфейса Runnable, конкретную логику, выполняемую, когда поток получает квант времени процессора. |
yield() | да | Благодаря потоку поток, который получает квант времени процессора, переходит в состояние готовности и повторно шифруется для этого кванта времени. |
sleep(time) | да | Поток спит в течение фиксированного времени и входит в состояние блокировки.После того, как время ожидания завершено, квант времени повторно скремблируется, и сон может быть прерван. |
join()/join(time) | нет | Вызовите метод соединения объекта потока, вызывающий поток входит в блок, ждет, пока объект потока завершит выполнение или достигнет указанного времени для возобновления, и повторно скремблирует временной интервал. |
isInterrupted() | нет | Получить флаг прерывания потока, true: прервано, false: не прервано. Маркер разрыва не изменяется после вызова |
interrupt() | нет | Прервите поток, метод, выбрасывающий исключение InterruptedException, может быть прерван, но метка прерывания не будет изменена после прерывания, а метка прерывания будет изменена после прерывания обычного потока выполнения. |
interrupted() | нет | Получить флаг прерывания для потока. После вызова флаг прерывания будет очищен |
stop() | нет | Остановить выполнение потока Не рекомендуется |
suspend() | нет | Не рекомендуется приостанавливать обсуждение. |
resume() | нет | Возобновить выполнение потока Не рекомендуется |
currentThread() | да | получить текущий поток |
Связанные с потоком методы в Object
имя метода | Описание метода |
---|---|
wait()/wait(long timeout) | Поток, получивший блокировку, переходит в заблокированное состояние. |
notify() | Случайным образом разбудить поток, который был ожидания() |
notifyAll(); | Разбудите все потоки, которые ждут(), и повторно зашифруйте квант времени. |
блокировка синхронизации
потокобезопасность
- Нет проблем в программе, выполняющей несколько потоков
- Проблемы могут возникнуть, когда несколько потоков обращаются к общим ресурсам.
- Нет проблем с несколькими потоками, читающими общие ресурсы.
- Проблемы могут возникнуть, если происходит чередование инструкций, когда несколько потоков читают и пишут в общий ресурс.
Критическая секция. Фрагмент кода называется критической секцией, если он имеет многопоточные операции чтения и записи на общем ресурсе.
Обратите внимание, что чередование инструкций относится к тому, что когда код Java анализируется в файл байт-кода, одна строка кода Java может иметь несколько строк в байт-коде, которые могут чередоваться во время переключения контекста потока.
Потокобезопасность означает, что когда несколько потоков вызывают метод критической секции одного и того же объекта, значение атрибута объекта не должно быть неверным, что обеспечивает безопасность потоков.
Небезопасный код, как показано ниже
// 对象的成员变量
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
// t1线程对变量+5000次
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
count++;
}
});
// t2线程对变量-5000次
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
count--;
}
});
t1.start();
t2.start();
// 让t1 t2都执行完
t1.join();
t2.join();
System.out.println(count);
}
// 运行结果
-1399
Приведенный выше код имеет два потока: один +5000 раз, а другой -5000. Если потокобезопасен, значение count все равно должно быть равно 0.
Но запуская его много раз, результат каждый раз разный, и он не равен 0, поэтому он не является потокобезопасным.
Являются ли потокобезопасные классы обязательно потокобезопасными для всех операций?
Некоторые потокобезопасные классы часто упоминаются при разработке, например ConcurrentHashMap.Безопасность потоков означает, что каждый независимый метод в классе является потокобезопасным, ноКомбинации методов не обязательно потокобезопасны..
Являются ли переменные-члены и статические переменные потокобезопасными?
- Потокобезопасный, если он не используется несколькими потоками
- Если есть многопоточный обмен
- Многопоточность имеет только операции чтения, а затем потокобезопасность
- Существует операция записи в несколько потоков, и код операции записи является критическим разделом, поэтому поток небезопасен.
Местные переменные потоки безопасны?
- Локальные переменные потокобезопасны
- Объекты, на которые ссылаются локальные переменные, не обязательно потокобезопасны.
- потокобезопасный, если объект не выходит за рамки метода
- Если объект выходит за рамки метода, например, возвращаемое значение метода, необходимо учитывать безопасность потоков.
synchronized
Блокировки синхронизации также называются объектными блокировками.Они блокируются на объектах.Разные объекты - разные блокировки.
Это ключевое слово используется для обеспечения безопасности потоков и является блокирующим решением.
Только один поток может одновременно удерживать блокировку объекта, а другие потоки будут заблокированы, когда захотят получить блокировку объекта, поэтому не нужно беспокоиться о проблеме переключения контекста.
Примечание. Не следует понимать, что поток заблокирован, и он будет продолжать выполняться при входе в синхронизированный блок кода. Если квант времени переключается, другие потоки также будут выполняться, и тогда они будут выполняться сразу после обратного переключения, но они не будут выполняться на ресурсах с конкурирующими блокировками, потому что текущий поток не снял блокировку.
Когда поток завершает выполнение синхронизированного блока кода, он пробуждает ожидающий поток.
синхронизированный фактически использует блокировки объектов для обеспечения критической секцииатомарностьКод критической секции неделим и не будет прерван переключением потоков
основное использование
// 加在方法上 实际是对this对象加锁
private synchronized void a() {
}
// 同步代码块,锁对象可以是任意的,加在this上 和a()方法作用相同
private void b(){
synchronized (this){
}
}
// 加在静态方法上 实际是对类对象加锁
private synchronized static void c() {
}
// 同步代码块 实际是对类对象加锁 和c()方法作用相同
private void d(){
synchronized (TestSynchronized.class){
}
}
// 上述b方法对应的字节码源码 其中monitorenter就是加锁的地方
0 aload_0
1 dup
2 astore_1
3 monitorenter
4 aload_1
5 monitorexit
6 goto 14 (+8)
9 astore_2
10 aload_1
11 monitorexit
12 aload_2
13 athrow
14 return
потокобезопасный код
private static int count = 0;
private static Object lock = new Object();
private static Object lock2 = new Object();
// t1线程和t2对象都是对同一对象加锁。保证了线程安全。此段代码无论执行多少次,结果都是0
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (lock) {
count++;
}
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (lock) {
count--;
}
}
});
t1.start();
t2.start();
// 让t1 t2都执行完
t1.join();
t2.join();
System.out.println(count);
}
Особенности:Блокировка добавляется к объекту, это должен быть один и тот же объект, блокировка может вступить в силу
нить связи
wait+notify
Связь между потоками может быть достигнута путем совместного использования переменных + wait() и notify().
wait() поместит поток в заблокированное состояние, notify() разбудит поток
Когда несколько потоков конкурируют за доступ к методу синхронизации объекта, объект блокировки будет связан с базовым объектом Monitor (реализация тяжеловесных блокировок).
Как показано на рисунке ниже, Thread0,1 сначала конкурируют до тех пор, пока блокировка не выполнит код, потоки 2,3,4,5 одновременно выполняют код критической секции и начинают конкурировать за блокировку
- Thread-0 сначала получает блокировку объекта и связывает его с владельцем монитора.В блоке кода синхронизации вызывается метод wait() объекта блокировки.После вызова он войдет в waitSet для ожидания. то же самое верно для потока 1. В это время состояние потока 0 для ожидания
- Потоки 2, 3, 4 и 5 конкурируют одновременно. После того, как 2 получает блокировку, он связывает владельца монитора. 3, 4 и 5 могут только войти в EntryList и ждать. В это время состояние потока 2 — Runnable, а состояния 3, 4 и 5 — Blocked.
- После выполнения 2 разбудить потоки в entryList, 3, 4 и 5 конкурируют за блокировки, а полученные потоки будут связаны с владельцем монитора.
- 3, 4 и 5 потоки в процессе выполнения, при вызове notify() или notifyAll() объекта блокировки, разбудят поток waitSet, и пробужденный поток войдет в entryList, чтобы дождаться повторной конкуренции за блокировку. замок
Уведомление:
-
Заблокированное состояние и состояние ожидания являются блокирующими состояниями.
-
Заблокированный поток проснется, когда поток-владелец снимет блокировку.
-
Сценарий использования wait and notify заключается в том, что должна быть синхронизация, и для вызова должна быть получена блокировка объекта, для вызова используйте объект блокировки, иначе будет выброшено исключение
- wait() Снимите блокировку. Введите, что waitSet может пройти через время, если он не будет разбужен в течение указанного времени, он проснется автоматически.
- notify() случайным образом пробуждает поток в наборе ожидания.
- notifyAll() пробуждает все потоки в waitSet
static final Object lock = new Object();
new Thread(() -> {
synchronized (lock) {
log.info("开始执行");
try {
// 同步代码内部才能调用
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("继续执行核心逻辑");
}
}, "t1").start();
new Thread(() -> {
synchronized (lock) {
log.info("开始执行");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("继续执行核心逻辑");
}
}, "t2").start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("开始唤醒");
synchronized (lock) {
// 同步代码内部才能调用
lock.notifyAll();
}
// 执行结果
14:29:47.138 [t1] INFO TestWaitNotify - 开始执行
14:29:47.141 [t2] INFO TestWaitNotify - 开始执行
14:29:49.136 [main] INFO TestWaitNotify - 开始唤醒
14:29:49.136 [t2] INFO TestWaitNotify - 继续执行核心逻辑
14:29:49.136 [t1] INFO TestWaitNotify - 继续执行核心逻辑
В чем разница между ожиданием и сном?
Оба переведут поток в заблокированное состояние со следующими отличиями:
- ожидание — это метод объекта, сон — это метод потока.
- ожидание немедленно снимет блокировку. сон не снимет блокировку.
- Состояние потока после ожидания — это состояние потока после ожидания ожидания — Time_Waiting.
park&unpark
LockSupport — это инструментальный класс в составе juc, который предоставляет методы парковки и разблокировки для реализации связи между потоками.
Различия по сравнению с ожиданием и уведомлением
- ждать и уведомлять о необходимости получения блокировок объектов парковать не парковать не
- unpark может указать, что поток пробуждения уведомляет о случайном пробуждении
- Порядок парковки и снятия с парковки может быть отключен первым, а порядок ожидания и уведомления не может быть изменен на обратный.
Модель «производитель-потребитель»
指的是有生产者来生产数据,消费者来消费数据,生产者生产满了就不生产了,通知消费者取,等消费了再进行生产。
Потребители не будут потреблять, когда они не могут потреблять, и уведомят производителя о необходимости производить, а затем продолжат потреблять, когда продукция прибудет.
public static void main(String[] args) throws InterruptedException {
MessageQueue queue = new MessageQueue(2);
// 三个生产者向队列里存值
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
queue.put(new Message(id, "值" + id));
}, "生产者" + i).start();
}
Thread.sleep(1000);
// 一个消费者不停的从队列里取值
new Thread(() -> {
while (true) {
queue.take();
}
}, "消费者").start();
}
}
// 消息队列被生产者和消费者持有
class MessageQueue {
private LinkedList<Message> list = new LinkedList<>();
// 容量
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
}
/**
* 生产
*/
public void put(Message message) {
synchronized (list) {
while (list.size() == capacity) {
log.info("队列已满,生产者等待");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.addLast(message);
log.info("生产消息:{}", message);
// 生产后通知消费者
list.notifyAll();
}
}
public Message take() {
synchronized (list) {
while (list.isEmpty()) {
log.info("队列已空,消费者等待");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = list.removeFirst();
log.info("消费消息:{}", message);
// 消费后通知生产者
list.notifyAll();
return message;
}
}
}
// 消息
class Message {
private int id;
private Object value;
}
Корпус синхронизированного замка
Для того, чтобы более наглядно выразить концепцию добавления блокировок синхронизации, вот пример из жизни, и постарайтесь максимально конкретизировать изложенные выше понятия.
Вот одна вещь, которая всех очень интересует. Деньги! ! ! (кроме г-на Ма).
В реальности мы подходим к банкомату у ворот банка, чтобы снять деньги. Деньги банкомата являются общей переменной. В целях безопасности невозможно, чтобы два незнакомых человека вошли в один и тот же банкомат, чтобы снять деньги в одновременно, поэтому только один человек может войти, чтобы снять деньги. , затем заблокировать дверь банкомата, а другие могут только ждать у двери банкомата.
Банкоматов несколько, деньги в них друг на друга не влияют, замков несколько (блокировка нескольких объектов), у снимающего деньги нет проблем с безопасностью при одновременном снятии денег в нескольких банкоматах.
Если каждый незнакомец, который снимает деньги,нить, когда кассир входит в банкомат и запирает дверь (поток получает блокировку), выйти после получения денег (замок освобождения нити), следующий человек соревнуется с замком, чтобы снять деньги.
Предполагая, что персонал тоже нить, если кассир входит и обнаруживает, что в банкомате недостаточно денег, то уведомляет персонал, чтобы добавить деньги в банкомат (Вызовите метод notifyAll), снимающий приостанавливает снятие денег и входит в лобби банка, чтобы заблокировать и ждать (вызвать метод ожидания).
И персонал, и денежный ящик в холле банка были разбужены и снова конкурировали за замок.После входа, если денежный ящик был денежным ящиком, потому что в банкомате закончились деньги, они должны были войти в холл банка, чтобы ждать.
Когда персонал получит замок банкомата и войдет, он уведомит людей в холле, чтобы они сняли деньги после добавления денег (Вызовите метод notifyAll). Приостановите добавление денег самостоятельно, войдите в лобби банка и дождитесь пробуждения, чтобы добавить деньги (вызвать метод ожидания).
В это время все ожидающие в холле будут соревноваться за замок, а кто его получит, тот войдет и продолжит снимать деньги.
Отличие от реальности в том, что здесь нет понятия очереди, и кто схватит замок, тот войдет и получит его.
ReentrantLock
Блокировка с повторным входом: после того, как поток получает блокировку объекта, метод выполнения может получить ее, когда ему необходимо получить блокировку. как следующий код
private static final ReentrantLock LOCK = new ReentrantLock();
private static void m() {
LOCK.lock();
try {
log.info("begin");
// 调用m1()
m1();
} finally {
// 注意锁的释放
LOCK.unlock();
}
}
public static void m1() {
LOCK.lock();
try {
log.info("m1");
m2();
} finally {
// 注意锁的释放
LOCK.unlock();
}
}
Synchronized также является реентерабельной блокировкой.ReentrantLock имеет следующие преимущества:
- Поддерживает тайм-аут для получения блокировок
- Может быть прервано во время получения блокировки
- Может быть установлен на честную блокировку
- Могут быть разные условные переменные, то есть существует несколько наборов ожидания, которые можно указать для пробуждения.
api
// 默认非公平锁,参数传true 表示未公平锁
ReentrantLock lock = new ReentrantLock(false);
// 尝试获取锁
lock()
// 释放锁 应放在finally块中 必须执行到
unlock()
try {
// 获取锁时可被打断,阻塞中的线程可被打断
LOCK.lockInterruptibly();
} catch (InterruptedException e) {
return;
}
// 尝试获取锁 获取不到就返回false
LOCK.tryLock()
// 支持超时时间 一段时间没获取到就返回false
tryLock(long timeout, TimeUnit unit)
// 指定条件变量 休息室 一个锁可以创建多个休息室
Condition waitSet = ROOM.newCondition();
// 释放锁 进入waitSet等待 释放后其他线程可以抢锁
yanWaitSet.await()
// 唤醒具体休息室的线程 唤醒后 重写竞争锁
yanWaitSet.signal()
Пример: один поток выводит a, другой поток выводит b, один поток выводит c, а abc выводится последовательно, 5 раз подряд.
Этот тест представляет собой взаимодействие потоков, которое может быть достигнуто с помощью wait()/notify() и управляющих переменных.Здесь для достижения этой функции можно использовать ReentrantLock.
public static void main(String[] args) {
AwaitSignal awaitSignal = new AwaitSignal(5);
// 构建三个条件变量
Condition a = awaitSignal.newCondition();
Condition b = awaitSignal.newCondition();
Condition c = awaitSignal.newCondition();
// 开启三个线程
new Thread(() -> {
awaitSignal.print("a", a, b);
}).start();
new Thread(() -> {
awaitSignal.print("b", b, c);
}).start();
new Thread(() -> {
awaitSignal.print("c", c, a);
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
awaitSignal.lock();
try {
// 先唤醒a
a.signal();
} finally {
awaitSignal.unlock();
}
}
}
class AwaitSignal extends ReentrantLock {
// 循环次数
private int loopNumber;
public AwaitSignal(int loopNumber) {
this.loopNumber = loopNumber;
}
/**
* @param print 输出的字符
* @param current 当前条件变量
* @param next 下一个条件变量
*/
public void print(String print, Condition current, Condition next) {
for (int i = 0; i < loopNumber; i++) {
lock();
try {
try {
// 获取锁之后等待
current.await();
System.out.print(print);
} catch (InterruptedException e) {
}
next.signal();
} finally {
unlock();
}
}
}
тупик
Говоря о взаимоблокировках, давайте возьмем пример.
Ниже приведена реализация кода
static Beer beer = new Beer();
static Story story = new Story();
public static void main(String[] args) {
new Thread(() ->{
synchronized (beer){
log.info("我有酒,给我故事");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (story){
log.info("小王开始喝酒讲故事");
}
}
},"小王").start();
new Thread(() ->{
synchronized (story){
log.info("我有故事,给我酒");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (beer){
log.info("老王开始喝酒讲故事");
}
}
},"老王").start();
}
class Beer {
}
class Story{
}
Взаимная блокировка не позволяет программе нормально работать
Средства обнаружения могут обнаруживать информацию о взаимоблокировках
Модель памяти Java (JMM)
jmm отражается в следующих трех аспектах
- Атомарность гарантирует, что инструкции не будут затронуты переключением контекста.
- Видимость Гарантирует, что инструкции не будут затронуты кешем ЦП
- Упорядочивание гарантирует, что параллельные оптимизации не повлияют на инструкции.
видимость
неудержимая программа
static boolean run = true;
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
while (run) {
// ....
}
});
t.start();
Thread.sleep(1000);
// 线程t不会如预想的停下来
run = false;
}
Как показано на рисунке выше, у потока есть свой рабочий кеш.Когда основной поток модифицирует переменную и синхронизирует ее с основной памятью, поток t не читает ее, поэтому программа не может остановиться
упорядоченность
JVM может корректировать порядок выполнения операторов, не влияя на корректность программы, что также известно как изменение порядка инструкций.
static int i;
static int j;
// 在某个线程内执行如下赋值操作
i = ...;
j = ...;
有可能将j先赋值
атомарность
Атомарность должна быть знакома всем.Синхронизированный блок кода вышеуказанной блокировки синхронизации обеспечивает атомарность, то есть часть кода представляет собой единое целое, а атомарность обеспечивает потокобезопасность и не будет зависеть от переключения контекста.
volatile
Это ключевое слово относится к видимости и упорядочению, а volatile реализуется через барьеры памяти.
- написать барьер
Барьер записи будет добавлен после операции записи объекта, данные перед барьером записи будут синхронизированы с основной памятью, а порядок выполнения барьера записи будет гарантирован до барьера записи.
- прочитать барьер
Барьер чтения будет добавлен перед операцией чтения объекта, операторы после барьера чтения будут считаны из основной памяти, а код после барьера чтения будет гарантированно выполняться после барьера чтения.
Примечание: volatile не устраняет атомарность, т. е. с помощью этого ключевого слова невозможно обеспечить потокобезопасность.
volatile сценарий приложения: один поток читает переменную, а другой поток оперирует переменной.Добавление этого ключевого слова гарантирует, что поток, читающий переменную, сможет воспринять переменную во времени после того, как переменная будет записана.
замок без замка
cas (сравнить и поменять местами) сравнить и поменять местами
При присвоении значения переменной прочитайте значение v из памяти, получите новое значение n для замены и выполните метод compareAndSwap(), сравните v и значение в текущей памяти, чтобы убедиться, что они согласованы, и если они непротиворечивы, поменяйте местами n и v , если непротиворечивы, повторите попытку.
Нижний слой cas находится на уровне процессора, то есть атомарность операции может быть гарантирована без использования блокировок синхронизации.
private AtomicInteger balance;
// 模拟cas的具体操作
@Override
public void withdraw(Integer amount) {
while (true) {
// 获取当前值
int pre = balance.get();
// 进行操作后得到新值
int next = pre - amount;
// 比较并设置成功 则中断 否则自旋重试
if (balance.compareAndSet(pre, next)) {
break;
}
}
}
Эффективность блокировки без блокировки выше, чем у предыдущей блокировки, потому что без блокировки не происходит переключение потоков потоков.
cas — это идея оптимистичной блокировки, а sychronized — идея пессимистической блокировки
cas подходит для сценариев с небольшой конкуренцией потоков.Если конкуренция сильная, часто возникают повторные попытки, что снижает эффективность.
Параллельный пакет juc содержит атомарные классы, реализующие cas.
- AtomicInteger/AtomicBoolean/AtomicLong
- AtomicIntegerArray/AtomicLongArray/AtomicReferenceArray
- AtomicReference/AtomicStampedReference/AtomicMarkableReference
AtomicInteger
Общие API
new AtomicInteger(balance)
get()
compareAndSet(pre, next)
// i.incrementAndGet() ++i
// i.decrementAndGet() --i
// i.getAndIncrement() i++
// i.getAndDecrement() ++i
i.addAndGet()
// 传入函数式接口 修改i
int getAndUpdate(IntUnaryOperator updateFunction)
// cas 的核心方法
compareAndSet(int expect, int update)
АВА-проблема
В cas есть проблема ABA, то есть при сравнении и обмене, если исходное значение равно A, другие потоки изменят его на B, а другие потоки изменят его на A.
В этот момент действительно произошел обмен, но сравнение и обмен могут быть успешно обменены, поскольку значение не изменилось.
Решение
AtomicStampedReference/AtomicMarkableReference
Вышеупомянутые два класса решают проблему ABA.Принцип заключается в том, чтобы добавить номер версии к объекту и увеличивать номер версии каждый раз, когда он изменяется, чтобы избежать проблемы ABA.
Или добавьте идентификатор логической переменной и откорректируйте значение логической переменной после модификации, что также может избежать проблемы ABA.
Пул потоков
Введение в пулы потоков
Пул потоков является одним из наиболее важных знаний и трудностей в параллелизме Java, и он наиболее широко используется на практике.
Ресурсы потоков очень ценны и не могут создаваться бесконечно. Должны быть инструменты для управления потоками. Пул потоков — это инструмент для управления потоками. В разработке Java часто встречаются идеи объединения, такие как пул соединений с базой данных, пул соединений Redis и т. д.
Создайте несколько потоков заранее и выполняйте их непосредственно при отправке задачи, что может не только сэкономить время создания потоков, но и контролировать количество потоков.
Преимущества пулов потоков
- Сократите потребление ресурсов, уменьшите потребление создания потоков и уничтожения потоков и контролируйте ресурсы с помощью идеи объединения
- Повышение скорости отклика: когда задачи поступают, они могут выполняться без создания потоков.
- Обеспечить все более и более мощные функции с высокой масштабируемостью
Конструктор пула потоков
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
Значение параметров конструктора
имя параметра | Значение параметра |
---|---|
corePoolSize | количество основных потоков |
maximumPoolSize | максимальное количество потоков |
keepAliveTime | Время простоя спасательного потока |
unit | Единица времени простоя спасательного потока |
workQueue | очередь блокировки |
threadFactory | Фабрика для создания потоков, в основном определяющая имя потока |
handler | политика отказа |
Случай с пулом потоков
Далее мы используем пример, чтобы понять параметры пула потоков и процесс получения задач в пуле потоков.
Банк ведет бизнес, как показано выше.
- Когда клиент приходит в банк, он открывает счетчик для обработки.Счетчик эквивалентен потоку, а клиент эквивалентен задаче.Есть два счетчика, которые всегда открыты, и три временных счетчика. 2 — это количество основных потоков, а 5 — максимальное количество потоков. то есть есть два основных потока
- Когда прилавок открылся на втором, они все еще обрабатывали бизнес. Когда клиенты возвращаются, они встают в очередь в зале ожидания. В зале ожидания всего три места.
- При заполнении зала очереди клиент продолжает открывать стойку для обработки.В настоящее время имеется максимум три временных стойки, то есть три аварийных потока.
- Если вы вернетесь к клиентам в этот момент, вы не сможете нормально вести для них бизнес и использовать стратегию отказа для работы с ними.
- Когда счетчик закончит обработку дела, он доставит задачу из зала очереди.Когда счетчик не может получить задачу после периода простоя, если текущее количество потоков больше, чем количество основных потоков, поток будут переработаны. То есть счетчик обнуляется.
Состояние пула потоков
Пул потоков представляет состояние пула потоков через верхние 3 бита переменной int, а младшие 29 бит хранят количество пулов потоков.
название штата | дай три | получать новые задания | Обработка блокирующих задач очереди | инструкция |
---|---|---|---|---|
Running | 111 | Y | Y | Принимайте задачи нормально, обрабатывайте задачи нормально |
Shutdown | 000 | N | Y | Не будет получать задачи, завершит задачи, которые выполняются, а также обработает задачи в очереди блокировки |
stop | 001 | N | N | Не будет получать задачи, прервет выполнение задач и откажется от обработки задач в очереди блокировки |
Tidying | 010 | N | N | Все задачи выполнены, текущий активный поток равен 0, и он вот-вот завершится |
Termitted | 011 | N | N | конечное состояние |
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
Основной процесс пула потоков
Действия по созданию пула потоков, получению задач, выполнению задач и повторному использованию потоков
- После создания пула потоков состояние пула потоков — Выполняется, и следующие шаги можно выполнять только в этом состоянии.
- При отправке задачи пул потоков создаст поток для обработки задачи.
- Когда количество рабочих потоков в пуле потоков достигает corePoolSize, продолжение отправки задач попадет в очередь блокировки.
- Когда очередь блокировки заполнена, продолжайте отправлять задачи, и для обработки будет создан аварийный поток.
- Когда количество рабочих потоков в пуле потоков достигает maxPoolSize, будет выполняться политика отклонения.
- Когда время извлечения задачи потоком достигает значения keepAliveTime, а задача не была получена, а количество рабочих потоков больше, чем corePoolSize, поток будет перезапущен.
Примечание: дело не в том, что вновь созданный поток является основным потоком, а созданный позже поток — неосновным потоком.У потоков нет понятия основного и неосновного.Это мое давнее недоразумение.
политика отказа
- Вызывающий объект выдает исключение RejectedExecutionException (стратегия по умолчанию).
- Разрешить вызывающему абоненту выполнить задачу
- Отказаться от этой задачи
- Отменить самую старую задачу в очереди на блокировку и присоединиться к задаче
Как отправить задание
// 执行Runnable
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
// 提交Callable
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 内部构建FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// 提交Runnable,指定返回值
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 内部构建FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
// 提交Runnable,指定返回值
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 内部构建FutureTask
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
Исполнители создают пулы потоков
Примечание. Следующие методы не рекомендуются
1.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- Количество основных потоков = максимальное количество потоков Без аварийных потоков
- Неограниченная очередь блокировки может привести к oom
2.newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- Количество основных потоков равно 0, максимальное количество потоков не ограничено, а аварийный поток перезапускается через 60 секунд.
- Очередь реализована SynchronousQueue, пропускной способности нет, то есть после того, как она поставлена в очередь, ее нельзя поставить без потока, который ее заберет.
- Это может вызвать слишком много потоков и слишком большую нагрузку на ЦП.
3.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- Количество основных потоков и максимальное количество потоков равны 1, аварийного потока нет, а неограниченная очередь может постоянно получать задачи.
- Сериализуйте задачи и выполняйте их одну за другой. Класс-оболочка используется для защиты и изменения некоторых параметров пула потоков, таких как corePoolSize.
- Если поток выдает исключение, он воссоздает поток для продолжения выполнения.
- может вызвать ом
4.newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
- Пул потоков планирования задач может указывать время задержки для вызова и может указывать вызов через определенные промежутки времени.
Закрытие пула потоков
shutdown()
Это приведет к отключению состояния пула потоков и не сможет получать задачи, но завершит выполнение задач в рабочих потоках и блокирующих очередях, что эквивалентно корректному завершению работы.
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdownNow()
Это остановит состояние пула потоков, если он не может получить задачи, он немедленно прервет исполняемый рабочий поток и не будет выполнять задачи в очереди блокировки, а вернет список задач очереди блокировки.
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
Правильное использование пула потоков
Сложность пула потоков заключается в настройке параметров, существует набор теоретических параметров конфигурации.
С интенсивным использованием ЦП: относится к программе, которая в основном выполняет операции ЦП.
Количество ядерных потоков: количество ядер ЦП + 1
Интенсивный ввод-вывод: удаленный вызов RPC, работа с базой данных и т. д., без использования ЦП для многих операций. Сценарии для большинства приложений
Количество ядерных потоков = количество ядер * ожидаемая загрузка ЦП * общее время / время работы ЦП
Однако, основываясь на приведенной выше теории, его по-прежнему сложно настроить, поскольку вычислительное время процессора нелегко оценить.
Фактический размер конфигурации можно посмотреть в следующей таблице.
интенсивный процессор | йо интенсив | |
---|---|---|
количество потоков | Количество ядер | Количество ядер*50 |
длина очереди | y>=100 | 1<=y<=10 |
1. Параметры пула потоков настраиваются через распределенную конфигурацию, и нет необходимости перезапускать приложение для изменения конфигурации.
Параметры пула потоков изменяются в зависимости от количества онлайн-запросов. Лучше всего настроить количество основных потоков, максимальное количество потоков и размер очереди.
Основная конфигурация corePoolSize maxPoolSize queueSize
Java предоставляет методы для переопределения параметров, а пул потоков будет обрабатывать параметры для плавного изменения.
public void setCorePoolSize(int corePoolSize) {
}
2. Увеличьте мониторинг пула потоков
3. Тип io-intensive может быть настроен для добавления задач в максимальное количество потоков перед помещением задач в очередь блокировки.
Код Преимущественно перезаписываемая блокирующая очередь Способ добавления задач
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
final ReentrantLock lock = this.lock;
lock.lock();
try {
int currentPoolThreadSize = executor.getPoolSize();
// 如果提交任务数小于当前创建的线程数, 说明还有空闲线程,
if (executor.getTaskCount() < currentPoolThreadSize) {
// 将任务放入队列中,让线程去处理任务
return super.offer(runnable);
}
// 核心改动
// 如果当前线程数小于最大线程数,则返回 false ,让线程池去创建新的线程
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// 否则,就将任务放入队列中
return super.offer(runnable);
} finally {
lock.unlock();
}
}
3. Стратегия отказа Рекомендуется использовать стратегию отказа кота (дайте шанс)
// tomcat的源码
@Override
public void execute(Runnable command) {
if ( executor != null ) {
try {
executor.execute(command);
} catch (RejectedExecutionException rx) {
// 捕获到异常后 在从队列获取,相当于重试1取不到任务 在执行拒绝任务
if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full.");
}
} else throw new IllegalStateException("StandardThreadPool not started.");
}
Рекомендуется изменить метод выборки задач из очереди: увеличить время ожидания, а время ожидания не может быть получено в течение 1 минуты до возврата.
public boolean offer(E e, long timeout, TimeUnit unit){}
Эпилог
Работаю года три-четыре, официально блог еще не вел.Самостоятельная работа всегда накапливалась конспектированием.Недавно заново изучил java multithreading, и задумался о написании блога об этой части контент на выходных и делиться им.
Объем статьи большой, так что ставьте большой лайк друзьям увидевшим ее здесь!Ввиду ограниченного уровня автора и первого раза написания блога, в статье неизбежно будут ошибки, и отзывы и поправки от друзей приветствуются.
Если вы считаете, что статья полезна для вас, пожалуйста,Ставьте лайки, комментируйте, вперед, смотритепогнали
Ваша поддержка - моя самая большая мотивация! ! !