Package java.util.concurrent
---> AtomicInteger
Lock
ReadWriteLock
1. Пожалуйста, расскажите о своем понимании volatile
1. Volatile — это облегченный механизм синхронизации, предоставляемый виртуальной машиной Java.
Гарантированная видимость, отсутствие гарантий атомарности, отсутствие переупорядочения инструкций
-
Гарантированная видимость
Когда несколько потоков обращаются к одной и той же переменной, один поток изменяет значение переменной, а другие потоки могут сразу увидеть измененное значение.
Пример, когда ключевое слово volatile не добавлено:
package com.jian8.juc; import java.util.concurrent.TimeUnit; /** * 1验证volatile的可见性 * 1.1 如果int num = 0,number变量没有添加volatile关键字修饰 * 1.2 添加了volatile,可以解决可见性 */ public class VolatileDemo { public static void main(String[] args) { visibilityByVolatile();//验证volatile的可见性 } /** * volatile可以保证可见性,及时通知其他线程,主物理内存的值已经被修改 */ public static void visibilityByVolatile() { MyData myData = new MyData(); //第一个线程 new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t come in"); try { //线程暂停3s TimeUnit.SECONDS.sleep(3); myData.addToSixty(); System.out.println(Thread.currentThread().getName() + "\t update value:" + myData.num); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }, "thread1").start(); //第二个线程是main线程 while (myData.num == 0) { //如果myData的num一直为零,main线程一直在这里循环 } System.out.println(Thread.currentThread().getName() + "\t mission is over, num value is " + myData.num); } } class MyData { // int num = 0; volatile int num = 0; public void addToSixty() { this.num = 60; } }
Выходной результат:
thread1 come in thread1 update value:60 //线程进入死循环
когда мы добавляем
volatile
После ключевого словаvolatile int num = 0;
Результат:thread1 come in thread1 update value:60 main mission is over, num value is 60 //程序没有死循环,结束执行
-
Атомарность не гарантируется
Атомарность: неделимость и полнота, то есть когда поток выполняет конкретное дело, он не может быть заблокирован или разделен посередине, он должен быть полным в целом, успешным или неуспешным в одно и то же время.
Пример проверки (добавьте ключевое слово volatile в переменную, не добавляйте synchronized в метод):
package com.jian8.juc; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 1验证volatile的可见性 * 1.1 如果int num = 0,number变量没有添加volatile关键字修饰 * 1.2 添加了volatile,可以解决可见性 * * 2.验证volatile不保证原子性 * 2.1 原子性指的是什么 * 不可分割、完整性,即某个线程正在做某个具体业务时,中间不可以被加塞或者被分割,需要整体完整,要么同时成功,要么同时失败 */ public class VolatileDemo { public static void main(String[] args) { // visibilityByVolatile();//验证volatile的可见性 atomicByVolatile();//验证volatile不保证原子性 } /** * volatile可以保证可见性,及时通知其他线程,主物理内存的值已经被修改 */ //public static void visibilityByVolatile(){} /** * volatile不保证原子性 * 以及使用Atomic保证原子性 */ public static void atomicByVolatile(){ MyData myData = new MyData(); for(int i = 1; i <= 20; i++){ new Thread(() ->{ for(int j = 1; j <= 1000; j++){ myData.addSelf(); myData.atomicAddSelf(); } },"Thread "+i).start(); } //等待上面的线程都计算完成后,再用main线程取得最终结果值 try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } while (Thread.activeCount()>2){ Thread.yield(); } System.out.println(Thread.currentThread().getName()+"\t finally num value is "+myData.num); System.out.println(Thread.currentThread().getName()+"\t finally atomicnum value is "+myData.atomicInteger); } } class MyData { // int num = 0; volatile int num = 0; public void addToSixty() { this.num = 60; } public void addSelf(){ num++; } AtomicInteger atomicInteger = new AtomicInteger(); public void atomicAddSelf(){ atomicInteger.getAndIncrement(); } }
Результат трех исполнений:
//1. main finally num value is 19580 main finally atomicnum value is 20000 //2. main finally num value is 19999 main finally atomicnum value is 20000 //3. main finally num value is 18375 main finally atomicnum value is 20000 //num并没有达到20000
-
Отключить перестановку инструкций
Порядок: когда компьютер выполняет программу, для повышения производительности компиляторы и процессоры частоповторный заказ, как правило, делится на следующие три
graph LR 源代码 --> id1["编译器优化的重排"] id1 --> id2[指令并行的重排] id2 --> id3[内存系统的重排] id3 --> 最终执行的指令 style id1 fill:#ff8000; style id2 fill:#fab400; style id3 fill:#ffd557;
В однопоточной среде гарантируется, что окончательный результат выполнения программы согласуется с результатом последовательного выполнения кода.
Процессор должен учитывать порядок между инструкциями при изменении порядказависимость данных
Потоки выполняются попеременно в многопоточной среде.Из-за оптимизации и реорганизации компилятора неясно, могут ли переменные, используемые в двух потоках, быть согласованными, и результаты нельзя предсказать.
Пример измененного кода:
Объявите переменную:
int a,b,x,y=0
нить 1 нить 2 x = a; y = b; b = 1; a = 2; результат x = 0 y=0 Если компилятор выполняет оптимизацию реорганизации этого программного кода, могут возникнуть следующие ситуации:
нить 1 нить 2 b = 1; a = 2; x= a; y = b; результат x = 2 y=1 Этот результат показывает, что в многопоточной среде из-за оптимизации и реорганизации компилятора неясно, могут ли переменные, используемые в двух потоках, гарантировать согласованность.
Реализация volatile запрещает перестановку инструкций, тем самым избегая феномена неупорядоченного выполнения программ в многопоточной среде.
барьер памяти(Барьер памяти), также известный как барьер памяти, представляет собой инструкцию ЦП, выполняющую две функции:
- Гарантированный порядок выполнения конкретных операций
- Гарантировать видимость памяти некоторых переменных (используйте эту функцию, чтобы добиться видимости энергозависимой памяти)
Поскольку и компилятор, и процессор могут выполнять оптимизацию перестановки инструкций. Если вы вставите барьер памяти перед инструкцией, он сообщит компилятору и ЦП, что никакая инструкция не может быть переупорядочена с помощью этой инструкции барьера памяти, то естьОтключите оптимизацию переупорядочения инструкций до и после барьера памяти, вставив барьер памяти. Другая функция барьера памяти состоит в принудительном сбросе данных кэша различных ЦП, чтобы любой поток на ЦП мог прочитать последнюю версию данных.
graph TB subgraph bbbb["对Volatile变量进行读操作时,<br>回在读操作之前加入一条load屏障指令,<br>从内存中读取共享变量"] ids6[Volatile]-->red3[LoadLoad屏障] red3-->id7["禁止下边所有普通读操作<br>和上面的volatile读重排序"] red3-->red4[LoadStore屏障] red4-->id9["禁止下边所有普通写操作<br>和上面的volatile读重排序"] red4-->id8[普通读] id8-->普通写 end subgraph aaaa["对Volatile变量进行写操作时,<br>回在写操作后加入一条store屏障指令,<br>将工作内存中的共享变量值刷新回到主内存"] id1[普通读]-->id2[普通写] id2-->red1[StoreStore屏障] red1-->id3["禁止上面的普通写和<br>下面的volatile写重排序"] red1-->id4["Volatile写"] id4-->red2[StoreLoad屏障] red2-->id5["防止上面的volatile写和<br>下面可能有的volatile读写重排序"] end style red1 fill:#ff0000; style red2 fill:#ff0000; style red4 fill:#ff0000; style red3 fill:#ff0000; style aaaa fill:#ffff00; style bbbb fill:#ffff00;
2. JMM (модель памяти Java)
JMM (модель памяти Java) сама по себе является абстрактным понятием и на самом деле не существует.Она описывает набор правил или спецификаций, посредством которых определяются различные переменные в программе (включая поля экземпляра, статические поля и объекты, составляющие массив) элемент ) метод доступа.
Регламент JMM по синхронизации:
- Прежде чем поток будет разблокирован, значение общей переменной должно быть сброшено обратно в основную память.
- Прежде чем поток заблокируется, он должен прочитать последнее значение основной памяти в свою рабочую память.
- Тот же замок, когда замок разблокирован
Поскольку сущностью работающей программы JVM является поток, и при создании каждого потока JVM создаст для него рабочую память (некоторые из них станут пространством стека).Рабочая память — это частная область данных каждого потока, и все переменные указаны в модели памяти java. хранятся восновная память, основная память — это разделяемая область памяти, доступ к которой имеют все потоки,Однако операция потока над переменной (присваивание чтения и т. д.) должна выполняться в рабочей памяти.Сначала переменная копируется из основной памяти в собственное пространство рабочей памяти, а затем выполняется операция с переменной.После операции завершается, переменная записывается обратно в основную память. , переменные в основной памяти не могут быть обработаны напрямую, а рабочая память в каждом потоке хранит копию переменной копии основной памяти, поэтому разные части потока не могут получить доступ к рабочей памяти друг друга, а связь (передача значений) между потоками должна осуществляться через основную память Процесс доступа выглядит следующим образом:
- видимость
- атомарность
- упорядоченность
3. Вы использовали volatile в этих местах
Когда обычный одноэлементный шаблон является многопоточным:
public class SingletonDemo {
private static SingletonDemo instance = null;
private SingletonDemo() {
System.out.println(Thread.currentThread().getName() + "\t 构造方法SingletonDemo()");
}
public static SingletonDemo getInstance() {
if (instance == null) {
instance = new SingletonDemo();
}
return instance;
}
public static void main(String[] args) {
//构造方法只会被执行一次
// System.out.println(getInstance() == getInstance());
// System.out.println(getInstance() == getInstance());
// System.out.println(getInstance() == getInstance());
//并发多线程后,构造方法会在一些情况下执行多次
for (int i = 0; i < 10; i++) {
new Thread(() -> {
SingletonDemo.getInstance();
}, "Thread " + i).start();
}
}
}
В некоторых случаях его конструктор будет выполняться несколько раз.
Решение:
-
Код DCL одноэлементного режима
DCL (механизм Double Check Lock) принимает решение до и после блокировки
public static SingletonDemo getInstance() { if (instance == null) { synchronized (SingletonDemo.class) { if (instance == null) { instance = new SingletonDemo(); } } } return instance; }
Большинство конструкторов результатов запуска будут выполняться только один раз., но механизм перестановки инструкций сделает программу с небольшой вероятностью того, что конструктор будет выполнен несколько раз
Механизм DCL (двусторонний замок) не обязательно является потокобезопасным., причина в наличии перестановки инструкций, добавление volatile может запретить перестановку инструкций
Причина в том, что когда поток выполняет первое обнаружение и считывает, что экземпляр не нулевой, ссылочный объект экземпляра может бытьинициализация не завершена. instance=new SingleDemo(); можно разделить на три шага (псевдокод):
memory = allocate();//1.分配对象内存空间 instance(memory); //2.初始化对象 instance = memory; //3.设置instance执行刚分配的内存地址,此时instance!=null
Зависимости данных между шагами 2 и 3 нет, а результат выполнения программы до или после перестановки не меняется в одном потоке, поэтому такая оптимизация перестановки разрешена,Если шаг 3 предшествует шагу 2, но экземпляр не был инициализирован
Однако переупорядочивание инструкций обеспечивает только согласованность последовательного семантического выполнения (один поток), но не заботится о семантической согласованности между несколькими потоками.
Поэтому, когда поток обращается к экземпляру, отличному от null, поскольку экземпляр экземпляра может быть не инициализирован, это также вызывает проблемы с безопасностью потока.
-
Нестабильный код режима Singleton
Чтобы решить вышеуказанные проблемы, вы можете добавить volatile в экземпляр SingletongDemo.
private static volatile SingletonDemo instance = null;
2. Знаете ли вы КАС?
1. compareAndSet----сравнить и обменять
AtomicInteger.conpareAndSet(int expect, indt update)
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
Первый параметр — полученное ожидаемое значение.Если ожидаемое значение согласуется со значением переменной в памяти, выполнить присваивание обновления.Если ожидаемое значение не согласуется со значением переменной в памяти, это доказывает, что данные был изменен, вернуть false и отменить назначение
пример:
package com.jian8.juc.cas;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 1.CAS是什么?
* 1.1比较并交换
*/
public class CASDemo {
public static void main(String[] args) {
checkCAS();
}
public static void checkCAS(){
AtomicInteger atomicInteger = new AtomicInteger(5);
System.out.println(atomicInteger.compareAndSet(5, 2019) + "\t current data is " + atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(5, 2014) + "\t current data is " + atomicInteger.get());
}
}
Результат:
true current data is 2019
false current data is 2019
2. Каков основной принцип CAS? Понимание небезопасного
Сравнить значение в текущей рабочей памяти со значением в основной памяти, если они совпадают, выполнить указанную операцию, в противном случае продолжить сравнение до тех пор, пока значения в основной памяти и рабочей памяти не будут
-
atomicInteger.getAndIncrement();
public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); }
-
Unsafe
-
Это основной класс CAS. Поскольку методы Java не могут напрямую обращаться к системе формирования, к ним необходимо обращаться через нативные методы. Unsafe эквивалентен бэкдору, на основе которого можно напрямую манипулировать конкретными данными памяти. Класс Unsafe существует в
sun.misc
В пакете его внутренние операции методов могут напрямую манипулировать памятью, как указатели C, потому что выполнение операций CAS в Java зависит от методов класса Unsafe.Все методы класса Unsafe изменены изначально, то есть методы класса Unsafe напрямую вызывают базовые ресурсы операционной системы для выполнения соответствующих задач.
-
Переменная valueOffset представляет адрес смещения значения переменной в памяти, потому что Unsafe получает данные в соответствии с адресом смещения памяти.
-
Значение переменной изменяется с помощью volatile, чтобы обеспечить видимость между несколькими потоками.
-
-
что такое КАС
Полное название CAS — Compare-And-Swap, это примитив параллелизма ЦП.
Его функция состоит в том, чтобы определить, является ли значение определенного места в памяти ожидаемым значением, и если это так, изменить его на новое значение.Этот процесс является атомарным.
Примитивы параллелизма CAS реализованы в языке JAVA и являются методами класса sun.misc.Unsafe. Вызовите метод CAS в классе Unsafe, и JVM поможет нам реализовать инструкции по сборке CAS. Это полностью аппаратно-зависимая функция, через которую реализуются атомарные операции. Поскольку CAS является системным примитивом, примитив относится к категории терминов операционной системы, которая состоит из нескольких инструкций и используется для завершения процесса определенной функции, а выполнение примитива должно быть непрерывным, а не разрешено прерывание, что означает, что CAS является атомарной инструкцией ЦП и не вызовет несогласованности данных.
//unsafe.getAndAddInt public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
var1 сам объект AtomicInteger
var2 ссылочный адрес объекта
данные var4, которые необходимо изменить
var5 Реальное значение в основной памяти, найденное var1 var2
Сравните с var5 со значением перед объектом;
Если то же самое, обновите var5+var4 и верните true,
Если отличается, продолжайте и сравните, пока обновление не будет завершено.
3. Недостатки CAS
-
Длительное время цикла и высокие накладные расходы
Например, когда выполняется метод getAndAddInt, возникает цикл do while. Если CAS дает сбой, он всегда будет пытаться. Если CAS дает сбой в течение длительного времени, это может привести к большим накладным расходам ЦП.
-
Только гарантированные атомарные операции над общей переменной
При работе с несколькими общими переменными CAS цикла не может гарантировать атомарность операции, в настоящее время для обеспечения атомарности можно использовать блокировки.
-
АВА-проблема
3. Проблема ABA атомарного класса AtomicInteger? Ссылка на атомарное обновление?
1. Как производится АБК
Важной предпосылкой алгоритма CAS является выборка данных в определенное время в памяти, их сравнение и замена в текущее время, тогда разница во времени приведет к изменению данных.
Напримернить 1взять A из ячейки памяти V,нить 2При этом A также вынимается из памяти, и поток 2 выполняет некоторые операции по изменению значения на B, а затем поток 2 меняет данные V-позиции на A. В это время поток 1 выполняет операцию CAS и находит что значение в памяти по-прежнему равно A, а затем поток 1 Успешная операция.
Хотя операция CAS потока 1 выполнена успешно, это не означает, что в процессе нет проблем.
2. Как решить? атомарная ссылка
Образец кода:
package juc.cas;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import java.util.concurrent.atomic.AtomicReference;
public class AtomicRefrenceDemo {
public static void main(String[] args) {
User z3 = new User("张三", 22);
User l4 = new User("李四", 23);
AtomicReference<User> atomicReference = new AtomicReference<>();
atomicReference.set(z3);
System.out.println(atomicReference.compareAndSet(z3, l4) + "\t" + atomicReference.get().toString());
System.out.println(atomicReference.compareAndSet(z3, l4) + "\t" + atomicReference.get().toString());
}
}
@Getter
@ToString
@AllArgsConstructor
class User {
String userName;
int age;
}
выходной результат
true User(userName=李四, age=23)
false User(userName=李四, age=23)
3. Атомарные ссылки на метки времени
Добавить новый механизм, изменить номер версии
package com.jian8.juc.cas;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* ABA问题解决
* AtomicStampedReference
*/
public class ABADemo {
static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1);
public static void main(String[] args) {
System.out.println("=====以下时ABA问题的产生=====");
new Thread(() -> {
atomicReference.compareAndSet(100, 101);
atomicReference.compareAndSet(101, 100);
}, "Thread 1").start();
new Thread(() -> {
try {
//保证线程1完成一次ABA操作
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicReference.compareAndSet(100, 2019) + "\t" + atomicReference.get());
}, "Thread 2").start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("=====以下时ABA问题的解决=====");
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t第1次版本号" + stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t第2次版本号" + atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t第3次版本号" + atomicStampedReference.getStamp());
}, "Thread 3").start();
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t第1次版本号" + stamp);
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean result = atomicStampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
System.out.println(Thread.currentThread().getName() + "\t修改是否成功" + result + "\t当前最新实际版本号:" + atomicStampedReference.getStamp());
System.out.println(Thread.currentThread().getName() + "\t当前最新实际值:" + atomicStampedReference.getReference());
}, "Thread 4").start();
}
}
Выходной результат:
=====以下时ABA问题的产生=====
true 2019
=====以下时ABA问题的解决=====
Thread 3 第1次版本号1
Thread 4 第1次版本号1
Thread 3 第2次版本号2
Thread 3 第3次版本号3
Thread 4 修改是否成功false 当前最新实际版本号:3
Thread 4 当前最新实际值:100
В-четвертых, мы знаем, что ArrayList небезопасен для потоков, пожалуйста, напишите небезопасный случай и дайте решение
HashSet совместим с ArrayList HashMap
Нижний слой HashSet представляет собой HashMap, сохраненное значение помещается в ключ HashMap, а значение хранит НАСТОЯЩИЙ статический объект объекта.
1. Поток небезопасен
package com.jian8.juc.collection;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
* 集合类不安全问题
* ArrayList
*/
public class ContainerNotSafeDemo {
public static void main(String[] args) {
notSafe();
}
/**
* 故障现象
* java.util.ConcurrentModificationException
*/
public static void notSafe() {
List<String> list = new ArrayList<>();
for (int i = 1; i <= 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, "Thread " + i).start();
}
}
}
Ошибка:
Exception in thread "Thread 10" java.util.ConcurrentModificationException
2. Причины
Параллельная нормальная модификация приводит к
Один человек пишет, а другой одноклассник перехватывает его, что приводит к противоречивым данным и ненормальным одновременным изменениям.
3. Решение: CopyOnWriteArrayList
List<String> list = new Vector<>();//Vector线程安全
List<String> list = Collections.synchronizedList(new ArrayList<>());//使用辅助类
List<String> list = new CopyOnWriteArrayList<>();//写时复制,读写分离
Map<String, String> map = new ConcurrentHashMap<>();
Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
Метод CopyOnWriteArrayList.add:
Контейнер CopyOnWrite означает копирование при записи, при добавлении контейнера к элементу он не добавляет напрямую в текущий контейнер Object[], а сначала копирует текущий контейнер Object[], копирует новый контейнер Object[] newElements, а затем добавляет его в текущий контейнер Object[].Добавьте элементы в новый контейнер.После добавления элементов укажите ссылку исходного контейнера на новый контейнер setArray(newElements), чтобы контейнер CopyOnWrite можно было читать одновременно без блокировка, потому что текущий контейнер не будет добавлять никаких элементов, поэтому контейнер CopyOnWrite также является своего рода разделением чтения-записи, чтением и записью разных контейнеров
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
5. Справедливые блокировки, нечестные блокировки, реентерабельные блокировки, рекурсивные блокировки, спин-блокировки? рукописный замок
1. Справедливая блокировка, нечестная блокировка
-
что
Справедливые замки обслуживаются в порядке очереди, а недобросовестные замки могут застревать.
Lock lock = new ReentrantLock(Boolean fair);
Несправедливо по умолчанию.-
честный замокЭто означает, что несколько потоков получают блокировки в том порядке, в котором они применяются для блокировок, аналогично очередности за приемом пищи.
-
несправедливый замокЭто означает, что порядок, в котором несколько потоков получают блокировки, не соответствует порядку подачи заявок на блокировку. Возможно, что потоки, которые применяются позже, получат блокировки первыми. В случае высокого параллелизма может произойти изменение приоритета или явление блокировки.
-
-
Разница между двумя
-
честный замок: потоки получают справедливую блокировку в том порядке, в котором они ее запрашивали.
Справедливая блокировка очень справедлива. В параллельной среде, когда каждый поток получает блокировку, он сначала проверяет очередь ожидания, поддерживаемую блокировкой. Если она пуста или текущий поток является первым в очереди ожидания, он будет владеть блокировкой, иначе она будет добавлена в очередь ожидания и в дальнейшем будет извлекаться из очереди в соответствии с правилами FIFO.
-
несправедливый замок: несправедливая блокировка допускает вмешательство: потоки, запрашивающие блокировку, могут пропустить очередь ожидающих потоков, если блокировка оказывается доступной, когда она запрашивается.
Несправедливые блокировки относительно грубы. Когда вы подходите, вы напрямую пытаетесь занять квоту. Если попытка не удалась, будет использоваться метод, аналогичный честным блокировкам.
-
-
other
Для Java ReentrantLock справедливость блокировки определяется конструктором.По умолчанию используется несправедливая блокировка.Преимущество несправедливой блокировки заключается в том, что пропускная способность выше, чем у справедливой блокировки.
Для Synchronized это несправедливая блокировка
2. Реентерабельное место (рекурсивная блокировка)
-
Что такое рекурсивная блокировка
Это означает, что после того, как внешняя функция того же потока получит блокировку, внутренняя рекурсивная функция все еще может получить код блокировки.Когда тот же поток получает блокировку во внешнем методе, он автоматически получит блокировку при входе во внутренний метод, то есть поток может войти в любой блок кода, синхронизированный блокировкой, которой он уже владеет.
-
ReentrantLock/Synchronized — типичная реентерабельная блокировка.
-
Самая большая роль реентерабельных блокировок заключается в том, чтобы избежать взаимоблокировок.
-
пример кода
package com.jian8.juc.lock; #### public static void main(String[] args) { Phone phone = new Phone(); new Thread(() -> { try { phone.sendSMS(); } catch (Exception e) { e.printStackTrace(); } }, "Thread 1").start(); new Thread(() -> { try { phone.sendSMS(); } catch (Exception e) { e.printStackTrace(); } }, "Thread 2").start(); } } class Phone{ public synchronized void sendSMS()throws Exception{ System.out.println(Thread.currentThread().getName()+"\t -----invoked sendSMS()"); Thread.sleep(3000); sendEmail(); } public synchronized void sendEmail() throws Exception{ System.out.println(Thread.currentThread().getName()+"\t +++++invoked sendEmail()"); } }
package com.jian8.juc.lock; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockDemo { public static void main(String[] args) { Mobile mobile = new Mobile(); new Thread(mobile).start(); new Thread(mobile).start(); } } class Mobile implements Runnable{ Lock lock = new ReentrantLock(); @Override public void run() { get(); } public void get() { lock.lock(); try { System.out.println(Thread.currentThread().getName()+"\t invoked get()"); set(); }finally { lock.unlock(); } } public void set(){ lock.lock(); try{ System.out.println(Thread.currentThread().getName()+"\t invoked set()"); }finally { lock.unlock(); } } }
3. Эксклюзивная блокировка (блокировка записи)/общая блокировка (блокировка чтения)/блокировка взаимного исключения
-
концепция
-
эксклюзивный замок: означает, что блокировка может удерживаться только одним потоком за раз, что является эксклюзивной блокировкой для ReentrantLock и Synchronized.
-
общий замок: Только блокировка может удерживаться несколькими потоками.
ReentrantReadWriteLockБлокировка чтения — это общая блокировка, а блокировка записи — эксклюзивная блокировка.
-
Мьютекс: Общая блокировка блокировки чтения может гарантировать, что одновременное чтение будет очень эффективным, а процессы чтения, записи и записи будут взаимоисключающими.
-
-
пример кода
package com.jian8.juc.lock; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * 多个线程同时读一个资源类没有任何问题,所以为了满足并发量,读取共享资源应该可以同时进行。 * 但是 * 如果有一个线程象取写共享资源来,就不应该自由其他线程可以对资源进行读或写 * 总结 * 读读能共存 * 读写不能共存 * 写写不能共存 */ public class ReadWriteLockDemo { public static void main(String[] args) { MyCache myCache = new MyCache(); for (int i = 1; i <= 5; i++) { final int tempInt = i; new Thread(() -> { myCache.put(tempInt + "", tempInt + ""); }, "Thread " + i).start(); } for (int i = 1; i <= 5; i++) { final int tempInt = i; new Thread(() -> { myCache.get(tempInt + ""); }, "Thread " + i).start(); } } } class MyCache { private volatile Map<String, Object> map = new HashMap<>(); private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); /** * 写操作:原子+独占 * 整个过程必须是一个完整的统一体,中间不许被分割,不许被打断 * * @param key * @param value */ public void put(String key, Object value) { rwLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + "\t正在写入:" + key); TimeUnit.MILLISECONDS.sleep(300); map.put(key, value); System.out.println(Thread.currentThread().getName() + "\t写入完成"); } catch (Exception e) { e.printStackTrace(); } finally { rwLock.writeLock().unlock(); } } public void get(String key) { rwLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + "\t正在读取:" + key); TimeUnit.MILLISECONDS.sleep(300); Object result = map.get(key); System.out.println(Thread.currentThread().getName() + "\t读取完成: " + result); } catch (Exception e) { e.printStackTrace(); } finally { rwLock.readLock().unlock(); } } public void clear() { map.clear(); } }
4. Спин-блокировка
-
spinlock
означает, что поток, пытающийся получить блокировку, блокируется не сразу, аИспользуйте цикл, чтобы попытаться получить блокировки, преимущество этого состоит в том, чтобы уменьшить потребление переключения контекста потока, недостаток в том, что цикл будет потреблять ресурсы ЦП
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
Рукописная блокировка:
package com.jian8.juc.lock; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** * 实现自旋锁 * 自旋锁好处,循环比较获取直到成功为止,没有类似wait的阻塞 * * 通过CAS操作完成自旋锁,A线程先进来调用mylock方法自己持有锁5秒钟,B随后进来发现当前有线程持有锁,不是null,所以只能通过自旋等待,知道A释放锁后B随后抢到 */ public class SpinLockDemo { public static void main(String[] args) { SpinLockDemo spinLockDemo = new SpinLockDemo(); new Thread(() -> { spinLockDemo.mylock(); try { TimeUnit.SECONDS.sleep(3); }catch (Exception e){ e.printStackTrace(); } spinLockDemo.myUnlock(); }, "Thread 1").start(); try { TimeUnit.SECONDS.sleep(3); }catch (Exception e){ e.printStackTrace(); } new Thread(() -> { spinLockDemo.mylock(); spinLockDemo.myUnlock(); }, "Thread 2").start(); } //原子引用线程 AtomicReference<Thread> atomicReference = new AtomicReference<>(); public void mylock() { Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName() + "\t come in"); while (!atomicReference.compareAndSet(null, thread)) { } } public void myUnlock() { Thread thread = Thread.currentThread(); atomicReference.compareAndSet(thread, null); System.out.println(Thread.currentThread().getName()+"\t invoked myunlock()"); } }
6. Использовали ли вы CountDownLatch/CyclicBarrier/Semaphore?
1. CountDownLatch (обратный отсчет запуска ракеты)
-
Это позволяет одному или нескольким потокам ждать, пока операция другого потока не будет выполнена, прежде чем выполнять ее. Например, основной поток приложения хочет выполнить поток после того, как поток, отвечающий за запуск службы фреймворка, запустил все службы фреймворка.
-
CountDownLatch в основном имеет два метода: когда один или несколько потоков вызывают метод await(), вызывающий поток будет заблокирован. Когда другие потоки вызывают метод countDown(), значение счетчика уменьшается на 1. Когда значение счетчика становится равным 0, поток, заблокированный вызовом метода await(), пробуждается и продолжает выполняться.
-
Пример кода:
package com.jian8.juc.conditionThread; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { // general(); countDownLatchTest(); } public static void general(){ for (int i = 1; i <= 6; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName()+"\t上完自习,离开教室"); }, "Thread-->"+i).start(); } while (Thread.activeCount()>2){ try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName()+"\t=====班长最后关门走人"); } public static void countDownLatchTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(6); for (int i = 1; i <= 6; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName()+"\t被灭"); countDownLatch.countDown(); }, CountryEnum.forEach_CountryEnum(i).getRetMessage()).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName()+"\t=====秦统一"); } }
2. CyclicBarrier (соберите семь шаров дракона, чтобы вызвать дракона)
-
CycliBarrier
Циклический барьер. Пусть группа потоков будет заблокирована, когда они достигнут барьера (также называемого точкой синхронизации).Когда последний поток достигнет барьера, барьер откроет дверь, и все потоки, перехваченные барьером, продолжат работу.Поток входит барьер и передает метод ожидания CycliBarrier.()
-
Пример кода:
package com.jian8.juc.conditionThread; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierDemo { public static void main(String[] args) { cyclicBarrierTest(); } public static void cyclicBarrierTest() { CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> { System.out.println("====召唤神龙====="); }); for (int i = 1; i <= 7; i++) { final int tempInt = i; new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t收集到第" + tempInt + "颗龙珠"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }, "" + i).start(); } } }
3. Семафор семафор
Может заменить синхронизацию и блокировку
-
Семафоры в основном используются для двух целей: одна для взаимного исключения нескольких общих ресурсов, а другая для управления количеством одновременных потоков.
-
Пример кода:
Примеры хватают пробелы:
package com.jian8.juc.conditionThread; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; public class SemaphoreDemo { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3);//模拟三个停车位 for (int i = 1; i <= 6; i++) {//模拟6部汽车 new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "\t抢到车位"); try { TimeUnit.SECONDS.sleep(3);//停车3s } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t停车3s后离开车位"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }, "Car " + i).start(); } } }
Семь, блокирующая очередь
- ArrayBlockingQueueЭто ограниченная блокирующая очередь, основанная на структуре массива, которая сортирует элементы по принципу FIFO.
- LinkedBlockingQueueЭто блокирующая очередь на основе структуры связанного списка, эта очередь сортирует элементы по принципу FIFO, а пропускная способность обычно выше, чем у ArrayBlockingQueue.
- SynchronousQueueЭто блокирующая очередь, в которой не хранятся элементы.Каждая операция вставки должна ждать, пока другой поток не вызовет операцию удаления, иначе операция вставки всегда будет заблокирована.
1. Очереди и блокирующие очереди
-
Первая — это очередь, и роль блокирующей очереди в структуре данных примерно такова:
graph LR Thread1-- put -->id1["阻塞队列"] subgraph BlockingQueue id1 end id1-- Take -->Thread2 蛋糕师父--"放(柜满阻塞)"-->id2[蛋糕展示柜] subgraph 柜 id2 end id2--"取(柜空阻塞)"-->顾客
Тема 1 Добавляет элементы в очередь блокировки, а поток 2 удаляет элементы из очереди блокировки
Когда очередь блокировки пуста, удалить из очередиПолучатьОперации над элементами заблокированы
Когда очередь блокировки заполнена, удалить из очередиДобавить кОперации над элементами заблокированы
Потоки, пытающиеся получить элементы из пустой очереди блокировки, будут заблокированы до тех пор, пока другие потоки не вставят новые элементы в пустую очередь.
Потоки, пытающиеся добавить новые элементы в полную очередь блокировки, также будут заблокированы до тех пор, пока другой поток не удалит один или несколько элементов из очереди или полностью не очистит очередь, и очередь снова не станет бездействующей и не будут сделаны новые добавления.
2. Зачем использовать? какая польза?
-
В области многопоточности: так называемая блокировка, в некоторых случаях поток будет приостановлен, после выполнения условий приостановленный поток будет автоматически разбужен.
-
Зачем вам BlockingQueue
С точки зрения преимуществ, нам не нужно заботиться о том, когда нам нужно заблокировать поток и когда нам нужно разбудить поток, потому что BlockingQueue делает все это за вас.
До того, как пакет concurrent был выпущен, в многопоточной среде каждый из наших программистов должен был контролировать эти детали, особенно в отношении эффективности и безопасности потоков, и на этот раз наша программа сильно усложнилась.
3. Основной метод BlockingQueue
тип метода | Выбросить исключение | особая ценность | блокировать | тайм-аут |
---|---|---|---|---|
вставлять | add(e) | offer(e) | put(e) | offer(e,time,unit) |
Удалить | remove() | poll() | take | poll(time,unit) |
экзамен | element() | peek() | недоступен | недоступен |
тип метода | status |
---|---|
Выбросить исключение | Когда очередь блокировки заполнена, добавление в очередь вызоветIllegalStateException: Queue full Когда очередь блокировки пуста, удаление в сетевой очереди вызовет NoSuchElementException
|
особая ценность | Вставить метод, успех истина ошибка ложь Метод удаления, который успешно возвращает элемент из очереди, возвращает null, если очереди нет. |
продолжай блокировать | Когда блокирующая очередь заполнена, поток-производитель продолжает помещать элементы в очередь, и очередь всегда будет блокировать поток, пока не узнает, что нужно поместить данные или выйти в ответ на прерывание. Когда блокирующая очередь пуста, поток-потребитель пытается взять элементы из очереди, и очередь будет блокировать поток-потребитель до тех пор, пока очередь не станет доступной. |
тайм-аут | Когда очередь блокировки заполнена, очередь заблокирует поток производителя на определенное время, и поток производителя завершится после превышения лимита времени. |
4. Обзор архитектуры + анализ категорий
-
видовой анализ
- ArrayBlockingQueue: Ограниченная блокирующая очередь, состоящая из структур данных.
-
LinkedBlockingQueue: ограничен структурой связанного списка (но размер по умолчанию равен
Integer.MAX_VALUE
) блокирует очередь. - PriorityBlockingQueue: неограниченная очередь блокировки, поддерживающая сортировку по приоритету.
- DelayQueue: отложенная неограниченная блокирующая очередь, реализованная с использованием приоритетной очереди.
- ==SychronousQueue==: Блокирующая очередь, в которой не хранятся элементы, то есть очередь из одного элемента.
- LinkedTransferQueue: неограниченная очередь блокировки, состоящая из структуры связанного списка.
- LinkedBlockingDeque: Двунаправленная блокирующая очередь, состоящая из календарной структуры.
-
SychronousQueue
-
Теория: SynchronousQueue не имеет емкости. В отличие от других BlockingQueue, SynchronousQueue — это BlockingQueue, в которой не хранятся элементы. Каждая операция размещения должна ждать операции взятия, иначе она не может продолжать добавлять элементы, и наоборот.
-
пример кода
package com.jian8.juc.queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; /** * ArrayBlockingQueue是一个基于数组结构的有界阻塞队列,此队列按FIFO原则对元素进行排序 * LinkedBlockingQueue是一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue * SynchronousQueue是一个不存储元素的阻塞队列,灭个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 * 1.队列 * 2.阻塞队列 * 2.1 阻塞队列有没有好的一面 * 2.2 不得不阻塞,你如何管理 */ public class SynchronousQueueDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "\t put 1"); blockingQueue.put("1"); System.out.println(Thread.currentThread().getName() + "\t put 2"); blockingQueue.put("2"); System.out.println(Thread.currentThread().getName() + "\t put 3"); blockingQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } }, "AAA").start(); new Thread(() -> { try { TimeUnit.SECONDS.sleep(5); System.out.println(Thread.currentThread().getName() + "\ttake " + blockingQueue.take()); TimeUnit.SECONDS.sleep(5); System.out.println(Thread.currentThread().getName() + "\ttake " + blockingQueue.take()); TimeUnit.SECONDS.sleep(5); System.out.println(Thread.currentThread().getName() + "\ttake " + blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }, "BBB").start(); } }
-
5. Где используется
-
Модель «производитель-потребитель»
-
традиционный
package com.jian8.juc.queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 一个初始值为零的变量,两个线程对其交替操作,一个加1一个减1,来5轮 * 1. 线程 操作 资源类 * 2. 判断 干活 通知 * 3. 防止虚假唤起机制 */ public class ProdConsumer_TraditionDemo { public static void main(String[] args) { ShareData shareData = new ShareData(); for (int i = 1; i <= 5; i++) { new Thread(() -> { try { shareData.increment(); } catch (Exception e) { e.printStackTrace(); } }, "ProductorA " + i).start(); } for (int i = 1; i <= 5; i++) { new Thread(() -> { try { shareData.decrement(); } catch (Exception e) { e.printStackTrace(); } }, "ConsumerA " + i).start(); } for (int i = 1; i <= 5; i++) { new Thread(() -> { try { shareData.increment(); } catch (Exception e) { e.printStackTrace(); } }, "ProductorB " + i).start(); } for (int i = 1; i <= 5; i++) { new Thread(() -> { try { shareData.decrement(); } catch (Exception e) { e.printStackTrace(); } }, "ConsumerB " + i).start(); } } } class ShareData {//资源类 private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment() throws Exception { lock.lock(); try { //1.判断 while (number != 0) { //等待不能生产 condition.await(); } //2.干活 number++; System.out.println(Thread.currentThread().getName() + "\t" + number); //3.通知 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement() throws Exception { lock.lock(); try { //1.判断 while (number == 0) { //等待不能消费 condition.await(); } //2.消费 number--; System.out.println(Thread.currentThread().getName() + "\t" + number); //3.通知 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
-
Блокирующая версия очереди
package com.jian8.juc.queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ProdConsumer_BlockQueueDemo { public static void main(String[] args) { MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10)); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t生产线程启动"); try { myResource.myProd(); } catch (Exception e) { e.printStackTrace(); } }, "Prod").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t消费线程启动"); try { myResource.myConsumer(); } catch (Exception e) { e.printStackTrace(); } }, "Consumer").start(); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("5s后main叫停,线程结束"); try { myResource.stop(); } catch (Exception e) { e.printStackTrace(); } } } class MyResource { private volatile boolean flag = true;//默认开启,进行生产+消费 private AtomicInteger atomicInteger = new AtomicInteger(); BlockingQueue<String> blockingQueue = null; public MyResource(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); } public void myProd() throws Exception { String data = null; boolean retValue; while (flag) { data = atomicInteger.incrementAndGet() + ""; retValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS); if (retValue) { System.out.println(Thread.currentThread().getName() + "\t插入队列" + data + "成功"); } else { System.out.println(Thread.currentThread().getName() + "\t插入队列" + data + "失败"); } TimeUnit.SECONDS.sleep(1); } System.out.println(Thread.currentThread().getName() + "\t大老板叫停了,flag=false,生产结束"); } public void myConsumer() throws Exception { String result = null; while (flag) { result = blockingQueue.poll(2, TimeUnit.SECONDS); if (null == result || result.equalsIgnoreCase("")) { flag = false; System.out.println(Thread.currentThread().getName() + "\t超过2s没有取到蛋糕,消费退出"); System.out.println(); return; } System.out.println(Thread.currentThread().getName() + "\t消费队列" + result + "成功"); } } public void stop() throws Exception { flag = false; } }
-
-
Пул потоков
-
ПО промежуточного слоя сообщений
6. В чем разница между синхронизированным и заблокированным? В чем преимущество использования нового замка? Пожалуйста, приведите пример
разница
-
оригинальная композиция
-
Ключевое слово принадлежит jvm при синхронизации
monitorenter,нижний слой делается через объект монитора.На самом деле такие методы как ожидание/уведомление тоже зависят от объекта монитора.Только в синхронизации или методах можно ждать/уведомлять и другие методы настраиваются.
monitorexit
-
Блокировка — это конкретный класс, блокировка на уровне API (java.util.)
-
-
инструкции
- Synchronized не требует, чтобы пользователь вручную снял блокировку. Когда синхронизированный код выполняется, система автоматически позволит потоку снять блокировку.
- ReentrantLock требует, чтобы пользователь снял блокировку вручную. Если блокировка активно не снимается, это может привести к взаимоблокировке. Методы lock() и unlock() должны быть завершены блоком операторов try/finally.
-
Можно ли прервать ожидание
- синхронизация не может быть прервана, если не выброшено исключение или не завершена нормальная работа
- ReentrantLock можно прервать, установить метод тайм-аута tryLock(длинный тайм-аут, блок TimeUnit) или поместить lockInterruptably() в блок кода и вызвать метод interrupt() для прерывания.
-
Справедливо ли запирать
- синхронизированная нечестная блокировка
- ReentrantLock может быть обоими, по умолчанию используется справедливая блокировка, конструктор может передавать логическое значение, true — справедливая блокировка, false — нечестная блокировка.
-
Замки связывают несколько условий
- синхронизированный нет
- ReentrantLock используется для пробуждения потоков, которые необходимо пробуждать в группах, и может просыпаться точно, а не пробуждать поток случайным образом или пробуждать все потоки, например синхронизированные.
package com.jian8.juc.lock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * synchronized和lock区别 * <p===lock可绑定多个条件=== * 对线程之间按顺序调用,实现A>B>C三个线程启动,要求如下: * AA打印5次,BB打印10次,CC打印15次 * 紧接着 * AA打印5次,BB打印10次,CC打印15次 * 。。。。 * 来十轮 */ public class SyncAndReentrantLockDemo { public static void main(String[] args) { ShareData shareData = new ShareData(); new Thread(() -> { for (int i = 1; i <= 10; i++) { shareData.print5(); } }, "A").start(); new Thread(() -> { for (int i = 1; i <= 10; i++) { shareData.print10(); } }, "B").start(); new Thread(() -> { for (int i = 1; i <= 10; i++) { shareData.print15(); } }, "C").start(); } } class ShareData { private int number = 1;//A:1 B:2 C:3 private Lock lock = new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); public void print5() { lock.lock(); try { //判断 while (number != 1) { condition1.await(); } //干活 for (int i = 1; i <= 5; i++) { System.out.println(Thread.currentThread().getName() + "\t" + i); } //通知 number = 2; condition2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void print10() { lock.lock(); try { //判断 while (number != 2) { condition2.await(); } //干活 for (int i = 1; i <= 10; i++) { System.out.println(Thread.currentThread().getName() + "\t" + i); } //通知 number = 3; condition3.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void print15() { lock.lock(); try { //判断 while (number != 3) { condition3.await(); } //干活 for (int i = 1; i <= 15; i++) { System.out.println(Thread.currentThread().getName() + "\t" + i); } //通知 number = 1; condition1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
8. Использовали ли вы пул потоков? ThreadPoolExecutor говорит о вашем понимании
1. Использование интерфейса Callable
package com.jian8.juc.thread;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
/**
* 多线程中,第三种获得多线程的方式
*/
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//FutureTask(Callable<V> callable)
FutureTask<Integer> futureTask = new FutureTask<Integer>(new MyThread2());
new Thread(futureTask, "AAA").start();
// new Thread(futureTask, "BBB").start();//复用,直接取值,不要重启两个线程
int a = 100;
int b = 0;
//b = futureTask.get();//要求获得Callable线程的计算结果,如果没有计算完成就要去强求,会导致堵塞,直到计算完成
while (!futureTask.isDone()) {//当futureTask完成后取值
b = futureTask.get();
}
System.out.println("*******Result" + (a + b));
}
}
class MyThread implements Runnable {
@Override
public void run() {
}
}
class MyThread2 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("Callable come in");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1024;
}
}
2. Зачем использовать пул потоков
-
Основная задача пула потоков - контролировать количество запущенных потоков, ставить задачи в очередь во время обработки, а затем запускать эти задачи после создания потоков.Если количество потоков превышает максимальное число, лишние потоки будут ждать в очереди и т.д. После выполнения потока задача выводится из очереди на выполнение.
-
Главная особенность
Повторное использование потоков, контроль максимального количества параллелизма, управление потоками
- Сократите потребление ресурсов, уменьшите потребление, вызванное созданием и уничтожением потоков, путем повторного использования созданных потоков.
- упомянул скорость ответа. Когда задача поступает, задача может быть выполнена немедленно, не дожидаясь создания потока.
- Объективный идеал поднятия нити. Потоки - это дефицитные ресурсы. Если они создаются без ограничений, это не только потребляет системные ресурсы, но и снижает стабильность системы. Использование пулов потоков можно использовать для унифицированного распределения, настройки и мониторинга.
3. Как использовать пул потоков
-
Описание архитектуры
Пул потоков в Java реализован через фреймворк Executor, который использует Executor, Executors, ExecutorService, ThreadPoolExecutor
graph BT 类-Executors 类-ScheduledThreadPoolExecutor-->类-ThreadPoolExecutor 类-ThreadPoolExecutor-->类-AbstractExecutorService 类-AbstractExecutorService-.->接口-ExecutorService 类-ScheduledThreadPoolExecutor-.->接口-ScheduledExecutorService 接口-ScheduledExecutorService-->接口-ExecutorService 接口-ExecutorService-->接口-Executor
-
Реализация кодирования
Существует пять реализаций: Executors.newScheduledThreadPool() запланировано со временем, а java8 недавно запущен Executors.newWorkStealingPool(int), который использует текущий процессор, доступный на машине, в качестве своего параллельного уровня.
Есть три ключевых момента
-
Executors.newFixedThreadPool(int)
Выполнение долгосрочных задач, производительность намного лучше
Создайте пул потоков фиксированной длины, чтобы контролировать максимальное количество одновременных потоков, а лишние потоки будут ждать в очереди.
Ожидается, что значения corePoolSize и maxPoolSize пула потоков, созданного newFixedThreadPool, будут ждать. Он использует LinkedBlockingQueue
-
Executors.newSingleThreadExecutor()
Сценарий выполнения задачи за задачей
Создайте однопоточный пул потоков, который будет использовать только один рабочий поток для выполнения задач, гарантируя, что все задачи выполняются в указанном порядке.
newSingleThreadExecutor устанавливает для corePoolSize и maxPoolSize значение 1, используя LinkedBlockingQueue
-
Executors.newCachedThreadPool()
Выполнять множество недолговечных асинхронных апплетов или слабо загруженных серверов.
Создайте кешируемый пул потоков. Если длина пула потоков превышает требования к обработке, неиспользуемый округ можно гибко восстановить. Если восстановления нет, будет создан новый поток.
newCachedThreadPool устанавливает для corePoolSize значение 0, maxPoolSize — для Integer.MAX_VALUE и использует SynchronousQueue, что означает, что при поступлении задачи создается поток для выполнения, а когда округ простаивает более 60 с, поток уничтожается
-
-
ThreadPoolExecutor
4. Знакомство с несколькими важными параметрами пула потоков
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
corePoolSize: количество резидентных основных потоков в пуле потоков.
- После создания пула потоков, когда приходит задача запроса, потоки в пуле будут организованы для выполнения задачи запроса.
- Когда количество потоков в пуле потоков достигает corePoolSize, прибывающие задачи будут помещены в очередь кеша.
- maximumPoolSize: пул потоков может вместить максимальное количество потоков, которые могут выполняться одновременно, которое должно быть больше или равно 1.
-
keepAliveTime: время выживания избыточных бездействующих потоков.
- Когда текущее количество пулов потоков превысит corePoolSize, а время простоя приостановки достигнет значения keepAliveTime, лишние простаивающие потоки будут уничтожены до тех пор, пока не останутся только потоки corePoolSize.
- unit: Единица keepAliveTime
- workQueue: очередь задач, задачи отправлены, но еще не выполнены
- threadFactory: указывает фабрику потоков, которая создает рабочие потоки в пуле потоков, который используется для создания потоков. В основном мы настраиваем имя потока, соответствующее текущему бизнесу. Как правило, можно использовать имя по умолчанию.
- handler: Стратегия отклонения, указывающая, как отклонить работоспособную стратегию для запроса выполнения, когда очередь заполнена, а рабочие потоки больше или равны максимальному количеству потоков в пуле потоков (maximumPoolSize).
5. Основной принцип работы пула потоков
graph LR
subgraph 使用者
main(提交任务)
end
main-->core{核心线程是否已满}
subgraph 线程池
core--是-->queue{队列是否已满}
queue--是-->pool{线程池是否已满}
pool--是-->reject["按照拒绝策略处理<br>无法执行的任务"]
core--否-->id[创建线程执行任务]
queue--否-->任务入队列等待
pool--否-->创建线程执行任务
end
обработать
-
После создания пула потоков дождитесь отправленного запроса задачи.
-
Когда вызывается метод execute() для добавления задачи запроса, пул потоков выносит следующие суждения:
2.1 Если количество запущенных потоков меньше, чем corePoolSize, создать поток для немедленного запуска задачи;
2.2 Если количество запущенных потоков больше или равно corePoolSize, поставить задачу в очередь;
2.3 Если в это время очередь заполнена и количество запущенных потоков меньше maxPoolSize, то создайте непрофильный поток для немедленного запуска задачи
2.4 Если очередь заполнена и количество запущенных потоков больше или равно maxmumPoolSize, то запустить стратегию отклонения насыщения для выполнения
-
Когда поток завершает задачу, он выполняет следующую задачу из очереди.
-
Когда потоку нечего делать в течение определенного времени (keepAliveTime), пул потоков оценивает:
Если количество текущих потоков больше, чем corePoolSize, поток будет остановлен, поэтому после завершения всех задач в пуле потоков он уменьшится до размера corePoolSize не более
9. Использовали ли вы пул потоков? Как вы устанавливаете разумные параметры в производстве
1. Политика отклонения пула потоков
-
что такое стратегия потока
Очередь ожидания также заполнена, и новые задачи больше не могут быть заполнены.В то же время, максимальный поток в пуле потоков также достиг, и он не может продолжать обслуживать новые задачи. В настоящее время нам необходимо отказаться от политического механизма, чтобы разумно решить эту проблему.
-
Встроенная политика отклонения JDK
-
АбортПолици (по умолчанию)
Выброс RejectedExecutionException напрямую препятствует нормальной работе системы.
-
CallerRunsPolicy
caller-run — механизм регулирования, который не отбрасывает задачи и не генерирует исключения, а вместо этого откатывает некоторые задачи вызывающей стороне, уменьшая поток новых задач.
-
DiscardOldestPolicy
Отказаться от самой длинной ожидающей задачи в очереди, затем добавить текущую задачу в очередь и попытаться отправить текущую задачу еще раз
-
DiscardPolicy
Отменить задачу напрямую, без какой-либо обработки или создания исключения. Это лучшее решение, если задача может быть потеряна
-
-
Оба реализуют интерфейс RejectedExecutionHandler.
2. В вашей работе есть три метода создания пула потоков с одним/фиксированным номером/переменной, какой из них больше
Нет, мы можем использовать только пользовательские в производстве! ! ! !
Почему?
Пулы потоков нельзя создавать с помощью Executors. Попробуйте указать размер очереди задач через ThreadPoolExecutor, чтобы избежать риска исчерпания ресурсов.
FixedThreadPool и SingleThreadPool допускают, чтобы длина очереди запросов была равна Integer.MAX_VALUE, что может аккумулировать большое количество запросов; CachedThreadPool и ScheduledThreadPool позволяют использовать количество создаваемых потоков, равное Integer.MAX_VALUE, что может создавать большое количество потоков, что приводит к OOM
3. Как вы используете пул потоков на работе?Настроили ли вы использование пула потоков?
package com.jian8.juc.thread;
import java.util.concurrent.*;
/**
* 第四种获得java多线程的方式--线程池
*/
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(3, 5, 1L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy());
//new ThreadPoolExecutor.AbortPolicy();
//new ThreadPoolExecutor.CallerRunsPolicy();
//new ThreadPoolExecutor.DiscardOldestPolicy();
//new ThreadPoolExecutor.DiscardPolicy();
try {
for (int i = 1; i <= 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
4. Как вы считаете рациональную конфигурацию пула потоков?
-
интенсивное использование процессора
Интенсивность ЦП означает, что задача требует большого количества вычислений без блокировки, а ЦП всегда работает на полной скорости.
Задачи, интенсивно использующие процессор, могут быть ускорены (за счет многопоточности) только на действительно многоядерных процессорах.
На одноядерном ЦП, сколько ни открывай смоделированных многопотоков, задача не может быть ускорена, потому что суммарная вычислительная мощность ЦП составляет только те
Настройте как можно меньше потоков для задач, интенсивно использующих ЦП:
Общая формула: Пул потоков ядер ЦП + 1 поток
-
Интенсивный ввод-вывод
-
Поскольку потоки задач с интенсивным вводом-выводом не всегда выполняют задачи, вы должны настроить как можно больше потоков, например, количество ядер ЦП * 2.
-
IO интенсивно, то есть задача требует много IO, то есть много блокировок.
Выполнение задач с интенсивным вводом-выводом в одном потоке может привести к трате большого количества ресурсов процессора на ожидание.
Таким образом, использование многопоточности в задачах с интенсивным вводом-выводом может значительно ускорить выполнение программы, даже на одноядерном процессоре это ускорение в основном основано на потере времени блокировки.
При интенсивном вводе-выводе блокируется большинство потоков, поэтому необходимо настроить больше потоков:
Справочная формула: количество ядер ЦП/(1-коэффициент блокировки) Коэффициент блокировки находится в диапазоне от 0,8 до 0,9.
Восьмиъядерный ЦП: 8/(1-0, 9)=80
-
10. Кодирование тупиков и анализ местоположения
-
что такое тупик
Взаимоблокировка относится к явлению, при котором два или более процессов ожидают друг друга из-за конкуренции за ресурсы во время процесса выполнения.Если нет внешнего вмешательства, они не смогут продвигаться вперед.Если системных ресурсов достаточно, процесс Запросы на ресурсы могут быть удовлетворены, вероятность тупиковой ситуации очень мала, иначе она попадет в тупиковую ситуацию из-за конкуренции за ограниченные ресурсы.
graph TD threadA(线程A) threadB(线程B) lockA((锁A)) lockB((锁B)) threadA--持有-->lockA threadB--试图获取-->lockA threadB--持有-->lockB threadA--试图获取-->lockB
-
Основная причина зависания
- Недостаточно системных ресурсов
- Порядок, в котором процессы выполняются и продвигаются, не подходит
- нерациональное использование ресурсов
-
Пример взаимоблокировки
package com.jian8.juc.thread; import java.util.concurrent.TimeUnit; /** * 死锁是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干涉那他们都将无法推进下去, */ public class DeadLockDemo { public static void main(String[] args) { String lockA = "lockA"; String lockB = "lockB"; new Thread(new HoldThread(lockA,lockB),"Thread-AAA").start(); new Thread(new HoldThread(lockB,lockA),"Thread-BBB").start(); } } class HoldThread implements Runnable { private String lockA; private String lockB; public HoldThread(String lockA, String lockB) { this.lockA = lockA; this.lockB = lockB; } @Override public void run() { synchronized (lockA) { System.out.println(Thread.currentThread().getName() + "\t自己持有:" + lockA + "\t尝试获得:" + lockB); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lockB) { System.out.println(Thread.currentThread().getName() + "\t自己持有:" + lockB + "\t尝试获得:" + lockA); } } } }
-
решать
- использовать
jps -l
Найдите номер процесса -
jstack 进程号
найти вид тупика - JDK поставляется с
jconsole
инструмент
- использовать