Многопоточность Java и вопросы для интервью с высокой степенью параллелизма

Java

volatile

Понимание изменчивости

volatile — это легкий механизм синхронизации.

  1. Гарантированная видимость данных
  2. Атомарность не гарантируется
  3. Отключить переупорядочивание инструкций

JMM

JMM (модель памяти Java) — это абстрактная концепция, описывающая набор правил или спецификаций, определяющих, как осуществляется доступ к различным переменным в программе.

Сущностью работающей программы JVM является поток, Когда каждый поток создается, JVM создает для него рабочую память, которая является приватной областью данных потока. JMM предусматривает, что все переменные хранятся в основной памяти, основная память является разделяемой памятью. Работа переменной потоком осуществляется в рабочей памяти, сначала переменная копируется из основной памяти в рабочую память, после завершения операции она будет записана в основную память. Разные потоки не могут получить доступ к рабочей памяти друг друга, а взаимодействие потоков (передача значений) осуществляется через основную память.

Правила JMM для синхронизации:

  1. Прежде чем поток будет разблокирован, значение общей переменной должно быть сброшено обратно в основную память.
  2. Прежде чем поток заблокируется, он должен прочитать последнее значение основной памяти в свою рабочую память.
  3. Блокировка и разблокировка — это один и тот же замок.

Три характеристики JMM

  1. видимость
  2. атомарность
  3. последовательный

Атомарность неделима: когда поток выполняет какое-то конкретное дело, его нельзя разделить посередине, либо все преуспевают, либо все терпят неудачу.

Переупорядочивание: Когда компьютер выполняет программу, для повышения производительности компилятор и процессор часто переупорядочивают инструкции.Исходный код оптимизируется и переупорядочивается компилятором, инструкции переупорядочиваются параллельно, а система памяти переупорядочивается. чтобы получить окончательную выполненную инструкцию.

В одном потоке конечный результат выполнения программы гарантированно согласуется с результатом выполнения порядка выполнения кода.

Темы выполняются попеременно в мульти-резьбе. Из-за переупорядочения его нельзя определить, могут ли переменные, используемые в двух потоках, могут гарантировать консистенцию, и результат не может быть определен.

Процессор должен учитывать зависимости данных при обработке переупорядочения.

Реализация volatile запрещает переупорядочивание инструкций и позволяет избежать выполнения программ не по порядку в многопоточной среде. Он выполняется с помощью инструкции барьера памяти.При вставке барьера памяти оптимизация переупорядочения инструкций за барьером памяти запрещается, а кэшированные данные принудительно очищаются, чтобы гарантировать, что поток может прочитать последнюю версию данных. .

Пример 1: volatile гарантирует видимость

class MyData {
    //volatile int number = 0;//case2
    //int number=0; //case1
    public void change() {
        number = 60;
    }
}

public class VolatileDemo {
    public static void main(String[] args) {
        MyData data=new MyData();

        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"\t come in");
            try{ TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) {e.printStackTrace();}
            data.change();
            System.out.println(Thread.currentThread().getName()+"\t updated number value:"+data.number);
        },"A").start();

        while(data.number==0){}
        System.out.println(Thread.currentThread().getName()+"\t over, get number:"+data.number);

    }
}

Когда мы используем case1, то есть когда число не изменяется с помощью volatile, текущий результат:

A	 come in
A	 updated number value:60

И программа не заканчивается в выполнении, указывая на то, что в основном потоке, поскольку видимость не может быть гарантирована, она находилась в бесконечном цикле.

При выполнении case2:

A	 come in
A	 updated number value:60
main	 over, get number:60

Видимость гарантирована, так что главное заканчивается успешно.

Пример 2: volatile не гарантирует атомарность

class MyData {
    volatile int number = 0;

    public void change() {
        number = 60;
    }

    public void addOne() {
        number++;
    }
}

public class VolatileDemo {
    public static void main(String[] args) {
        case2();
    }

    //验证原子性
    public static void case2() {
        MyData myData = new MyData();

        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    myData.addOne();
                }
            }, String.valueOf(i)).start();
        }

        while(Thread.activeCount()>2){
            Thread.yield();
        }
        System.out.println(Thread.currentThread().getName()+"\t number value:"+myData.number);
    }
}

Можно обнаружить, что окончательный результат вывода не равен 20000, а множественные результаты вывода не согласованы, поэтому это показывает, что volatile не может гарантировать атомарность.

Как гарантировать атомарность

  1. Блокировка: используйте синхронизацию для блокировки
  2. Использование AtomicInteger

Пример 3: volatile и шаблон singleton

Одноэлементный шаблон шаблона DCL

public class Singleton {

    private static Singleton instance=null;
    private Singleton(){
        System.out.println(Thread.currentThread().getName()+" constructor");
    }

    //DCL 双端检锁机制
    public static Singleton getInstance(){
        if(instance==null){
            synchronized (Singleton.class){
                if(instance==null)
                    instance=new Singleton();
            }
        }
        return instance;
    }
}

Механизм DCL не может полностью гарантировать безопасность потоков из-за переупорядочения инструкций.

Причина в том, что instance = new Singleton(); можно разделить на три шага:

1. memory=allocate();//分配内存空间
2. instance(memory);//初始化对象
3. instance=memory;//设置instance指向分配的内存地址,分配成功后,instance!=null

Поскольку между шагами 2 и 3 нет зависимости данных, а результат выполнения не меняется в одном потоке вне зависимости от того, выполняется переупорядочение или нет, переупорядочение этих двух шагов разрешено. Другими словами, переупорядочивание инструкций обеспечивает только согласованность однопоточной последовательной семантики (как если бы-последовательная), но не заботится о семантической согласованности между несколькими потоками.

Следовательно, после переупорядочения выполнение 3 первых результатов дает instance!=null, но объект не был инициализирован. В этот момент другой поток получает неинициализированный объект при его вызове.

Поэтому при объявлении экземпляра украсьте его volatile, чтобы запретить переупорядочивание инструкций.

private static volatile Singleton instance = null;

CAS

Весь процесс CAS представляет собой CompareAndSwap, который представляет собой примитив параллелизма ЦП. Его функция состоит в том, чтобы определить, является ли значение определенного места в памяти ожидаемым значением, и если это так, обновить его до нового значения.атомный.

Роль CAS заключается в том, чтобы сравнить значение в текущей рабочей памяти со значением в основной памяти и, если они совпадают, выполнить операцию, в противном случае продолжить сравнение до тех пор, пока значения в основной и рабочей памяти не сравняются. последовательный. Значение основной памяти — V, ожидаемое значение в рабочей памяти — A, значение обновления, которое нужно изменить, — B, и если и только если A и V совпадают, измените V на B, в противном случае ничего не делайте.

Основной принцип CAS:

В атомарном классе операции CAS выполняются через класс Unsafe.

//AtomicInteger i++
public final int getAndIncrement(){
    return unsafe.getAndAddInt(this,valueoffset,1);
}

где это текущий объект, а значение смещения представляет собой длинное значение, представляющее смещение адреса.

//AtomicInteger.java
private static final Unsafe unsfae=Unsafe.getUnsafe();//unsafe对象
private static final long valueOffset;//地址偏移量

static{
    try{
        valueoffset=unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value");
    }catch(Excepthion ex){throw new Error(ex);}
}

private volatile int value;//存储的数值
  • Unsafe

Класс Unsafe — это класс в пакете sun.misc в rt.jar, на основе которого можно напрямую манипулировать данными конкретной памяти.

Методы Java не могут получить прямой доступ к базовой системе и должны использовать собственные методы для доступа к внутренним методам класса Unsafe.обанативные методы, которые могут напрямую манипулировать памятью, например указатели в C, и выполнение операций CAS в Java — все зависит от методов класса Unsafe.

  • valueOffset

Эта переменная представляет адрес смещения значения переменной в памяти, и Unsafe получает данные в соответствии с адресом смещения памяти.

Небезопасный класс

Параллелизм CAS проистекает из различных методов класса Unsafe, воплощенных в Java. Вызывая метод CAS в этом классе, JVM поможет нам реализовать инструкции сборки CAS, которая является функцией, полностью зависящей от аппаратного обеспечения.

Примитив — это процесс, состоящий из нескольких инструкций и используемый для выполнения функции. Выполнение примитивов должно быть непрерывным, и процесс выполнения не должен прерываться. Таким образом, CAS — это атомарная инструкция ЦП, которая не вызовет несогласованности данных.

Ниже представлена ​​функция класса Unsafe, вызываемая AtomicInteger для реализации функции i++.

//unsafe.getAndAddInt
public final int getAndAddInt(Object var1,long var2,int var4){
    int var5;
    do{
        //获取当前的值的地址
        var5=this.getIntVolatile(var1,var2);
        //var1代表对象,var2和var5分别代表当前对象的真实值和期望值,如果二者相等,更新为var5+var4
    }while(!this.compareAndSwapInt(var1,var2,var5,var5+var4);
    return var5;
}

В функции getAndAddInt var1 представляет объект AtomicInteger, var2 представляет адрес объекта в памяти, а var4 представляет желаемое добавляемое значение.

Сначала получите ток через var1 и var2в основной памятиРеальное значение int, которое равно var5.

Затем происходит изменение данных по циклу, при сравнении реального значения с текущим значением объекта оно обновляется и происходит выход из цикла, в противном случае снова получается текущее реальное значение, и попытка продолжается до тех пор, пока не будет удается.

Согласованность гарантируется за счет вращения вместо блокировки в CAS, а параллелизм улучшается по сравнению с блокировкой.

В частности: поток A и поток B одновременно выполняют операцию автоинкремента AtomicInteger:

  1. Исходное значение value в AtomicInteger равно 3. Значение равно 3 в основной памяти, и в рабочей памяти потока A и потока B есть копии значения 3;
  2. Поток A получает значение 3 с помощью getIntVolatile() и приостанавливается.
  3. Поток B также получает значение 3, а затем выполняет метод compareAndSwapInt.Фактическое значение памяти сравнивается с 3, поэтому значение памяти успешно изменяется на 4.
  4. В этот момент поток А продолжает выполнять сравнение и обнаруживает, что значение 3 в объекте несовместимо со значением 4 в основной памяти, указывая на то, что оно было изменено, и повторно входит в цикл.
  5. Поток A повторно получает значение.Поскольку значение изменяется с помощью volatile, значение потока A в это время равно 4, что равно значению в основной памяти, и модификация прошла успешно.

Недостатки CAS

  1. Если CAS выйдет из строя, он продолжит попытки. Если CAS не работает в течение длительного времени, это принесет много накладных расходов на ЦП.
  2. CAS может использоваться только для гарантиине замужемАтомарные операции с общими переменными.Для операций с несколькими общими переменными CAS не может гарантировать этого и требует использования блокировок.
  3. Есть проблема с АБА.

АВА-проблема

Важной предпосылкой реализации CAS является извлечение данных в определенный момент из памяти, их сравнение и замена в текущий момент.Эта разница во времени приведет к изменению данных.

Поток 1 берет A из ячейки памяти V, поток 2 также берет A из V, а затем поток 2 изменяет A на B с помощью некоторых операций, а затем изменяет данные в ячейке V на A. В это время поток 1 выполняет операцию CAS. чтобы обнаружить, что в V все еще есть A, и операция прошла успешно. Хотя операция CAS потока 1 выполнена успешно, это не означает, что в этом процессе нет проблем.

Эта проблема аналогична проблеме фантомного чтения и решается механизмом добавления номера версии. Это можно решить с помощью AtomicStampedReference.

AtomicStampedReference

Решите эту проблему с помощью AtomicStampedReference.

public class SolveABADemo {
    static AtomicStampedReference<Integer> atomicStampedReference=new AtomicStampedReference<>(100,1);

    new Thread(()->{
        int stamp=atomicStampedReference.getStamp();
            System.out.println(Thread.currentThread().getName()+"\t 版本号:"+stamp);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        atomicStampedReference.compareAndSet(100,101,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
        System.out.println(Thread.currentThread().getName()+"\t 版本号:"+atomicStampedReference.getStamp());
        atomicStampedReference.compareAndSet(101,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
        System.out.println(Thread.currentThread().getName()+"\t 版本号:"+atomicStampedReference.getStamp());
        },"t1").start();

    new Thread(()->{
        int stamp=atomicStampedReference.getStamp();
        System.out.println(Thread.currentThread().getName()+"\t 版本号:"+stamp);
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        boolean ret=atomicStampedReference.compareAndSet(100,2019,stamp,stamp+1);
        System.out.println(Thread.currentThread().getName()+"\t"+ret
            +" stamp:"+atomicStampedReference.getStamp()
            +" value:"+atomicStampedReference.getReference());
        },"t2").start();
    }
}
t1	 版本号:1
t2	 版本号:1
t1	 版本号:2
t1	 版本号:3
t2	false stamp:3 value:100

Потокобезопасность классов коллекций

ConcurrentModificationException

Это исключение также является исключением одновременного изменения, java.util.ConcurrentModificationException.

Причина этого исключения заключается в том, что сам класс коллекции небезопасен для потоков.

решение:

  1. Синхронизируйте контейнеры с помощью Vector, Hashtable и т. д.
  2. Используйте Collections.synchronizedxxx(new XX) для создания потокобезопасных контейнеров.
  3. Используйте параллельные контейнеры в пакете j.u.c, такие как CopyOnWriteList, CopyOnWriteArraySet, ConcurrentHashMap и т. д.

CopyOnWriteArrayList

дно используетсяprivate transient volatile Object[] array;

CopyOnWriteArrayList принимает идею разделения копирования при записи и чтения-записи.

public boolean add(E e){
    final ReentrantLock lock=this.lock;
    try{
        //旧数组
        Object[] elements = getArray();
        int len = elements.length;
        //复制新数组
        Object[] newElements = Arrays.copyOf(elements, len+1);
        //修改新数组
        newElements[len] = e;
        //更改旧数组引用指向新数组
        setArray(newElements);
        return true;
    }finally{
        lock.unlock();
    }
}

При добавлении элементов они не добавляются напрямую в текущий массив контейнеров, а копируются в новый массив контейнеров, добавляя элементы в новый массив и указывая исходную ссылку контейнера на новый контейнер после добавления.

сделай этовыгодаКонтейнер можно одновременно читать без блокировки, потому что контейнер не будет добавлять никаких элементов при чтении.

Сам CopyOnWriteArraySet реализован с помощью CopyOnWriteArrayList.

Блокировка Java

Честные и нечестные замки

ReentrantLock может указать логический тип конструктора, чтобы получить справедливую или несправедливую блокировку.По умолчанию используется несправедливая блокировка, а synchronized также является несправедливой блокировкой.

Справедливые блокировки — это FIFO, в которых несколько потоков получают блокировки в том порядке, в котором они применяются для блокировок. В параллельной среде каждый поток сначала проверяет очередь ожидания, поддерживаемую блокировкой, при получении блокировки, если она пуста, он будет соратником, иначе он присоединится к очереди.

Несправедливая блокировка означает, что несколько потоков находятся не в том порядке, в котором они применяются для блокировок, и возможно, что поток, который применяется позже, получит блокировку раньше потока, который применяется первым. Высокий параллелизм может привести к инверсии приоритета или голоданию. В параллельной среде попытайтесь занять блокировку, попытайтесь безуспешно, а затем присоединитесь к очереди ожидания.

Реентерабельные блокировки (рекурсивные блокировки)

Сбрасываемая блокировка означает, что после того, как внешняя функция того же потока получает блокировку, внутренняя рекурсивная функция автоматически получает блокировку. То есть поток может войтилюбой блок кода, который синхронизируется замком, которым он уже владеет.

Как ReentrantLock, так и synchronized являются повторными блокировками.

Самая большая роль реентерабельных блокировок заключается в том, чтобы избежать взаимоблокировок.

блокировка спина

Спин-блокировка означает, что поток, пытающийся получить блокировку, не блокируется немедленно, а попытается получить блокировку в цикле. Преимущество заключается в уменьшении потребления переключения контекста потока, а недостаток в том, что оно потребляет ресурсы ЦП при зацикливании.

Реализовать спин-блокировку:

public class SpinLockDemo {
//使用AtomicReference<Thread>来更新当前占用的 Thread
    AtomicReference<Thread> threadAtomicReference=new AtomicReference<>();

    public static void main(String[] args) {
        SpinLockDemo demo=new SpinLockDemo();
        new Thread(()->{
            demo.myLock();
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            demo.myUnlock();
        },"t1").start();


        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            demo.myLock();
            demo.myUnlock();
        },"t2").start();

    }

    public void myLock(){
        Thread thread=Thread.currentThread();
        System.out.println(Thread.currentThread().getName()+"\t come in");
        
        //如果当前占用的线程为null,则尝试获取更新
        while(!threadAtomicReference.compareAndSet(null,thread)){

        }
    }

    public void myUnlock(){
        Thread thread=Thread.currentThread();
        //释放锁,将占用的线程设置为null
        threadAtomicReference.compareAndSet(thread,null);
        System.out.println(Thread.currentThread().getName()+"\t unlocked");
    }
}

Блокировка чтения-записи

Эксклюзивная блокировка: блокировка может удерживаться только одним потоком за раз, например ReentrantLock и synchronized.

Общая блокировка: эта блокировка может удерживаться несколькими потоками.

В ReentrantReadWriteLock блокировка чтения является общей блокировкой, а блокировка записи — монопольной блокировкой. Совместное использование чтения и чтения обеспечивает параллелизм, а операции чтения и записи являются взаимоисключающими.

Класс инструментов параллелизма

CountDownLatch

Роль CountDownLatch состоит в том, чтобы блокировать некоторые потоки до тех пор, пока другие потоки не завершат серию операций, прежде чем будут разбужены.

CountDownLatch значение в начальной настройке, когдаодин или большеКогда потоки используют метод await(), эти потоки блокируются. Остальные потоки вызывают метод countDown() для уменьшения счетчика на 1. Когда счетчик равен 0, поток, заблокированный вызовом метода await(), будет разбужен и продолжит выполнение.

Можно понять, что когда все уходят, охранник запирает дверь.

CyclicBarrier

CyclicBarrier относится к барьеру, который можно использовать циклически. Когда группа потоков достигает барьера, они блокируются. Пока последний поток не достигнет барьера, барьер откроет дверь, а поток, перехваченный барьером, продолжит работу. Потоки входят в барьер через метод await().

Можно понять, что встреча может состояться только тогда, когда все будут присутствовать.

Semaphore

Количество семафора используется:

  1. Взаимоисключающее использование нескольких общих ресурсов
  2. Контроль количества одновременных потоков

Можно понять, что несколько автомобилей занимают несколько парковочных мест на стоянке. При въезде на парковочное место вызовите метод Acquire() для получения ресурсов. При выходе вызовите метод release(), чтобы освободить ресурс.

очередь блокировки

Блокирующая очередь — это в первую очередь очередь, и ее роль заключается в следующем:

  • Когда блокирующая очередь пуста, операция получения элементов из очереди будет заблокирована
  • Когда очередь блокировки заполнена, операция добавления элементов в очередь будет заблокирована.

Потоки, пытающиеся получить элементы из пустой очереди блокировки, будут заблокированы до тех пор, пока другие потоки не вставят новые элементы в пустую очередь. Аналогично, потоки, пытающиеся добавить новые элементы в полную очередь блокировки, также будут заблокированы до тех пор, пока другие потоки не удалят элементы из очереди, чтобы очередь снова освободилась и могла быть добавлена ​​последовательно.

блокировать: Блокировка означает, что в некоторых случаях нить будет приостановлена, т. е. заблокирована.После выполнения условия приостановленная нить будет снова автоматически пробуждена.

преимущество: BlockingQueue может помочь нам блокировать и пробуждать потоки, не заботясь о том, когда потоки должны быть заблокированы и когда потоки должны быть разбужены. При этом учитываются эффективность и безопасность потоков.

Блокирующая архитектура очереди

Интерфейс BlokcingQueue реализует интерфейс Queue, который имеет следующие классы реализации:

  • ArrayBlockingQueue: автормножествосостоит изесть мирочередь блокировки
  • LinkedBlockingQueue: авторсвязанный списоксостоит изесть мирОчередь блокировки (размер по умолчанию — Integer.MAX_VALUE)
  • PriorityBlockingQueue: поддержкаприоритетотсортированныйНеограниченныйочередь блокировки
  • DelayQueue: использоватьприоритетная очередьосуществленныйНеограниченная задержкаочередь блокировки
  • Синхронная очередь:элемент не сохраняетсяБлокирующая очередь, одноэлементная очередь, синхронная очередь фиксации
  • Связанная очередь передачи:связанный списоксостоит изНеограниченныйочередь блокировки
  • LinkedBlockingDeque:связанный списоксостоит издвустороннийочередь блокировки

Как заблокировать очередь

тип метода Выбросить исключение особая ценность блокировать тайм-аут
вставлять add(e) offer(e) put(e) offer(e,time,unit)
Удалить remove() poll() take() poll(time,unit)
экзамен element() peek() никто никто
  • Выдает исключение: когда очередь заполнена,add(e)вызовет исключениеIllegalStateException: Queue full; когда очередь пуста,remove()иelement()вызовет исключениеNoSuchElementException
  • Особое значение:offer(e)вернет true/false.peek()Вернет элемент очереди или ноль.
  • блокировка: очередь заполнена,put(e)будет блокироваться до успешного завершения или прерывания; очередь пустаtake()будет блокироваться до успешного завершения.
  • тайм-аут: блокировка до выхода после тайм-аута, возвращаемое значение такое же, как и в случае со специальным значением.

Модель «производитель-потребитель»

Способ 1. Использование блокировки

class ShareData {
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() throws Exception {
        lock.lock();
        try {
            //判断
            while (number != 0) {
                condition.await();
            }
            //干活
            number++;
            System.out.println(Thread.currentThread().getName() + " produce\t" + number);
            //通知唤醒
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void decrement()throws Exception{
        lock.lock();
        try {
            //判断
            while (number == 0) {
                condition.await();
            }
            //干活
            number--;
            System.out.println(Thread.currentThread().getName() + " consume\t" + number);
            //通知唤醒
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

/**
 * 一个初始值为0的变量,两个线程交替操作,一个加1一个减1,重复5次
 * 1. 线程 操作 资源类
 * 2. 判断 干活 通知
 * 3. 防止虚假唤醒机制:判断的时候要用while而不是用if
 */
public class ProduceConsumeTraditionalDemo {
    public static void main(String[] args) {
        ShareData data=new ShareData();

        new Thread(()->{
            for (int i = 0; i < 5 ; i++) {
                try {
                    data.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        new Thread(()->{
            for (int i = 0; i < 5 ; i++) {
                try {
                    data.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
    }
}

распечатать результат

A produce	1
B consume	0
A produce	1
B consume	0
A produce	1
B consume	0
A produce	1
B consume	0
A produce	1
B consume	0

Способ 2. Используйте очередь блокировки

public class ProduceConsumeBlockingQueueDemo {
    public static void main(String[] args) {
        SharedData data=new SharedData(new ArrayBlockingQueue<>(10));
        new Thread(()-> {
            System.out.println(Thread.currentThread().getName() + "\t生产线程启动");
            try {
                data.produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Producer").start();
        new Thread(()-> {
            System.out.println(Thread.currentThread().getName() + "\t消费线程启动");
            try {
                data.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Consumer").start();

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        data.stop();
        System.out.println("停止");
    }
}

class SharedData{
    private volatile boolean FLAG=true;
    private AtomicInteger atomicInteger=new AtomicInteger();

    BlockingQueue<String> blockingQueue=null;

    public SharedData(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        System.out.println(blockingQueue.getClass().getName());
    }

    public void produce() throws InterruptedException {
        String data=null;
        boolean ret;
        while(FLAG){
            data=""+atomicInteger.incrementAndGet();
            ret=blockingQueue.offer(data,2L,TimeUnit.SECONDS);
            if(ret){
                System.out.println(Thread.currentThread().getName()+"\t插入"+data+"成功");
            }else{
                System.out.println(Thread.currentThread().getName()+"\t插入"+data+"失败");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println("生产结束,FLAG=false");
    }

    public void consume() throws InterruptedException {
        String ret=null;
        while(FLAG){
            ret=blockingQueue.poll(2L,TimeUnit.SECONDS);
            if(null==ret||ret.equalsIgnoreCase("")){
                System.out.println(FLAG=false);
                System.out.println(Thread.currentThread().getName()+"\t消费等待超时退出");
                return;
            }
            System.out.println(Thread.currentThread().getName() + "\t消费" + ret + "成功");
        }
    }

    public void stop(){
        FLAG=false;
    }


}

Способ использования блокирующей очереди + атомарный класс + volatile переменная. Результат печати следующий:

java.util.concurrent.ArrayBlockingQueue
Producer	生产线程启动
Consumer	消费线程启动
Producer	插入1成功
Consumer	消费1成功
Producer	插入2成功
Consumer	消费2成功
Producer	插入3成功
Consumer	消费3成功
停止
生产结束,FLAG=false
false
Consumer	消费等待超时退出

Разница между синхронизированным и заблокированным

  1. оригинальная композиция
    • Synchronized — это ключевое слово, относящееся к уровню JVM.Нижний слой дополняется monitorenter и monitorexit, что зависит от объекта монитора. Поскольку методы ожидания/уведомления также зависят от объекта монитора, эти методы можно вызывать только из синхронизированного блока или метода.
    • Блокировка находится в пакете java.util.concurrent.locks.lock и представляет собой блокировку на уровне API.
  2. инструкции
    • Синхронизация не требует, чтобы пользователь вручную снял блокировку. После завершения кода система автоматически позволяет потоку снять блокировку.
    • ReentrantLock требует, чтобы пользователь снял блокировку вручную. Невыполнение этого требования может привести к тупиковой ситуации.
  3. Можно ли прервать ожидание
    • Синхронизация не может быть прервана, если не выброшено исключение или не завершена нормальная работа.
    • ReentrantLock может быть прерван. один черезtryLock(long timeout, TimeUnit unit), другойlockInterruptibly()Поместите его в блок кода, позвонитеinterrupt()способ прервать.
  4. Справедливо ли запирать
    • синхронизированный - это несправедливая блокировка
    • ReentrantLock по умолчанию является несправедливой блокировкой. Вы можете передать логическое значение в конструкторе. True представляет справедливую блокировку, а false — несправедливую.
  5. Блокировки связаны с несколькими условиями
    • Synchronized имеет только одну очередь блокировки и может пробуждать только один поток или все потоки случайным образом.
    • ReentrantLock используется для реализации группового пробуждения, которое может просыпаться точно.

Кейс: трехниточная петлевая печать

class ShareData{
    private int number=1;
    private Lock lock=new ReentrantLock();

    public void printA(){
        lock.lock();
        Condition conditionA=lock.newCondition();
        try{
            while(number!=1){
                conditionA.await();
            }
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName()+"\t"+i);
            }
            number=2;
            conditionA.signal();
        }catch (Exception e){
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void printB(){
        lock.lock();
        Condition conditionB=lock.newCondition();
        try{
            while(number!=2){
                conditionB.await();
            }
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName()+"\t"+i);
            }
            number=3;
            conditionB.signal();
        }catch (Exception e){
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void printC(){
        lock.lock();
        Condition conditionC=lock.newCondition();
        try{
            //判断
            while(number!=3){
                conditionC.await();
            }
            //干活
            for (int i = 0; i < 15; i++) {
                System.out.println(Thread.currentThread().getName()+"\t"+i);
            }
            number=1;
            //通知
            conditionC.signal();
        }catch (Exception e){
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ShareData data=new ShareData();
        new Thread(() -> data.printA(),"A").start();
        new Thread(() -> data.printB(),"B").start();
        new Thread(() -> data.printC(),"C").start();
    }
}

Пул потоков

создать тему

  1. Реализовать интерфейс Runnable
  2. Реализовать вызываемый интерфейс
  3. Наследовать от класса Thread
  4. Использовать пул потоков

Невозможно передать Callable в конструкторе Thread, но вы можете передать его в интерфейсе Runnable:Thread thread=new Thread(Runnable runnable, String name);. Чтобы использовать интерфейс Callable, нам нужно использовать класс FutureTask. Класс FutureTask реализует интерфейс RunnableFuture, а RunnableFutre является подинтерфейсом Future, поэтому FutureTask может использовать указанный выше конструктор Thread в качестве параметра. При этом сам конструктор FutureTask может передаваться в Callable.

class MyThread implements Callable<Integer>{
    @Override
    public Integer call() {
        System.out.println("come in callable");
        return 2019;
    }
}
class Main{
    public static void main(String [] args){
        FutureTask<Integer> futureTask = new FutureTask<>(new MyThread2());
        Thread t1=new Thread(futureTask,"A");
    }
}

Архитектура пула потоков

В дополнение к этому существует служебный класс Executors.

ThreadPoolExecutor

Пул потоков имеет семь параметров:

public ThreadPoolExecutor(
    int corePoolSize,//线程池常驻核心线程数
    int maximumPoolSize,//线程池能容纳同时执行最大线程数
    long keepAliveTime,//多余的空闲线程的存活时间,当前线程池线程数量超过core,空闲时间达到keepAliveTime,多余空闲线程会被销毁直到只剩下core个
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,//被提交尚未被执行的任务队列
    ThreadFactory threadFactory,//创建线程的线程工厂
    RejectedExecutionHandler handler//拒绝策略
    ) 
{...}

Процесс обработки выглядит следующим образом:

  1. Создайте пул потоков и дождитесь отправленных запросов задач.
  2. Добавить задачу запроса
    • Если количество запущенных потоков меньше, чем corePoolSize, создайте поток для запуска задачи.
    • Если количество запущенных потоков больше или равно Corepoolsize, поставить задачу в очередь
    • Когда очередь заполнена и количество запущенных потоков меньше максимального размера пула, создайте неосновной поток для выполнения задачи.
    • Когда очередь заполнена и количество запущенных потоков больше или равно maxPoolSize, пул потоков начнет выполнение политики отклонения насыщения.
  3. Когда поток завершит задачу, он выполнит следующую задачу из очереди.
  4. Когда потоку больше нечего делать, кроме как keepAliveTime:
    • Если количество запущенных в данный момент потоков больше, чем corePoolSize, поток останавливается.
    • После того, как все задачи пула потоков будут выполнены, он со временем уменьшится до размера corePoolSize.

политика отказа

В JDK есть четыре встроенных стратегии отклонения, каждая из которых реализует интерфейс RejectedExecutionHandler.

  • AbortPolicy: создает исключение RejectedExecutionException напрямую, что является политикой отклонения по умолчанию.
  • DiscardPolicy: отменить задачу напрямую, не обрабатывая ее и не создавая исключение. Если задача может быть потеряна, это лучшая стратегия обработки.
  • DiscardOldestPolicy: отклонить задачу с самым большим временем ожидания в очереди, а затем добавить текущую задачу в очередь и попытаться отправить ее еще раз.
  • CallerRunsPolicy: Выполняется вызывающий объект. Эта стратегия не отбрасывает задачи и не генерирует исключения, а откатывает некоторые задачи вызывающей стороне.

Три общих пула потоков

  1. Executors.newFixedThreadPool(int)

Создайте пул потоков с фиксированной емкостью, управляйте максимальным параллелизмом и превышайте количество потоков, ожидающих в очереди.

return new ThreadPoolExecutor(nThreads, nThreads, 
    0L, TimeUnit.MILLISECONDS, 
    new LinkedBlockingQueue<Runnable>());

где значения corePoolSize и maxPoolSize равны и используется LinkedBlockingQueue.

Он подходит для выполнения долгосрочных задач и имеет относительно высокую производительность.

  1. Executors.newSingleThreadExecutor()

Создается однопоточный пул потоков, и только один рабочий поток используется для выполнения задач, гарантируя, что все задачи выполняются по порядку.

return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>()));

Для corePoolSize и maxPoolSize установлено значение 1, также используется LinkedBlockingQueue.

Он подходит для сценариев, в которых одна задача выполняется по одной задаче за раз.

  1. Executors.newCachedThreadPool()

Создается кешируемый пул потоков. Если длина пула потоков превышает требования к обработке, простаивающие потоки могут быть гибко перезапущены. Если нет перерабатываемых потоков, будут созданы новые потоки.

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());

Установите для corePoolSize значение 0, для maxPoolSize значение Integer.MAX_VALUE и используйте SynchronousQueue. Когда приходит задача, создается поток для выполнения, и поток уничтожается после бездействия более 60 секунд.

Он подходит для выполнения множества краткосрочных асинхронных небольших программ или серверов с относительно небольшой нагрузкой.

Какой пул потоков используется в работе

В Руководстве по разработке Java для Alibaba есть следующие положения:

  1. Ресурсы потоков должны предоставляться через пул потоков, и явное создание потоков в приложении запрещено.
    • инструкция: Преимущество использования пула потоков заключается в сокращении затрат времени на создание и уничтожение потоков и накладных расходов системных ресурсов, а также решении проблемы нехватки ресурсов. Если пул потоков не используется, это может привести к тому, что система создаст большое количество потоков одного типа, что может привести к потреблению памяти или чрезмерному переключению.
  2. Пулу потоков не разрешено использовать Executors для создания, то есть нельзя использовать три вышеуказанных пула потоков, но необходимо использовать метод ThreadPoolExecutor.Этот метод обработки позволяет пишущим студентам лучше понять правила выполнения потока. бассейн и позволяет избежать риска ресурсов Hanjin. .
    • Как FixedThreadPool, так и SingleThreadPool используют LinkedBlockingQueue, а допустимая длина очереди — Integer.MAX_VALUE, что может привести к скоплению большого количества запросов и вызвать неработоспособность.
    • CachedThreadPool и ScheduledThreadPool позволяют создать число потоков, равное Integer.MAX_VALUE, что может создать большое количество потоков, что приведет к неработоспособности.

Как установить количество потоков в пуле потоков

Runtime.getRuntime().availableProcessors()Получить количество процессоров текущего устройства.

  1. Задачи с интенсивным использованием процессора
    • Интенсивная нагрузка на ЦП означает, что задача требует большого количества операций без блокировки, а ЦП постоянно работает на полной скорости.
    • Задачи, интенсивно использующие ЦП, могут быть ускорены (за счет многопоточности) только на настоящем многоядерном ЦП, в то время как на одноядерном ЦП, независимо от того, сколько симулированных многопотоков включено, оно не будет ускорено.
    • Конфигурация задачи с интенсивным использованием ЦП как небольшой номер потока, обычно устанавливаетсяCPU 核心数 + 1
  2. Интенсивный ввод-вывод
    • Интенсивный ввод-вывод, что означает, что задача требует много ввода-вывода, много блокировки
    • Выполнение задач с интенсивным вводом-выводом в одном потоке может привести к трате большого количества ресурсов ЦП на ожидание.
    • Использование многопоточности для задач с интенсивным вводом-выводом может значительно ускорить выполнение программы и извлечь выгоду из потерянного времени блокировки.
    • При интенсивном вводе-выводе большинство потоков блокируется, и необходимо настроить больше потоков.CPU核心数 * 2или используйтеCPU 核心数 / (1 - 阻塞系数), коэффициент блокировки составляет от 0,8 до 0,9

тупик

Причина взаимоблокировки

Тупик относится к явлению, когда два или более процессов ожидают друг друга из-за конкуренции за ресурсы во время процесса выполнения.

Тупик требует четырех маньчжурских условий:

  1. взаимоисключающий
  2. циклическое ожидание
  3. Не вытесняемый
  4. обладать и ждать

Основными причинами взаимоблокировок являются:

  1. Недостаточно системных ресурсов
  2. Неправильный порядок, в котором процессы выполняются и развиваются
  3. нерациональное использование ресурсов

экземпляр взаимоблокировки

class HoldLockThread implements Runnable{
    private String lock1;
    private String lock2;

    public HoldLockThread(String lock1, String lock2) {
        this.lock1 = lock1;
        this.lock2 = lock2;
    }

    @Override
    public void run() {
        synchronized (lock1){
            System.out.println(Thread.currentThread().getName()+"\t持有"+lock1+"\t尝试获取"+lock2);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (lock2){
                System.out.println(Thread.currentThread().getName()+"\t持有"+lock1+"\t尝试获取"+lock2);
            }
        }
    }
}

public class DeadLockDemo {
    public static void main(String[] args) {
        String lockA="lockA";
        String lockB="lockB";

        new Thread(new HoldLockThread(lockA,lockB),"Thread1").start();
        new Thread(new HoldLockThread(lockB,lockA),"Thread2").start();
    }
}

Выводятся следующие результаты, и программа не завершается.

Thread2	持有lockB	尝试获取lockA
Thread1	持有lockA	尝试获取lockB

Битовый анализ взаимоблокировки

Используйте jps, аналогично команде ps в Linux.

В приведенном выше java-файле используйте open In Terminal в IDEA или используйте инструмент командной строки cmd в каталоге файлов.

первое использованиеjps -lкоманда, что-то вродеls -lкоманда выводит текущий запущенный поток Java, из которого вы можете узнать номер потока потока DeadLockDemo.

Затем используйтеjstack threadIdдля просмотра информации о стеке. Результат выглядит следующим образом:

Java stack information for the threads listed above:
===================================================
"Thread2":
        at interview.jvm.deadlock.HoldLockThread.run(DeadLockDemo.java:22)
        - waiting to lock <0x00000000d6240328> (a java.lang.String)
        - locked <0x00000000d6240360> (a java.lang.String)
        at java.lang.Thread.run(Thread.java:748)
"Thread1":
        at interview.jvm.deadlock.HoldLockThread.run(DeadLockDemo.java:22)
        - waiting to lock <0x00000000d6240360> (a java.lang.String)
        - locked <0x00000000d6240328> (a java.lang.String)
        at java.lang.Thread.run(Thread.java:748)

Found 1 deadlock.