Основы параллелизма и многопоточности

Java

Личный технический блог:www.zhenganwen.top

Создать и запустить поток

Любой, кто знаком с Java, может легко написать следующий код:

public static class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("MyThread is running...");
    }
}

public static void main(String[] args) {
    Thread t = new MyThread();
    t.start();
}

жизненный цикл нити

Это основной вопрос, который часто задают в интервью.Вы обязательно должны ответить, что у потоков есть только пять состояний, а именно: новое состояние, состояние готовности, состояние выполнения, состояние блокировки и состояние завершения.

image

состояние готовности и состояние выполнения

Из-за алгоритма распределения временных интервалов планировщика неизвестно, как долго будет выполняться каждый выполняющийся поток, поэтому поток может переключаться между работающим и выполняющимся потоком.Поток в состоянии блокировки должен сначала войти в состояние готовности, прежде чем он сможет войти в состояние выполнения..

Состояние выполнения и состояние блокировки

Текущий поток активно вызываетThread.sleep(),obj.wait(),thread.join()войдетTIMED-WAITINGилиWAITINGсостояние и активно отказываться от права на выполнение ЦП. еслиTIMED-WAITING, затем через определенный период времени он активно вернется и войдет в состояние Runnable, чтобы дождаться выделения временных интервалов.

thread.join()Нижний уровень заключается в том, что текущий поток постоянно опрашиваетthreadВыжить или нет, если жив, то постоянноwait(0).

Если выполняющийся поток сталкивается с критическим разделом во время выполнения (synchronizedдекорированный метод или кодовый блок) и блокировка, которую необходимо получить, занята другими потоками, то он будет активно приостанавливать себя и входитьBLOCKEDгосударство.

Заблокированное состояние и состояние готовности

Если поток, удерживающий блокировку, выйдет из критической секции, потоки, ожидающие блокировки, будут пробуждены и перейдут в состояние готовности, но только поток, захвативший блокировку, перейдет в состояние выполнения, а другие потоки, не захватившие блокировку все равно войдет в состояние блокировки.

Если поток вызываетobjизnotify/notifyAllметод, то когда поток выходит из критической секции (вызовwait/notifyдолжен сначала пройтиsynchronizedполучает блокировку объекта), пробуждение ожидаетobj.waitПоток выше перейдет из заблокированного состояния в состояние готовностиobjизmonitor, и хватайте толькоmonitorПоток начнется сobj.waitreturn, и поток, который его не захватил, все равно будет блокироватьсяobj.waitначальство

состояние окончания

Поток в состоянии выполнения завершил выполнениеrunметод или поток в заблокированном состоянииinterruptвойдет в завершенное состояние, а затем будет уничтожен.

запустить анализ исходного кода

public synchronized void start() {
    if (threadStatus != 0)
        throw new IllegalThreadStateException();
    group.add(this);
    boolean started = false;
    try {
        start0();
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {}
    }
}

private native void start0();

startМетод в основном делает три вещи:

  1. Добавить текущий объект потока в группу потоков, к которой он принадлежит (группы потоков будут представлены позже)
  2. перечислитьstart0,Этоnativeв предыдущей статье «Как реализован поток Java? «В статье упоминается, что планирование потоков будет передано LWP, и запуск новых потоков здесь также относится к этой категории. Таким образом, мы можем предположить, что этот вызов JNI (Java Native Interface) создаст новый поток (LWP) и выполнит объект потока.runметод
  3. объект потокаstartedстатус установлен наtrueУказывает, что он был активирован. Как сказал учитель, узнавая о нитях, нитьstartЕго можно вызвать только один раз, а повторные вызовы сообщат об ошибке через эту переменную.

Зачем вводить Runnable

Принцип единой ответственности

мы пройдемThreadЧтобы смоделировать такой сценарий: Банковский многооконный вызов. таким образом, думая, что уже естьThreadПочему мы должны представитьRunnable

В первую очередь нам нужен оконный поток для имитации процесса звонка на номер (окно звонит на номер, а клиент соответствующего номера переходит в соответствующее окно для обработки дела):

public class TicketWindow extends Thread {

    public static final Random RANDOM = new Random(System.currentTimeMillis());
    private static final int MAX = 20;
    private int counter;
    private String windowName;

    public TicketWindow(String windowName) {
        super(windowName);
        counter = 0;
        this.windowName = windowName;
    }

    @Override
    public void run() {
        System.out.println(windowName + " start working...");
        while (counter < MAX){
            System.out.println(windowName + ": It's the turn to number " + counter++);
            //simulate handle the business
            try {
                Thread.sleep(RANDOM.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Затем напишите вызывающий клиент для имитации одновременного вызова четырех окон:

public class WindowThreadClient {
    public static void main(String[] args) {
        Stream.of("Window-1","Window-2","Window-3","Window-4").forEach(
            windowName -> new TicketWindow(windowName).start()
        );
    }
}

Вы обнаружите, что один и тот же номер вызывается четыре раза, а это явно не то, что нам нужно. При нормальных обстоятельствах четыре окна должны совместно использовать одну вызывающую систему. Окно отвечает только за обработку дел, а вызовы должны передаваться вызывающей системе. Это типичный принцип единой ответственности в ООП.

Мы связываем нить и выполняемую задачу, поэтому возникает неловкая ситуация, описанная выше. Работа потока заключается в выполнении задачи, у него есть свое состояние времени выполнения, мы не должны ставить состояние задачи на выполнение (как в этом примереcounter,windowName), чтобы связать потоки вместе, а бизнес-логика должна извлекаться отдельно как логическая исполнительная единица, которую можно передать потоку, когда он должен быть выполнен. Так что естьRunnableинтерфейс:

public interface Runnable {
    public abstract void run();
}

Следовательно, мы можем преобразовать предыдущий многооконный номер вызова:

public class TicketWindowRunnable implements Runnable {

    public static final Random RANDOM = new Random(System.currentTimeMillis());
    private static final int MAX = 20;
    private int counter = 0;

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " start working...");
        while (counter < MAX){
            System.out.println(Thread.currentThread().getName()+ ": It's the turn to number " + counter++);
            //simulate handle the business
            try {
                Thread.sleep(RANDOM.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Тестовый класс:

public class WindowThreadClient {
    public static void main(String[] args) {
        TicketWindowRunnable ticketWindow = new TicketWindowRunnable();
        Stream.of("Window-1", "Window-2", "Window-3", "Window-4").forEach(
                windowName -> new Thread(ticketWindow, windowName).start()
        );
    }
}

Таким образом, вы обнаружите, что повторных вызовов нет. Но эта программа не является потокобезопасной, потому что несколько потоков изменяются одновременно.windowRunnableсерединаcounterпеременные, поскольку в этом разделе в основном рассматриваютсяRunnableроль, поэтому пока не будет обсуждаться.

Паттерны стратегии и функциональное программирование

будетThreadсерединаrunЕще одно преимущество представления через интерфейс заключается в том, что он дружелюбен к шаблону стратегии и функциональному программированию.

Во-первых, давайте кратко представим режим стратегии.Предположим, что нам сейчас нужно рассчитать НДФЛ работника, поэтому мы написали следующий класс инструментов, который можно вызвать, передав базовую зарплату и премию.calculateЧтобы найти налог к ​​уплате:

public class TaxCalculator {

    private double salary;
    private double bonus;

    public TaxCalculator(double base, double bonus) {
        this.salary = base;
        this.bonus = bonus;
    }

    public double calculate() {
        return salary * 0.03 + bonus * 0.1;
    }
}

Что плохого в том, чтобы так писать? Мы выписали расчет налога к уплате:salary * 0.03 + bonus * 0.1, а налоговая ставка непостоянна, и клиентам свойственно предлагать изменения спроса! Должны ли мы вручную изменять эту часть кода каждый раз, когда меняются требования?

В этот раз помогает шаблон стратегии: когда ввод наших требований неизменен, а вывод нужно скорректировать в соответствии с разными стратегиями, мы можем извлечь эту часть логики в интерфейс:

public interface TaxCalculateStrategy {
    public double calculate(double salary, double bonus);
}

Реализация конкретной стратегии:

public class SimpleTaxCalculateStrategy implements TaxCalculateStrategy {
    @Override
    public double calculate(double salary, double bonus) {
        return salary * 0.03 + bonus * 0.1;
    }
}

А бизнес-код просто вызывает интерфейс:

public class TaxCalculator {

    private double salary;
    private double bonus;
    private TaxCalculateStrategy taxCalculateStrategy;

    public TaxCalculator(double base, double bonus, TaxCalculateStrategy taxCalculateStrategy) {
        this.salary = base;
        this.bonus = bonus;
        this.taxCalculateStrategy = taxCalculateStrategy;
    }

    public double calculate() {
        return taxCalculateStrategy.calculate(salary, bonus);
    }
}

будетThreadлогическая исполнительная единица вrunИзвлечь в интерфейсRunnableОн имеет тот же эффект. Поскольку в реальном бизнесе мы не можем предсказать задачи, которые должны быть переданы в поток для выполнения, после извлечения их в интерфейс это дает большую гибкость нашему приложению.

Кроме того, в JDK1.8 были введены функциональное программирование и лямбда-выражения, и шаблон стратегии также очень удобен для этой функции. Тем не менее с помощью приведенного выше примера, если правило расчета становится(salary + bonus) * 1.5, возможно, нам нужно добавить новый класс политики:

public class AnotherTaxCalculatorStrategy implements TaxCalculateStrategy {
    @Override
    public double calculate(double salary, double bonus) {
        return (salary + bonus) * 1.5;
    }
}

После того, как JDK добавит синтаксический сахар для внутренних классов, можно использовать анонимные внутренние классы, чтобы сэкономить накладные расходы на создание новых классов:

public class TaxCalculateTest {
    public static void main(String[] args) {
        TaxCalculator taxCalaculator = new TaxCalculator(5000,1500, new TaxCalculateStrategy(){
            @Override
            public double calculate(double salary, double bonus) {
                return (salary + bonus) * 1.5;
            }
        });
    }
}

Но после добавления функционального программирования в JDK он может быть более кратким и ясным:

public class TaxCalculateTest {
    public static void main(String[] args) {
        TaxCalculator taxCalaculator = new TaxCalculator(5000, 1500, (salary, bonus) -> (salary + bonus) * 1.5);
    }
}

Эта пара имеет только один абстрактный методrunизRunnableТо же самое относится и к интерфейсам.

Конструирование объектов Thread, некоторые вещи, о которых вы могли не знать

ПроверятьThreadметод построения, восходящий кinitСпособ (слегка нарезанный):

Thread parent = currentThread();
if (g == null) {
    if (g == null) {
        g = parent.getThreadGroup();
    }
}

this.group = g;
this.daemon = parent.isDaemon();
this.priority = parent.getPriority();

this.target = target;
setPriority(priority);
this.stackSize = stackSize;

tid = nextThreadID();
  1. gтекущий объектThreadGroup,2~8заключается в установке группы потоков, к которой принадлежит текущий объект, если вnew Threadявно не указан, то по умолчанию будет родительский поток (в настоящее время выполняющийсяnew ThreadThread) группа потоков устанавливается в свою собственную группу потоков.

  2. 9~10Line, наследует от родительского потока два состояния: является ли поток демоном и каков приоритет. Конечно, вnew Threadзатем может пройтиthread.setDeamonилиthread.setPriorityчтобы настроить

  3. 12хорошо, если прошелnew Thread(Runnable target)способ создать поток, а затем получить входящийRunnable target, вызывается при запуске потокаrunбудет выполняться не пустойtargetизrunметод. Теоретически существует три способа создания потока:

    • выполнитьRunnableинтерфейсMyRunnable,пройти черезnew Thread(myRunnable)воплощать в жизньMyRunnableсерединаrun
    • наследоватьThreadи переписатьrun,пройти черезnew MyThread()выполнить перезаписьrun
    • наследоватьThreadи переписатьrun, все еще можно передать конструкторуRunnableЭкземпляр класса реализации:new MyThread(myRunnable), а только выполняетMyThreadпереписан вrun, не повлияетmyRunnableлюбое воздействие. В этом способе создания тредов много неясностей, за исключением того, что интервьюер может вас смутить, использовать этот способ не рекомендуется.
  4. Установите приоритет потока, всего существует 10 уровней приоритета, соответствующих значению[0,9], чем больше значение, тем выше приоритет. Однако этот параметр зависит от платформы, что означает, что он может быть действительным в некоторых операционных системах и недействительным в некоторых операционных системах, поскольку потоки Java напрямую отображаются на потоки ядра, поэтому конкретное планирование по-прежнему зависит от операционной системы. .

  5. Установите размер стека. Этот размер относится к размеру памяти стека, а не к максимальному количеству кадров стека, которое может содержать стек.Вызов и возврат каждого метода соответствуют процессу передачи и извлечения кадра стека из стека виртуальной машины потока в стек. стек, как описано в следующем разделе. Этот параметр будет введен в . Чтобы узнать о стеке виртуальных машин, обратитесь к Главе 2 книги «Углубленное понимание виртуальной машины Java (второе издание)».

  6. установить нитьID, — уникальный идентификатор потока. Например, когда смещенная блокировка смещена в сторону потока, она будет отображаться в заголовке объекта.Mark WordИдентификатор потока хранится в (предвзятые блокировки можно найти в главе 5 «Искусство параллельного программирования» и «Углубленное понимание виртуальных машин Java»).

    пройти черезnextThreadIDнайдетstatic synchronizedметод атомарного получения серийного номера потокаthreadSeqNumberУвеличенное значение:

    public static void main(String[] args) {
        new Thread(() -> {
            System.out.println(Thread.currentThread().getId()); //11
        }).start();
    }
    

    ПочемуmainИдентификатор первого потока, созданного в JVM, равен 11 (это означает, что он является 11-м потоком, созданным после запуска JVM)? Это потому, что JVM выполняетmainзапустит первый поток процесса JVM (называемыйmainthread) и запустит некоторые потоки демона, такие как потоки GC.

Многопоточность и структура памяти JVM

Структура памяти JVM

image

Здесь следует отметить, что каждый поток имеет собственный стек виртуальной машины. Стеки всех потоков хранятся в области стека виртуальной машины области данных времени выполнения JVM.

структура памяти кадра стека

image

параметр stackSize

Threadобеспечивает настраиваемыйstackSizeПерегруженный конструктор:

public Thread(ThreadGroup group,
              Runnable target,
              String name,
              long stackSize)

Официальная документация описывает этот параметр следующим образом:

The stack size is the approximate number of bytes of address space that the virtual machine is to allocate for this thread's stack. The effect of the stackSize parameter, if any, is highly platform dependent.

вы можете указатьstackSizeПараметр приблизительно указывает размер памяти стека виртуальной машины (Уведомление: это размер памяти, то есть количество байтов, а не максимальное количество кадров стека, которое может быть размещено в стеке, и этот размер относится к размеру стека потока, а не к размеру всей виртуальной машины. область стека). И этот параметр имеет высокую степень зависимости от платформы, то есть на каждой операционной системе один и тот же параметр показывает разные эффекты.

On some platforms, specifying a higher value for the stackSize parameter may allow a thread to achieve greater recursion depth before throwing a StackOverflowError. Similarly, specifying a lower value may allow a greater number of threads to exist concurrently without throwing an OutOfMemoryError (or other internal error). The details of the relationship between the value of the stackSize parameter and the maximum recursion depth and concurrency level are platform-dependent. On some platforms, the value of the stackSize parameter may have no effect whatsoever.

На некоторых платформах дляstackSizeУказание большего значения позволяет потоку достичь большей глубины рекурсии, прежде чем генерировать исключение переполнения стека (поскольку размер кадра стека метода известен во время компиляции. Взяв в качестве примера таблицу локальных переменных, только переменные базового типа имеютlongа такжеdoubleОн занимает 8 байт, а остальные обрабатываются как 4 байта.Ссылочный тип занимает 4 байта или 8 байтов в зависимости от того, является ли виртуальная машина 32-разрядной или 64-разрядной. В этом случае, чем больше стек, тем больше максимальное количество кадров стека, которое может вместить стек, то есть тем больше глубина рекурсии). Аналогично, задав меньшееstackSizeЭто может позволить большему количеству потоков сосуществовать и избежать исключений OOM (некоторые читатели могут XOR, почему нелегко генерировать исключения OOM, когда стек мал? Разве стек не должен быть меньше, памяти недостаточно, и OOM проще На самом деле в однопоточной среде без OOM может произойти только переполнение стека, потому что размер кадра стека, соответствующий каждому методу, может быть известен компилятору.При запуске потока часть памяти будет разделена из площадь стека виртуальной машины соответствует размеру стека. Поэтому, если будет отправлено слишком много кадров стека или слишком много кадров стека для отправки, это приведет только к тому, что стек больше не будет вмещать кадр стека и приведет к переполнению стека. когда OOM будет отброшен?Для области стека виртуальной машины, если не хватит памяти для разделения в качестве стековой памяти нового потока, OOM будет отброшен.Нетрудно понять, что осталась ограниченная память процесса после удаления памяти кучи, области методов и памяти, необходимой для самой JVM. Стек виртуальной машины ограничен, чем меньше выделено каждому стеку, тем больше потоков может сосуществовать естественным образом). Наконец, на некоторых платформах, независимо отstackSizeНезависимо от того, что он установлен, вероятно, ничего не сделает.

The virtual machine is free to treat the stackSize parameter as a suggestion. If the specified value is unreasonably low for the platform, the virtual machine may instead use some platform-specific minimum value; if the specified value is unreasonably high, the virtual machine may instead use some platform-specific maximum. Likewise, the virtual machine is free to round the specified value up or down as it sees fit (or to ignore it completely).

Виртуальные возможности будутstackSizeКак предложение, есть еще определенное мнение в настройке размера стека. Если заданное значение слишком мало, виртуальная машина установит размер стека на минимальный размер стека, соответствующий платформе; соответственно, если заданное значение слишком велико, будет установлен максимальный размер стека, соответствующий платформе. Кроме того, виртуальная машина может округлить заданное значение вверх или вниз, чтобы установить подходящий размер стека (даже виртуальная машина игнорирует его).

Due to the platform-dependent nature of the behavior of this constructor, extreme care should be exercised in its use. The thread stack size necessary to perform a given computation will likely vary from one JRE implementation to another. In light of this variation, careful tuning of the stack size parameter may be required, and the tuning may need to be repeated for each JRE implementation on which an application is to run.

Из-за того, что этот конструктор зависит от платформы, его необходимо использовать с особой осторожностью. Правила вычисления фактического размера стека потоков будут вести себя по-разному в зависимости от реализации JVM. Учитывая это изменение, может потребоваться тщательная настройка параметра размера стека, и, возможно, потребуется настроить его по-разному для разных реализаций JVM, используемых приложением.

Implementation note: Java platform implementers are encouraged to document their implementation's behavior with respect to the stackSizeparameter.

Глубина вызова метода при переполнении стека, если stackSize не указан:

public class StackSizeTest {
    public static int counter = 0;
    public static void main(String[] args) {
        new Thread(() -> {
            try {
                count();
            } catch (StackOverflowError e) {
                System.out.println(counter);	// result -> 35473
            }
        }).start();
    }

    public static void count() {
        counter++;
        count();
    }
}

Укажите размер стека как 10 КБ.

Явно указатьstackSizeПосле этого существенно влияет размер стека потоков, а глубина вызова увеличивается на исходную.35473стал296:

public class StackSizeTest {
    public static int counter = 0;
    public static void main(String[] args) {
        new Thread(null,() -> {
            try {
                count();
            } catch (StackOverflowError e) {
                System.out.println(counter);
            }
        },"test-stack-size",10 * 1024).start(); //stackSize -> 10KB  result -> 296
    }

    public static void count() {
        counter++;
        count();
    }
}

Измените размер кадра стека, изменив размер локальных переменных

Чтобы изменить размер кадра стека, это можно сделать, добавив локальные переменные. следующее, добавив несколькоlongПеременные (одна занимает 8 байт), по сравнению с предыдущим тестом значительно уменьшена глубина вызова метода:

public class StackSizeTest {
    public static int counter = 0;
    public static void main(String[] args) {
        new Thread(null,() -> {
            try {
                count();
            } catch (StackOverflowError e) {
                System.out.println(counter);
            }
        },"test-stack-size",10 * 1024).start(); //stackSize -> 10KB  result -> 65
    }

    public static void count() {
        long a,b,c,d,e,f,g,h,j,k,l,m,n,o,p,q;
        counter++;
        count();
    }
}

Потоки демона и сценарии их использования

пройти черезthread.setDaemon(true)Вновь созданный поток может быть установлен как поток демона, который должен быть выполнен до запуска потока (thread.start) настройка действительна.

  • Характеристика потока демона состоит в том, что когда его родительский поток завершается, поток демона также будет уничтожен.
  • JVM завершит работу только тогда, когда завершится последний поток, не являющийся демоном.

Обнаружение сердцебиения

В кластерной архитектуре обычно требуется механизм обнаружения пульса. Если приложение открывает поток, не являющийся демоном, для обнаружения сердцебиения, может случиться так, что основная программа приложения будет завершена, но поток обнаружения сердцебиения все еще работает.В это время JVM будет продолжать занимать систему, потому что все еще не потоки демона работают.процессор, ресурсы памяти, чего явно не должно быть.

Следующий код просто имитирует этот сценарий:

public class HeartCheck {
    public static void main(String[] args) {

        // worker thread
        new Thread(()->{

            // start the heart-check thread first
            Thread heartCheck = new Thread(()->{
                // do interval-automatic heart check and notify the parent thread when heart check has error
                while (true) {
                    System.out.println("do heart check");
                    try {
                        Thread.sleep(100);	//interval
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            heartCheck.setDaemon(true);
            heartCheck.start();

            // simulate work
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

Подробное объяснение метода соединения

Анализ исходного кода

Перейдите непосредственно к исходному коду:

public final synchronized void join(long millis) throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
        while (isAlive()) {
            wait(0);
        }
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}

Если поток вызываетсяthreadизjoin(), будут распределены поjoin(0), выполнить вышеуказанное10~12ОК, пока текущий поток получает правильное выполнение ЦП, он будет опрашиватьthreadстатус выполнения (isAliveЯвляетсяnativeметод, но мы можем предположить, что его роль заключается в обнаруженииthreadЖив ли он, т.е. нетTerminatedстатус), когда-то найденныйthreadЕсли он все еще жив, он освободит право на выполнение процессора (черезwait(0)метод), дождитесь следующего раунда опроса, покаthreadв завершенное состояние, то текущий поток будетthread.join()вернуть.

Обязательно четко различайте, звонитеthread.join()Текущий поток заблокирован, он не будетthreadНити не имеют никакого эффекта.

joinПредоставляет перегруженный метод ожидания с ограничением по времени (это классическая модель ожидания с тайм-аутом: возврат только в том случае, если условие выполнено или лимит времени ожидания превышен), что также позволяет избежать попадания текущего потока в ловушку вечной дилеммы ожидания. , который может ждать некоторое время. Автоматически возвращается после обнаружения того, что целевой поток не завершил выполнение.

joinОдним из наиболее интересных моментов является то, что если поток вызывает свой собственныйjoinметод, то поток будет бесконечнымwaitвниз, потому что:Thread.currentThread().join()Будет ждать, пока текущий поток завершит выполнение, и текущий поток вызывает текущий потокjoinТо есть, когда текущий поток завершает выполнение... пусть играет медленно сам~

присоединиться к сценариям использования

Выполняйте задания шаг за шагом

Например, может потребоваться обработка журналов поведения пользователей на веб-сайтах электронной коммерции посредством агрегирования, проверки, анализа, классификации и т. д. и, наконец, их сохранение в базе данных. И выполнение этих шагов должно быть пошаговой обработкой, затем шаг должен дождаться окончания предыдущего шага, чтобы получить результат и запуститься, тогда его можно использовать.joinСделай это.

Следующий код просто имитирует этот сценарий:

public class StepByStep {

    public static void main(String[] args) throws InterruptedException {
        Thread step1 = new Thread(() -> {
            System.out.println("start capture data...");
            //simulate capture data
            try {
                Thread.sleep(1000);
                System.out.println("capture done.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        step1.start();

        Thread step2 = new Thread(() -> {
            try {
                step1.join();
                System.out.println("start screen out the data...");
                Thread.sleep(1000);
                System.out.println("screen out done.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        step2.start();

        Thread step3 = new Thread(() -> {
            try {
                step2.join();
                System.out.println("start analyze the data...");
                Thread.sleep(1000);
                System.out.println("analyze done.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        step3.start();

        Thread step4 = new Thread(() -> {
            try {
                step3.join();
                System.out.println("start classify the data");
                Thread.sleep(1000);
                System.out.println("classify done.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        step4.start();

        step4.join();

        System.out.println("write into database");
    }
}

Стоит отметить, что если вы вызываетеjoin, который немедленно вернется:

public class StepByStep {

    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {

        });
        t.join();
    }
}

Модель разветвления/соединения

Иногда количество задач слишком велико и задачи являются разделимыми (между подзадачами нет зависимости), тогда мы могли бы также разделить задачу на несвязанные подзадачи (этот шаг называетсяFork), назначить отдельный поток для каждой подзадачи, чтобы реализовать параллельное выполнение подзадач, повысить эффективность выполнения и, наконец, интегрировать результаты каждой подзадачи для окончательной обработки (основной поток может использоватьjoinдождаться результатов выполнения каждого потока подзадачи, чтобы, наконец, сделать сводку). Предоставлено JDK8Streamа такжеForkJoinРамки имеют эту модель рисунка.

Аномальное восприятие

мы можем пройтиjoinОграниченное по времени ожидание, обеспечиваемое перегруженным методом .

Подробное объяснение прерывания

public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

Вот деталь,interruptСначала устанавливается флаг прерывания потока, а затем он прерывается.

Ознакомьтесь с официальной документацией:

If this thread is blocked in an invocation of the wait(), wait(long), or wait(long, int) methods of the Object class, or of the join(), join(long), join(long, int), sleep(long), or sleep(long, int), methods of this class, then its interrupt status will be cleared and it will receive an InterruptedException.

If none of the previous conditions hold then this thread's interrupt status will be set.

Interrupting a thread that is not alive need not have any effect.

Отсюда мы можем извлечь три части информации:

  1. Timed-Waiting/WaitingПосле того, как поток будет прерван, он сначала очистит свой флаг прерывания, а затем выброситInterruptedException. Итак, прерванная нить входит
  2. это работает (Runnable/Running) не будет прерван, но будет установлен его флаг прерывания, т.isInterruptedвернусьtrue
  3. Вызывается в потоке в завершенном состоянииinterruptне будет иметь никакого эффекта.

isInterrupted

Tests whether this thread has been interrupted. The interrupted status of the thread is unaffected by this method.

A thread interruption ignored because a thread was not alive at the time of the interrupt will be reflected by this method returning false.

Проверьте, был ли поток прерван, вызов этого метода не изменит флаг прерывания потока. Вызывается в потоке в завершенном состоянииinterruptне приводит к возврату методаtrue.

Таким образом, мы можем использоватьisInterruptedДавайте проверим 3 вывода, сделанные выше:

public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {

        }

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    t1.start();
    t1.interrupt();
    System.out.println(t1.isInterrupted());	//true
    Thread.sleep(1000);
    System.out.println(t1.isInterrupted());	//false
}

Приведенный выше код находится вt1.interruptпроверить сразу послеt1бит флага прерывания из-заinterruptзаключается в том, чтобы сначала установить бит флага прерывания, а затем прерывание, поэтому17Выход строки обнаружил, что бит флага прерывания возвращаетсяtrue;тогда18~19сначала подождиt1метаниеInterruptedExceptionКогда флаг сброшен, обнаруживается флаг прерывания и возвращается.falseДокажите вывод 1: выбросьтеInterruptedExceptionПервым сбрасывается бит флага прерывания.

static volatile boolean flag = true;
public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(() -> {
        while (flag) {

        }
    });
    t1.start();
    t1.interrupt();
    System.out.println(t1.isInterrupted());	//true
    flag = false;
    t1.join();
    System.out.println(t1.isInterrupted());	//false
}

interruptedТекущий поток не прерывается, но его флаг прерывания установлен, поэтому первый10возврат линииtrue. посредством13По выходу строки мы также можем сделать новый вывод: даПоток в завершенном состоянииперечислитьisInterruptedвсегда будет возвращатьсяfalse.

interrupted

Это статический метод обнаружениятекущий потокпрерывается, но сisInterruptedотличается, его вызов приведет к сбросу флага прерывания текущего потока иisInterruptedявляется методом экземпляра. То есть, если два последовательных вызова сделаныThread.interrupted, второй раз обязательно вернусьfalse.

static volatile boolean flag = true;
public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(() -> {
        while (flag) {

        }
        System.out.println(Thread.currentThread().isInterrupted());	//true
        System.out.println(Thread.interrupted());	//true
        System.out.println(Thread.interrupted());	//false
    });
    t1.start();
    t1.interrupt();
    flag = false;
}

Как изящно завершить поток

stop

ThreadЕсть устаревший методstop, причина устаревания в том, что этот метод похож наlinuxсерединаkill -9Этот метод принудительно завершает поток немедленно, не давая ему шанса передохнуть, а это означает, что в наполовину выполненной программе внезапно больше нет текста, и если поток открывает ресурсы, такие как ввод-вывод и соединения с базой данных, они не будут быть освобождены вовремя.

Использование потоков демона и объединений

Поток демона завершается, когда завершается его родительский поток, поэтому мы можем косвенно завершить его, установив поток как поток демона, контролируя, когда завершается его родительский поток:

public class ThreadService {

    private Thread executeThread;
    private volatile boolean finished;

    public void execute(Runnable task) {
        executeThread =new Thread(() -> {
            Thread t = new Thread(() -> {
                task.run();
            });
            t.setDaemon(true);
            t.start();

            try {
                t.join();
                finished = true;
            } catch (InterruptedException e) {
                System.out.println("task execution was interrupted");
            }
        });
        executeThread.start();
    }

    public void shutdown(long millis) {
        long base = System.currentTimeMillis();
        long now = 0;
        while (!finished) {
            now = System.currentTimeMillis() - base;
            if (now >= millis) {
                System.out.println("task execution time out, kill it now");
                executeThread.interrupt();
                break;
            }

            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                System.out.println("was interrupted when shutdown");
            }
        }
        finished = true;
    }
}

В приведенном выше коде это можно сделать, указавshutdownпроходить вtaskсрок реализации, требующий, чтобы он былmillisОн выполняется в течение времени. Если оно превышает это время, оно рассматривается как исключение выполнения задачи и завершается завершением его родительского потока. Если он выполняется нормально, вmillisвернуть в течение времени, что также приведет к концу родительского потока,shutdownголосованиеfinishedсостояние, чтобы ощутить окончание выполнения задачи.

Использование общих переменных состояния

public class ThreadCloseGraceful implements Runnable{

    private volatile boolean stop = false;
    
    @Override
    public void run() {
        while (true) {
            if (stop) {
                break;
            }
            // to do here
        }
    }

    public void shutdown() {
        stop = true;
    }
}

Смысл этого подхода в том, что общие переменные состояния должны быть объявлены какvolatile, чтобы исполнительный поток мог его вовремя воспринятьshutdownЗаказ.

флаг прерывания опроса

Команда прерывания из внешнего мира воспринимается путем опроса бита флага прерывания потока.

public class ThreadCloseGraceful extends Thread{

    @Override
    public void run() {
        while (true) {
            if (Thread.interrupted()) {
                break;
            }
            // to do here
        }
    }

    public void shutdown() {
        this.interrupt();
    }
}

suspend/resume

resume/suspendОсновная причина отказа от поддержки заключается в том, чтоsuspendПриостановка потока не освобождает общие ресурсы, которые он содержит, если поток удерживает одну или несколько блокировок и выполняетsuspend, то все потоки, ожидающие снятия блокировки или этих блокировок, будут заблокированы на длительное время. если ты окажешьсяresumeНить подвешенной нити тоже должна заранее приобрести эти замки, тогдаresumeПотоки также блокируются, что может привести кsuspendНи один поток не проснется, и эти потоки будут заблокированы навсегда.

Следовательно, в параллельном сценарии для критических секцийsuspendа такжеresumeПотоки противостоят друг другу, независимо от того, кто первым войдет в критическую секцию, это приведет к тому, что эти два потока или даже несколько потоков попадут в тупик.

Подробное объяснение синхронизированного

synchronizedГарантируется, что выполнение синхронизированного кода в многопоточной среде сериализуется.

Синхронизированное использование ключевых слов

  • Если используется метод экземпляра, поток сначала получает метод при входе в метод (критическая секция).thisобъектmonitor(то есть то, что мы обычно называем замком, термин — монитор),monitorОдновременно он может удерживаться только одним потоком. Если получение не удается, он будет находиться в заблокированном состоянии (BLOCKED) до тех пор, пока блокировка не будет снята (поток, удерживающий блокировку, не выйдет из метода/критического раздела), и поток будет присоединиться к новому витку замков в стремлении
  • Если используется в статических методах, вам нужно получить текущий классClassобъектmonitor, логика получения-снятия блокировки такая же, как и у методов экземпляра.
  • Используемые на блоках кода (блоки кода еще можно назвать критическими секциями), первые два — это семантика самого JDK, подразумевающая заблокированные объекты. Для кодовых блоков вам необходимоsynchronizedЯвно укажите объект синхронизации после круглых скобок, и логика получения-снятия блокировки останется прежней.

Особенности синхронизированного ключевого слова

  • Если блокировка не может быть получена, поток, заблокированный на блокировке, будет разбужен, когда блокировка будет снята, что заставит поток переключиться из режима пользователя в режим ядра.Накладные расходы времени относительно велики, даже больше, чем фактические накладные расходы на выполнение кода критической секции. Поэтому, в принципе, для уменьшенияsynchronizedОднако с обновлением JDK появились такие оптимизации, как спин-блокировки, адаптивные спины, устранение блокировок, огрубление блокировок, предвзятые блокировки и упрощенные блокировки (см. «Углубленное понимание виртуальной машины Java (второе издание)»). "Глава высокого параллелизма")synchronizedСтоимость на самом деле не такая уж и большая.

  • Реентерабельный, если текущий поток уже содержитmonitor, при повторном входе требуетmonitorВ критической секции вы можете войти напрямую, минуя этап получения блокировки.

  • Поток может содержать несколькоmonitor.Уведомление, эта операция может легко привести к взаимоблокировке, следующий код имитирует этот сценарий:

    public class DeadLock {
        public static Object lock1 = new Object();
        public static Object lock2 = new Object();
    
        public static void main(String[] args) {
            IntStream.rangeClosed(0,19).forEach(i->{
                if (i % 2 == 0) {
                    new Thread(() -> m1()).start();
                } else {
                    new Thread(() -> m2()).start();
                }
            });
        }
    
        public static void m1() {
            synchronized (lock1) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (lock2) {
                    System.out.println(Thread.currentThread().getName());
                }
            }
        }
    
        public static void m2() {
            synchronized (lock2) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (lock1) {
                    System.out.println(Thread.currentThread().getName());
                }
            }
        }
    }
    

    Приведенный выше код имеет высокую вероятность взаимоблокировки, но оперативной информации не будет. мы можем пройтиjps/jstackПроверьте статус потока:

    C:\Users\zaw>jps
    2864
    5664 Jps
    4072 Launcher
    2172 DeadLock
    
    C:\Users\zaw>jstack 2172
    "Thread-1" #12 prio=5 os_prio=0 tid=0x0000000018c71800 nid=0x8f0 waiting for monitor entry [0x00000000196cf000]
       java.lang.Thread.State: BLOCKED (on object monitor)
            at deadlock.DeadLock.m2(DeadLock.java:47)
            - waiting to lock <0x00000000d6081098> (a java.lang.Object)
            - locked <0x00000000d60810a8> (a java.lang.Object)
            at deadlock.DeadLock.lambda$null$1(DeadLock.java:21)
            at deadlock.DeadLock?Lambda$3/1989780873.run(Unknown Source)
            at java.lang.Thread.run(Thread.java:748)
    
    "Thread-0" #11 prio=5 os_prio=0 tid=0x0000000018c70800 nid=0x944 waiting for monitor entry [0x00000000195cf000]
       java.lang.Thread.State: BLOCKED (on object monitor)
            at deadlock.DeadLock.m1(DeadLock.java:34)
            - waiting to lock <0x00000000d60810a8> (a java.lang.Object)
            - locked <0x00000000d6081098> (a java.lang.Object)
            at deadlock.DeadLock.lambda$null$0(DeadLock.java:19)
            at deadlock.DeadLock?Lambda$2/999966131.run(Unknown Source)
            at java.lang.Thread.run(Thread.java:748)
    

    Автор опускает статус других потоков, и после анализа причины взаимоблокировки пары потоков остальные 18 потоков аналогичны. Прежде всего9а также18Две строки указывают на то, что два потока указывают на то, что поток заблокирован, поскольку он не может получить блокировку объекта.BLOCKEDгосударство.11~12линия в деталяхThread-1Ожидание получения адреса памяти в0x00000000d6081098Блокировка объекта , уже удерживаемая по адресу памяти0x00000000d60810a8замок объекта.20~21Эта же линия указываетThread-0ждать0x00000000d60810a8объект, уже полученный0x00000000d6081098Блокировка объекта. Видно, что все они ждут, пока другая сторона снимет блокировку без блокировки мозга, поэтому они попадают в тупик.

    существуетjstackПосле перечисления состояния каждого потока JVM мы также анализируем взаимоблокировку для нас:

    Found one Java-level deadlock:
    =============================
    "Thread-19":
      waiting to lock monitor 0x0000000018c5a398 (object 0x00000000d60810a8, a java.lang.Object),
      which is held by "Thread-1"
    "Thread-1":
      waiting to lock monitor 0x0000000018c58d98 (object 0x00000000d6081098, a java.lang.Object),
      which is held by "Thread-0"
    "Thread-0":
      waiting to lock monitor 0x0000000018c5a398 (object 0x00000000d60810a8, a java.lang.Object),
      which is held by "Thread-1"
    

    Мы также можем использовать встроенный в JDK инструмент мониторинга производительности JVM JConsole для более интуитивного анализа состояния потока:

    C:\Users\zaw>jps
    2864
    6148 Jps
    4072 Launcher
    2172 DeadLock
    
    C:\Users\zaw>jconsole 2172
    

    В открывшемся окне инструмента будет задан вопрос, доверять ли небезопасному соединению, нажмите «Да», чтобы войти. После входа вы можете просмотреть статус каждого потока через панель потоков, нажать анализ взаимоблокировок, он проанализирует для нас, какие потоки в текущем процессе JVM заблокированы и почему:

    image

Базовая реализация синхронизированного

Чтобы понять, почему потоки имеют механизмы захвата-освобождения блокировки при выполнении критических разделов (включая синхронизированные методы и синхронизированные блоки кода), нам нужно знать, какие инструкции JVM генерирует это ключевое слово после компиляции.

Сначала мы пишем синхронизированный метод и синхронизированный блок соответственно и тестируем их по отдельности.synchronizedКаков эффект на уровне байт-кода:

public class SynchronizedTest{
	
	public synchronized void m1(){
		System.out.println("sync method");
	}
	
	Object lock = new Object();
	public void m2(){
		synchronized(lock){
			System.out.println("sync block");
		}
	}
}

затем используйтеjavacКомпилировать, так как скомпилированный файл байткода представляет собой бинарный поток байтов, нам неудобно его просматривать (JVM удобно просматривать), поэтому нам также нужно использоватьjavapПреобразуйте его в понятный нам контент (см. формат файла класса в «Углубленное понимание виртуальной машины Java (второе издание)» для формата байт-кода). Чтобы позаботиться о читателях, которые не знакомы с этой частью , автор сделал удалить, только фокусsynchronizedПолученный эффект:

C:\Users\zaw>cd Desktop

C:\Users\zaw\Desktop>javac SynchronizedTest.java

C:\Users\zaw\Desktop>javap -verbose SynchronizedTest.class

public class SynchronizedTest
  minor version: 0
  major version: 52
  flags: ACC_PUBLIC, ACC_SUPER
Constant pool:
  	# 这里省去了常量池部分
{
  java.lang.Object lock;
    descriptor: Ljava/lang/Object;
    flags:

  public synchronized void m1();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_SYNCHRONIZED
    Code:
      stack=2, locals=1, args_size=1
         0: getstatic     #4                  // Field java/lang/System.out:Ljava/io/PrintStream;
         3: ldc           #5                  // String sync method
         5: invokevirtual #6                  // Method java/io/PrintStream.println:(Ljava/lang/String;)V
         8: return
      LineNumberTable:
        line 4: 0
        line 5: 8

  public void m2();
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
      stack=2, locals=3, args_size=1
         0: aload_0
         1: getfield      #3                  // Field lock:Ljava/lang/Object;
         4: dup
         5: astore_1
         6: monitorenter
         7: getstatic     #4                  // Field java/lang/System.out:Ljava/io/PrintStream;
        10: ldc           #7                  // String sync block
        12: invokevirtual #6                  // Method java/io/PrintStream.println:(Ljava/lang/String;)V
        15: aload_1
        16: monitorexit
        17: goto          25
        20: astore_2
        21: aload_1
        22: monitorexit
        23: aload_2
        24: athrow
        25: return
   Exception table:
         from    to  target type
             7    17    20   any
            20    23    20   any
}
SourceFile: "SynchronizedTest.java"

Хотя приведенный выше код выглядит длинным, нам нужно сосредоточиться только на двух моментах:

  • В сравнении20линия и33Хорошо, вы найдете метод синхронизацииm1чем асинхронные методыm2изflagsодин дополнительныйACC_SYNCHRONIZED, поэтому, когда поток входит в метод синхронизации, если он обнаруживает методflagsВключатьACC_SYNCHRONIZED, то поток попытается получитьthisили класса, в котором находится методClassэкземпляр (это зависит от того, является ли метод методом экземпляра или статическим методом), т.е. метод синхронизацииsynchronizedСемантика через флаги методаACC_SYNCHRONIZEDДля достижения этого процесс синхронизации является неявным (объект синхронизации указывается JVM, а снятие блокировки выполняется JVM).
  • посмотри снова40~49строку, находим, что она нам дает внутри блока synchronizedSystem.out.println("sync block")Добавлен один до и послеmonitorenterс однимmonitorexit, что соответствует захвату-освобождению блокировки.Эта семантика синхронизации является явной.Объект синхронизации и критическая секция контролируются нами, что является более гибким, чем метод синхронизации.

Также стоит отметить, что выше49Почему строка кода появляется сноваmonitorexit? Это делается для того, чтобы в случае возникновения исключения во время выполнения блока синхронизированного кода блокировка, удерживаемая потоком, также могла быть снята до того, как возникнет исключение (без воздействия на другие потоки, ожидающие получения блокировки).

Как избежать тупика

После приведенного выше анализа понимание замков должно иметь более глубокое понимание. Итак, как избежать взаимоблокировки? Заблокированный поток не будет ни работать, ни продолжать использовать системные ресурсы, и наше приложение должно избегать этого.

  • Избегайте потока, удерживающего несколько блокировок одновременно. Если поток пытается получить другие блокировки, когда он уже удерживает блокировку, то, как только получение завершится ошибкой, это неизбежно приведет к блокировке потока блокировкой, которую он уже имеет, а блокирование ресурсов синхронизации сопряжено с высокой степенью параллелизма и недружественно, и его следует избегать.
  • Избегайте критических разделов, выполнение которых занимает слишком много времени. Если во время выполнения критической секции возникает исключение, обычно поток не может выйти из критической секции, не может вернуться вовремя и т. д. Это приведет к тому, что поток, удерживающий блокировку, будет откладывать освобождение блокировки.Если есть только много других потоков, ожидающих получения блокировки, эти потоки будут заблокированы в течение длительного времени. Мы стараемся, чтобы критический раздел был как можно меньше, и синхронизируем только тот код, в котором происходят гонки данных. В то же время, чтобы избежать попадания большого количества потоков в длительное ожидание из-за получения блокировки, следует использоватьLockизtryLock(millis)Механизм ожидания тайм-аута: как только вы обнаружите, что время ожидания слишком велико, нет необходимости ждать все время, вы можете попытаться получить блокировку после выполнения других задач. Позже мы напишем блокировку, которая может автоматически возвращаться после ожидания тайм-аута для этой ситуации.

ждать/уведомлять и ждать набор

устарелsuspend/resumeПосле этого официально рекомендуется использоватьwait/notifyзаменять. а такжеsuspend/resumeразное позиционирование,wait/notifyреализовано вObject, метод, который могут вызывать все объекты. и вызывающий объектwait/notifyобъектmonitor.

Ниже приводится официальныйwait(millis)Даны инструкции:

* This method causes the current thread (call it <var>T</var>) to
* place itself in the wait set for this object and then to relinquish
* any and all synchronization claims on this object. Thread <var>T</var>
* becomes disabled for thread scheduling purposes and lies dormant
* until one of four things happens: notify, notifyAll, interrupt, time out

называть объектobjизwaitметод приведет к тому, что текущий поток выполнения будет помещенobjв очереди(wait set, гостиная нити) и освободите нить черезsynchronizedВсе блокировки уже удерживаются, а затем освободить право выполнения ЦП на ожидание до тех пор, покаnotify/notifyAllУведомляется и вызывается другими потокамиinterruptТолько когда прерывание или время ожидания превысит установленный предел времени, он перейдет в состояние готовности к борьбе за право выполнения ЦП.

Здесь следует отметить, что нить неnotify/notifyAllПроснувшись, вы можете сразуwaitВозврат, после пробуждения, он только заставит поток войти в состояние готовности, чтобы бороться за право на выполнение ЦП, только для получения права на выполнение ЦП и получения всехwaitБлокировка может быть снята только сwaitreturn, иначе поток все еще будет блокироваться вwaitначальство.

использоватьwait/notify, мы можем реализовать связь между потоками.

ждать/уведомлять классическую парадигму

официально даноwait/notifyИспользуется классическая парадигма:

synchronized (obj) {
         while (<condition does not hold>)
             obj.wait();
         ... // Perform action appropriate to condition
     }

использоватьwhileбез использованияifПричина в том, чтобы проснуться иwaitВозвращающийся поток должен постоянно проверять условия, с которыми он связан, поскольку его пробуждение может быть вызвано не тем, что другой поток целенаправленно пробуждает поток, чтобы уведомить поток.notifyслучайное пробуждение,notifyAllОдновременно захватить замок может только один из пробужденных и пробужденных потоков.waitВозвращаемый поток имеет много неопределенности. Поскольку условия для каждого потока различны, необходимо провести опрос, чтобы определить, верны ли условия, прежде чемwhileвыходить.

Отсюда мы можем использоватьwait/notifyРеализуйте модель коммуникации производитель-потребитель:

public class ClassicForm {

    private static String message;
    private static Object lock = new Object();

    public static void main(String[] args) {
        Thread consumer = new Thread(() -> {
            while(true){
                synchronized (lock) {
                    while (message == null) {	// wait for producing
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            System.out.println("consumer was broken");
                            return;
                        }
                    }
                    System.out.println("CONSUMER receive message : " + message);
                    message = null;
                    lock.notify();
                }
            }
        });

        Thread producer = new Thread(() -> {
            synchronized (lock) {
                for(int i = 0 ; i < 100 ; i++){
                    while (message != null) {	// wait for consuming
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            System.out.println("producer was broken");
                            return;
                        }
                    }
                    message = "please the order, order-id is " + i;
                    lock.notify();
                    System.out.println("PRODUCER send the message : " + message);
                }
            }
        });

        consumer.start();
        producer.start();
    }

}

вы найдете здесьmessageдаже без добавленияvolatile, потребитель может каждый раз точно получать изменения, внесенные производителем. Это сделаноsynchronizedизunlockИнструкция и JMM (модель памяти Java) определяются совместно, и JMM будет подробно расширен позже.

ждать/уведомлять модель ожидания с ограничением по времени

Вышеприведенный код имеет очевидный недостаток, то есть, если производитель медленно создает сообщения, потребитель будет продолжатьwaitпока не появятся новые новости. Таким образом, ресурсы, занятые потоком-потребителем, не используются полностью. Можно ли установить ограничение времени ожидания потребителей? После того, как время ожидания превысит лимит,waitТеперь сначала займитесь другими задачами, а затем прослушайте сообщения, созданные продюсером. Следующий код просто имитирует этот сценарий:

public class WaitTimeoutModel {

    private static String message;
    private static Object lock = new Object();
    private static final long MAX_WAIT_LIMIT = 1000;

    public static void main(String[] args) {
        Thread consumer = new Thread(() -> {
            synchronized (lock) {
                while (true) {
                    long base = System.currentTimeMillis();
                    long now = 0;
                    while (message == null) {
                        now = System.currentTimeMillis() - base;
                        if (now >= MAX_WAIT_LIMIT) {
                            break;  // exit wait
                        }
                        try {
                            lock.wait(MAX_WAIT_LIMIT);
                        } catch (InterruptedException e) {
                            System.out.println("consumer was broken");
                        }
                    }
                    if (message == null) {
                        System.out.println("CONSUMER exit wait, and do other things");
                        try {   // simulate do other thing
                            Thread.sleep(50);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    } else {
                        System.out.println("CONSUMER receive the message : " + message);
                        message = null;
                    }
                }
            }
        });

        Thread producer = new Thread(() -> {
            // prepare message is very slowly
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // notify consumer
            synchronized (lock) {
                message = "please handle the order, order-id is 5454656465";
                lock.notify();
                System.out.println("PRODUCER send the message : " + message);
            }
        });

        consumer.start();
        producer.start();
    }
}

Суть в том, что на основе классической парадигмы в процессе опроса переменных состояния добавляется оценка времени ожидания (раздел14~17строка), если найден заданный срок (здесьMAX_WAIT_LIMIT), затем перестаньте ждать и займитесь чем-нибудь другим (стр.25~30линии), наоборот, если вwait(MAX_WAIT_LIMIT)В этот период, из-за напоминания о пробуждении производителя, также выскочит опрос (производитель обычно будит потребителя после производства сообщения) и входит в первый32~33линия для потребления сообщений. Но в любом случае это конец логической исполнительной единицы для потребителя. Поскольку потребитель обычно работает с монитором 24 часа в сутки (while(true)), поэтому в конце каждого исполнительного блока будет сбрасыватьсяbaseа такжеnow(п.11~12Ряд).

Эффект операции следующий:

CONSUMER exit wait, and do other things
PRODUCER send the message : please handle the order, order-id is 5454656465
CONSUMER receive the message : please handle the order, order-id is 5454656465
CONSUMER exit wait, and do other things
CONSUMER exit wait, and do other things
CONSUMER exit wait, and do other things
...

Модель тайм-аута-ожидания широко используется в шаблонах параллельного проектирования и пакетах JUC, и ее необходимо хорошо понимать.

Существенная разница между ожиданием и сном (часто задаваемый вопрос в интервью)

  • waitдаObjectПеред вызовом необходимо получить метод экземпляра и блокировку объекта экземпляра.sleepдаThreadСтатические методы можно вызывать напрямую
  • sleepблокировки, удерживаемые текущим потоком, не снимаются, иwaitосвободит все блокировки, удерживаемые текущим потоком
  • sleepа такжеwaitприведет к входу потокаTIMED-WAITINGсостояние освобождает права на выполнение ЦП, но вызываетsleepПоток может автоматически вернуться после установленного срока, в то время какwait(millis)По истечении тайм-аута вам необходимо получить блокировку объекта перед возвратом,wait(0)Еще более необходимо дождаться пробуждения и получить замок перед возвращением.

Проблема приостановленной анимации, вызванная несколькими потребителями и несколькими производителями

Следующий код имитирует случай, когда два производителя и два потребителя работают одновременно в модели производитель-потребитель, что приводит к приостановке программы.

public class ProducerConsumer {

    private String message;

    public synchronized void produce() {
        while (message != null) {
            try {
                this.wait();
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " was broken");
                return;
            }
        }
        message = "time is " + new Date(System.currentTimeMillis());
        this.notify();
        System.out.println(Thread.currentThread().getName() + " send the message : " + message);
    }

    public synchronized void consume() {
        while (message == null) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " was broken");
                return;
            }
        }
        System.out.println(Thread.currentThread().getName() + " recv the message : " + message);
        message = null;
        this.notify();
    }

    public static void main(String[] args) {
        ProducerConsumer pc = new ProducerConsumer();
        Stream.of("p1", "p2").forEach(name -> {
            new Thread(() -> {
                while (true) {
                    pc.produce();
                }
            }, name).start();
        });

        Stream.of("c1", "c2").forEach(name -> {
            new Thread(() -> {
                while (true) {
                    pc.consume();
                }
            }, name).start();
        });
    }
}

Результат выглядит следующим образом:

p1 send the message : time is Fri Feb 01 14:06:26 CST 2019
c2 recv the message : time is Fri Feb 01 14:06:26 CST 2019
p2 send the message : time is Fri Feb 01 14:06:26 CST 2019
c2 recv the message : time is Fri Feb 01 14:06:26 CST 2019
p1 send the message : time is Fri Feb 01 14:06:27 CST 2019
# 至此,四个线程陷入永久wait

Автор тоже долгое время был другим.Когда Производитель создает сообщение, он уведомляет Потребителя о его потреблении, а после того, как последний его потребляет, он уведомляет Производителя, ожидающего производства.Нет проблем! как попасть вwaitШерстяная ткань?

Это потому, что мы впали в инерционное мышление.Когда мы изучаем модель производитель-потребитель, мы всегда думаем, что производитель уведомит потребителя, когда сообщение будет произведено, и что потребитель уведомит производителя, когда потребление будет завершено. мы забылиnotifyСуть:notifyбудет от объектаwait setсерединаслучайныйВыберите нить, чтобы проснуться. Давайте рационально проанализируем приведенный выше код:17Oknotifyразбудит темуwait setна потребительской ветке? Возможно, нет! Предположим, на мгновениеp1схватил замок иp2,c1,c2заблокированыwaitна, тогдаp1Вызывается после создания сообщенияnotifyМожно ли проснутьсяp2(В этом случае проснувшийсяp2Находитьp1Произведенные сообщения, которые не используются, все равно будут перехватываться.wait, то все четыре потока захватываютсяwaitникакой другой поток, чтобы разбудить их. Точно так же потребитель может проснуться после того, как другой пользователь проглотит сообщение.waitпотребителей такое пробуждение делает бесполезную работу). это потому чтоnotifyизНеопределенность, так что приведенный выше код не следует процедуре производитель-потребитель, а последние четыре потока перехватываются вwaitИ нет нити, чтобы их разбудить.

Но если первый17,34Oknotifyизменить наnotifyAllНикакого тупика не будет. Это потому чтоnotifyAllразбудит все блоки, которые заблокированы на объектеwaitна нитке. следовательноp1После создания сообщения, если вызовnotifyAll,Такp2,c1,c2пробудится и сразится за объектmonitor, даже еслиp2захваченный первым, он также войдет, потому что сообщение не потребляетсяwaitА затем отпустите блокировку и проснитесь в ожидании блокировкиc1,c2,такp1изnotifyAllв конечном итоге приведет к тому, что один из потребителей изwaitВозврат, чтобы даже при наличии нескольких производителей и нескольких потребителей программа могла выполняться.

p2 send the message : time is Fri Feb 01 14:30:39 CST 2019
c1 recv the message : time is Fri Feb 01 14:30:39 CST 2019
p1 send the message : time is Fri Feb 01 14:30:39 CST 2019
c2 recv the message : time is Fri Feb 01 14:30:39 CST 2019
p2 send the message : time is Fri Feb 01 14:30:40 CST 2019
c1 recv the message : time is Fri Feb 01 14:30:40 CST 2019
p1 send the message : time is Fri Feb 01 14:30:41 CST 2019
c2 recv the message : time is Fri Feb 01 14:30:41 CST 2019
p2 send the message : time is Fri Feb 01 14:30:42 CST 2019
c1 recv the message : time is Fri Feb 01 14:30:42 CST 2019
...

Модель производитель-потребитель при многопоточности, чтобы использоватьnotifyAll

Рукописный ввод BooleanLock

упомянутый вышеsynchronizedСерьезным недостатком является то, что если поток, удерживающий блокировку, не снимает блокировку с опозданием (время выполнения критической секции слишком велико), другие потоки, ожидающие блокировки, будут заблокированы до тех пор, пока блокировка не будет снята. Итак, можем ли мы реализовать такой механизм: установить ограничение времени для потока, ожидающего освобождения блокировки, если ограничение времени превышено, то блокировка не будет снята в течение полутора времени, поэтому поток может использовать это время простоя для выполнения других задач, а не блокирование и ничегонеделание все время.

Теперь мы можем использоватьwait/notifyКлассическая парадигмальная реализацияsynchronizedСемантика, использующая модель ожидания тайм-аута для реализации семантики ожидания с ограничением по времени. Сначала определите интерфейс объекта синхронизации, а именноLock:

public interface Lock {

    void lock() throws InterruptedException;

    void unlock();

    void lock(long millis) throws InterruptedException, TimeoutException;

    Collection<Thread> getBlockedThread();

    int getBlockedCount();
}

Затем реализуйте простой, который использует логическую переменную для представления состояния синхронизации.BooleanLock:

public class BooleanLock implements Lock {

    private volatile boolean isSync = false; //represent whether the lock is held or not. true is held, false is not held
    private Thread currentThread;   //current thread which hold the lock
    private Collection<Thread> waitQueue;

    public BooleanLock() {
        this.isSync = false;
        this.currentThread = null;
        this.waitQueue = new ArrayList<>();
    }

    @Override
    public synchronized void lock() throws InterruptedException {
        waitQueue.add(Thread.currentThread());
        while (isSync) {    // lock is held by other thread
            this.wait();
        }
        // get the lock successfully
        waitQueue.remove(Thread.currentThread());
        currentThread = Thread.currentThread();
        isSync = true;  //indicate the lock is held
        System.out.println(Thread.currentThread().getName() + " get the lock");
    }

    @Override
    public void unlock() {
        // check the operator is the thread which is holding the lock
        if (Thread.currentThread() != currentThread) {
            return;
        }
        synchronized (this) {
            currentThread = null;
            isSync = false;
            this.notifyAll();
            System.out.println(Thread.currentThread().getName() + " release the lock");
        }
    }

    @Override
    public synchronized void lock(long millis) throws InterruptedException, TimeoutException {
        long base = System.currentTimeMillis();
        long now = 0;
        waitQueue.add(Thread.currentThread());
        while (isSync) {
            now = System.currentTimeMillis() - base;
            if (now >= millis) {
                throw new TimeoutException();
            }
            this.wait(millis);
        }
        waitQueue.remove(Thread.currentThread());
        currentThread = Thread.currentThread();
        isSync = true;
        System.out.println(Thread.currentThread().getName() + " get the lock");
    }

    @Override
    public Collection<Thread> getBlockedThread() {
        return Collections.unmodifiableCollection(waitQueue);
    }

    @Override
    public int getBlockedCount() {
        return waitQueue.size();
    }
}

контрольная работаsynchronizedСемантика:

public static void main(String[] args) {
    BooleanLock lock = new BooleanLock();
    Stream.of("t1", "t2", "t3", "t4", "t5").forEach(name -> {
        new Thread(() -> {
            try {
                lock.lock();
                Thread.sleep(50); // to do thing
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                printBlockedThread(lock);
                lock.unlock();
            }
        }, name).start();
    });
}

private static void printBlockedThread(BooleanLock lock) {
    System.out.print("There are " + lock.getBlockedCount() + " threads waiting on the lock: ");
    lock.getBlockedThread().forEach(thread -> System.out.print(thread.getName() + " "));
    System.out.println();
}

результат операции:

t1 get the lock
There are 4 threads waiting on the lock: t4 t3 t2 t5 
t1 release the lock
t5 get the lock
There are 3 threads waiting on the lock: t4 t3 t2 
t5 release the lock
t4 get the lock
There are 2 threads waiting on the lock: t3 t2 
t4 release the lock
t2 get the lock
There are 1 threads waiting on the lock: t3 
t2 release the lock
t3 get the lock
There are 0 threads waiting on the lock: 
t3 release the lock

должен быть в курсеunlockдолжно быть написано вfinallyгарантирует, что блокировка будет снята, в то время какsynchronizedИсключение выдается при выполнении синхронизированного блока.JVM освободит удерживания текущего потока, когда исключение выдается через таблицу исключений (см. описание таблицы методов в главе «Углубленное понимание виртуальной машины Java (Второй Edition)" Структура файла класса). все блокировки.

Тестовый ограниченный по времени захват замков

Приведенный выше пример просто реализует то же самоеsynchronizedТа же функция, затем давайте протестируем функцию получения блокировки на ограниченное время, котораяsynchronizedневозможный.

public static void main(String[] args) {
    BooleanLock lock = new BooleanLock();
    Stream.of("t1", "t2", "t3", "t4", "t5").forEach(name -> {
        new Thread(() -> {
            try {
                lock.lock(1000);
                Thread.sleep(2000); // the task is very time-consuming
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " was interrupted");
            } catch (TimeoutException e) {
                System.out.println(Thread.currentThread().getName() + " get lock time out, so do other thing first and then get the lock again");
            } finally {
                lock.unlock();
            }
        }, name).start();
    });
}

Результат выглядит следующим образом:

t1 get the lock
t2 get lock time out, so do other thing first and then get the lock again
t3 get lock time out, so do other thing first and then get the lock again
t4 get lock time out, so do other thing first and then get the lock again
t5 get lock time out, so do other thing first and then get the lock again
t1 release the lock

Добавьте хуки в свое приложение

При использовании некоторых фреймворков с открытым исходным кодом, таких какTomcat, некоторые журналы все еще будут распечатываться при завершении работы, эти журналы обычно содержат информацию об освобождении ресурсов приложения. То есть мы нажимаемterminateПосле того, как событие зафиксировано приложением, приложение не завершает работу напрямую, а сначала освобождает некоторые ценные ресурсы. Это делается путем установки функции ловушки, которая будет вызываться перед завершением основного потока приложения. вести перепискуAPIдаRuntime.getRuntime().addShutdownHook(thread).

Ниже я будуlinuxВышеприведенное демонстрирует полезность функций ловушек.MyApp.javaПредставляет мое приложение:

public class MyApp{
    public static void main(String[] args){

        Runtime.getRuntime().addShutdownHook(
            new Thread(() -> {
                //release resource here, like socket,connection etc
                System.out.println("releasing resources...");
            })
        );

        while(true){
            // start a service
        }
    }
}

пройти черезaddShutdownHookУстановленная нить будет вmainВызывается, когда поток прерывается внешним миром, например, когда я работаюjava MyAppпрессованныйCTRL C

[root@izm5ecexclrsy1gmkl4bgdz ~]# javac MyApp.java
[root@izm5ecexclrsy1gmkl4bgdz ~]# java MyApp
^Creleasing resources...

Другой пример работает в фоновом режимеMyApp,пройти черезkill pidЗавершить это:

[root@izm5ecexclrsy1gmkl4bgdz ~]# java MyApp &
[1] 14230
[root@izm5ecexclrsy1gmkl4bgdz ~]# jps
14240 Jps
14230 MyApp
[root@izm5ecexclrsy1gmkl4bgdz ~]# kill 14230
[root@izm5ecexclrsy1gmkl4bgdz ~]# releasing resources...

ноkill -9Программа ловушки не будет запущена:

[root@izm5ecexclrsy1gmkl4bgdz ~]# java MyApp &
[1] 14264
[root@izm5ecexclrsy1gmkl4bgdz ~]# ps aux|grep java
root     14264 96.3  1.4 2460724 27344 pts/0   Sl   16:03   0:09 java MyApp
root     14275  0.0  0.0 112660   964 pts/0    R+   16:03   0:00 grep --color=auto java
[root@izm5ecexclrsy1gmkl4bgdz ~]# kill -9 14264
[root@izm5ecexclrsy1gmkl4bgdz ~]# ps aux|grep java
root     14277  0.0  0.0 112660   964 pts/0    R+   16:03   0:00 grep --color=auto java
[1]+  Killed                  java MyApp

получить информацию о стеке

Thread.currentThread().getStackTracke()Получить всю информацию о кадре стека в стеке, когда текущий поток выполняет текущий метод, вернутьStackTraceElement[], элемент представляет кадр стека методов, который можно использовать для определения класса, к которому принадлежит метод, имени метода и количества строк, до которых выполняется метод.

public static void main(String[] args) {
    m1();
}

public static void m1() {
    m2();
}

private static void m2() {
    m3();
}

private static void m3() {
    Arrays.asList(Thread.currentThread().getStackTrace()).stream()
        .filter(
        //过滤掉native方法
        stackTraceElement -> !stackTraceElement.isNativeMethod()
    ).forEach(
        stackTraceElement -> {
            System.out.println(stackTraceElement.getClassName() + ":" +
                               stackTraceElement.getMethodName() + "():" +
                               stackTraceElement.getLineNumber());
        }
    );

}

Поймать исключение во время выполнения метода запуска

потому чтоRunnableинтерфейсrunМетод не объявлен для создания каких-либо исключений, поэтому при переопределенииrunкогда всеchecked exceptionВсе нужно решать вручную. но если броситьunchecked exceptionШерстяная ткань,1/0Это классический пример, как нам его захватить?

пройти черезthread.setUncheckedExceptionHandler()Чтобы иметь возможность сделать это:

public static final int A = 1;
public static final int B = 0;
public static void main(String[] args) {
    Thread thread = new Thread(() -> {
        int i = A / B;
    });
    thread.setUncaughtExceptionHandler((t, e) -> {
        // t -> the ref of the thread, e -> exception
        System.out.println(e.getMessage()); /// by zero
    });
    thread.start();
}

группа потоков

Группа потоков представляет собой набор потоков, группа потоков также может содержать другие группы потоков, а группа потоков может быть развернута в древовидной структуре.

  1. При запуске JVM файл с именемmainпоток работаетmainфункция иmainгруппа тем,mainГруппа потока потокаmainГруппа тем:

    public static void main(String[] args) {
        System.out.println(Thread.currentThread().getName());	//main
        System.out.println(Thread.currentThread().getThreadGroup().getName());	//main
    }
    
  2. При создании потока, если для потока явно не указана группа потоков, поток примет группу потоков своего родительского потока в качестве своей собственной группы потоков.

    Если группа потоков создается без явного указания ее родительской группы потоков, группа потоков текущего потока будет использоваться в качестве его родительской группы потоков.

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            //
        });
        System.out.println(t1.getThreadGroup().getName());	//main
    
        ThreadGroup threadGroup = new ThreadGroup("MyThreadGroup");
        Thread t2 = new Thread(threadGroup, () -> {
            //
        });
        System.out.println(t2.getThreadGroup().getName());				//MyThreadGroup
        System.out.println(t2.getThreadGroup().getParent().getName());   //main
    }
    
  3. threadGroup.list()Метод может печатать информацию об уцелевших потоках в группе потоков, которую можно использовать дляdebug

    ThreadGroup threadGroup = new ThreadGroup("MyThreadGroup");
    new Thread(threadGroup, () -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    
    new Thread(threadGroup, () -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    threadGroup.list();
    
    java.lang.ThreadGroup[name=MyThreadGroup,maxpri=10]
        Thread[Thread-0,5,MyThreadGroup]
        Thread[Thread-1,5,MyThreadGroup]
    

БолееAPIВы можете проверить официальную документацию.

пользовательский пул потоков

Логика выполнения рабочего потока

Рабочий поток должен постоянно опрашивать очередь задач, чтобы увидеть, есть ли задача для выполнения, и выполнить ее, если она есть. Затем также предоставьте внешнему миру способ завершить текущий поток.stop, который использует общие переменные состояния и используетvolatileДекорация делает завершающие операции из внешнего мира немедленно видимыми для текущего рабочего потока.

public class Worker implements Runnable {

    private volatile boolean stop;
    private LinkedList<Runnable> taskQueue;
    private Thread currentThread;

    public Worker(LinkedList<Runnable> taskQueue) {
        this.taskQueue = taskQueue;
    }

    @Override
    public void run() {
        currentThread = Thread.currentThread();
        Runnable task = null;
        OUTER:
        while (!stop) {
            synchronized (taskQueue) {
                while (taskQueue.isEmpty()) {
                    try {
                        taskQueue.wait();
                    } catch (InterruptedException e) {
                        System.out.println(Thread.currentThread().getName()+" has been interrupted");
                        break OUTER;
                    }
                }
                task = taskQueue.removeFirst();
                taskQueue.notifyAll();
            }
            if (task != null) {
                task.run();
            }
        }
    }

    public void interrupt() {
        if (currentThread != null) {
            currentThread.interrupt();
        }
    }

    public void stop() {
        stop = true;
    }
}

Пул потоков: используется для создания потоков и управления ими.

public class ThreadPool {

    private static final int DEFAULT_THREAD_COUNT = 10;

    private int threadCount;
    private LinkedList<Worker> workQueue;
    private LinkedList<Runnable> taskQueue;

    public ThreadPool() {
        this(DEFAULT_THREAD_COUNT);
    }

    public ThreadPool(int size) {
        this.threadCount = size;
        this.workQueue = new LinkedList<>();
        this.taskQueue = new LinkedList<>();
        init(size);
    }

    //创建并启动count个线程
    private void init(int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("thread pool size must greater than zero");
        }
        for (int i = 0; i < count; i++) {
            Worker worker = new Worker(taskQueue);
            Thread thread = new Thread(worker, "ThreadPool-" + i);
            thread.start();
            workQueue.add(worker);
        }
    }

    public void execute(Runnable task) {
        synchronized (taskQueue) {
            taskQueue.add(task);
            taskQueue.notifyAll();
        }
    }

    public int getThreadCount() {
        return threadCount;
    }

    public int getTaskCount() {
        return taskQueue.size();
    }

    //对wait中的线程调用stop,他也无法轮询该变量而退出循环
    //因此对于wait中的工作线程直接中断它,而正在执行的线程则等他自己轮询到stop而退出
    public void shutdown() {
        synchronized (taskQueue) {
            for (Worker worker : workQueue) {
                worker.stop();
                worker.interrupt();
            }
        }
        System.out.println("thread pool destroyed");
    }
}

контрольная работа

public class ThreadPoolTest {
    public static void main(String[] args) throws InterruptedException {
        ThreadPool threadPool = new ThreadPool();
        for (int i = 0; i < 40; i++) {
            int number = i;
            threadPool.execute(()->{
                System.out.println(Thread.currentThread().getName() + "start execute task-" + number);
                try {
                    Thread.sleep(new Random(System.currentTimeMillis()).nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        Thread.sleep(5000);
        threadPool.shutdown();
    }
}

Добавьте политику отклонения в пул потоков

Очередь работ пула потоков не должна быть бесконечно большой.Если не обращать внимания, это может привести к OOM.Поэтому, когда количество задач в очереди задач достигает определенного числа, следует принять стратегию отклонения для представленные задачи.

Здесь следует использовать режим стратегии, интерфейс стратегии:

public interface RefusePolicy {
    void refuse() throws Exception;
}

Стратегия выдачи исключения, когда количество простых задач слишком велико:

public class DiscardRefusePolicy implements RefusePolicy {

    public class TaskExceededException extends Exception {
        public TaskExceededException(String message) {
            super(message);
        }
    }

    @Override
    public void refuse() throws TaskExceededException {
        throw new TaskExceededException("task has exceeded the taskSize of thread poll");
    }
}

Модернизацияexecuteметод:

private static final int DEFAULT_THREAD_COUNT = 10;
private static final RefusePolicy DEFAULT_REFUSE_POLICY = new DiscardRefusePolicy();
private static final int DEFAULT_TASK_SIZE = 200;

private int threadCount;
private LinkedList<Worker> workQueue;
private LinkedList<Runnable> taskQueue;
private int maxTaskSize;
private RefusePolicy refusePolicy;

public ThreadPool() {
    this(DEFAULT_THREAD_COUNT, DEFAULT_TASK_SIZE, DEFAULT_REFUSE_POLICY);
}

public ThreadPool(int size, int maxTaskSize, RefusePolicy refusePolicy) {
    this.threadCount = size;
    this.maxTaskSize = maxTaskSize;
    this.workQueue = new LinkedList<>();
    this.taskQueue = new LinkedList<>();
    this.refusePolicy = refusePolicy;
    init(size);
}

public void execute(Runnable task) throws Exception {
    synchronized (taskQueue) {
        if (taskQueue.size() >= maxTaskSize) {
            refusePolicy.refuse();
            return;
        }
        taskQueue.add(task);
        taskQueue.notifyAll();
    }
}

контрольная работа

public static void main(String[] args) throws InterruptedException {
    ThreadPool threadPool = new ThreadPool();
    for (int i = 0; i < 300; i++) {
        int number = i;
        try {
            threadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "start execute task-" + number);
                try {
                    Thread.sleep(new Random(System.currentTimeMillis()).nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        } catch (Exception e) {
            System.out.println("task-" + i + " execution error : " + e.getMessage());
        }
    }

    Thread.sleep(5000);
    threadPool.shutdown();
}