Глубокое понимание взаимодействия потоков

интервью Java задняя часть JVM

предисловие

При разработке неизбежно встречаются сценарии, требующие, чтобы все подпотоки завершили выполнение и уведомляли основной поток об обработке некоторой логики.

Или поток A выполняет условие, чтобы уведомить поток B о выполнении операции.

Этого можно достичь следующими способами:

механизм уведомления об ожидании

Режим ожидания уведомления — это классический метод взаимодействия потоков в Java.

Два потока обмениваются данными, вызывая методы wait() и notify() для одного и того же объекта.

Например, два потока поочередно печатают нечетные и четные числа:

public class TwoThreadWaitNotify {

    private int start = 1;

    private boolean flag = false;

    public static void main(String[] args) {
        TwoThreadWaitNotify twoThread = new TwoThreadWaitNotify();

        Thread t1 = new Thread(new OuNum(twoThread));
        t1.setName("A");


        Thread t2 = new Thread(new JiNum(twoThread));
        t2.setName("B");

        t1.start();
        t2.start();
    }

    /**
     * 偶数线程
     */
    public static class OuNum implements Runnable {
        private TwoThreadWaitNotify number;

        public OuNum(TwoThreadWaitNotify number) {
            this.number = number;
        }

        @Override
        public void run() {

            while (number.start <= 100) {
                synchronized (TwoThreadWaitNotify.class) {
                    System.out.println("偶数线程抢到锁了");
                    if (number.flag) {
                        System.out.println(Thread.currentThread().getName() + "+-+偶数" + number.start);
                        number.start++;

                        number.flag = false;
                        TwoThreadWaitNotify.class.notify();

                    }else {
                        try {
                            TwoThreadWaitNotify.class.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }

            }
        }
    }


    /**
     * 奇数线程
     */
    public static class JiNum implements Runnable {
        private TwoThreadWaitNotify number;

        public JiNum(TwoThreadWaitNotify number) {
            this.number = number;
        }

        @Override
        public void run() {
            while (number.start <= 100) {
                synchronized (TwoThreadWaitNotify.class) {
                    System.out.println("奇数线程抢到锁了");
                    if (!number.flag) {
                        System.out.println(Thread.currentThread().getName() + "+-+奇数" + number.start);
                        number.start++;

                        number.flag = true;

                        TwoThreadWaitNotify.class.notify();
                    }else {
                        try {
                            TwoThreadWaitNotify.class.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
}

Выходной результат:

t2+-+奇数93
t1+-+偶数94
t2+-+奇数95
t1+-+偶数96
t2+-+奇数97
t1+-+偶数98
t2+-+奇数99
t1+-+偶数100

Здесь и поток A, и поток B имеют доступ к одному и тому же объекту.TwoThreadWaitNotify.classЧтобы получить блокировку, поток A вызывает метод wait() объекта синхронизации, чтобы снять блокировку и войти вWAITINGгосударство.

Поток B вызывает метод notify(), чтобы поток A мог вернуться из метода wait() после получения уведомления.

используется здесьTwoThreadWaitNotify.classОбъект завершает общение.

Есть некоторые вещи, на которые стоит обратить внимание:

  • Предпосылкой вызовов wait(), nofify() и nofityAll() является получение блокировки объекта (также называемого монитором объекта).
  • После вызова метода wait() поток снимет блокировку и войдет вWAITINGсостояние, поток также перемещается вочередь ожиданиясередина.
  • Вызов метода notify()очередь ожиданияТемы перемещаются вочередь синхронизации, состояние потока также обновляется доBLOCKED
  • Предпосылкой возврата из метода wait() является то, что поток, вызывающий метод notify(), освобождает блокировку, а поток, вызывающий метод wait(), получает блокировку.

Ожидание уведомления имеет классическую парадигму:

Поток A как потребитель:

  1. Получает блокировку объекта.
  2. Введите время (условие оценки) и вызовите метод wait().
  3. Когда условие выполнено, выйти из цикла и выполнить определенную логику обработки.

Тема B как производитель:

  1. Получите блокировку объекта.
  2. Измените условие оценки, совместно используемое с потоком A.
  3. Вызовите метод уведомления().

Псевдокод выглядит следующим образом:

//Thread A

synchronized(Object){
    while(条件){
        Object.wait();
    }
    //do something
}

//Thread B
synchronized(Object){
    条件=false;//改变条件
    Object.notify();
}

метод присоединения()

    private static void join() throws InterruptedException {
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }) ;
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running2");
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }) ;

        t1.start();
        t2.start();

        //等待线程1终止
        t1.join();

        //等待线程2终止
        t2.join();

        LOGGER.info("main over");
    }

Выходной результат:

2018-03-16 20:21:30.967 [Thread-1] INFO  c.c.actual.ThreadCommunication - running2
2018-03-16 20:21:30.967 [Thread-0] INFO  c.c.actual.ThreadCommunication - running
2018-03-16 20:21:34.972 [main] INFO  c.c.actual.ThreadCommunication - main over

существуетt1.join()будет блокироваться до тех пор, пока t1 не завершит выполнение, поэтому в конечном итоге основной поток будет ждать завершения выполнения потоков t1 и t2.

На самом деле, из исходного кода видно, что join() также использует механизм ожидающих уведомлений:

Основная логика:

    while (isAlive()) {
        wait(0);
    }

После завершения потока соединения вызывается метод notifyAll(), который вызывается в реализации JVM, поэтому здесь его не видно.

энергозависимая разделяемая память

Поскольку Java использует разделяемую память для взаимодействия потоков, поток A может быть закрыт с основным потоком следующими способами:

public class Volatile implements Runnable{

    private static volatile boolean flag = true ;

    @Override
    public void run() {
        while (flag){
            System.out.println(Thread.currentThread().getName() + "正在运行。。。");
        }
        System.out.println(Thread.currentThread().getName() +"执行完毕");
    }

    public static void main(String[] args) throws InterruptedException {
        Volatile aVolatile = new Volatile();
        new Thread(aVolatile,"thread A").start();


        System.out.println("main 线程正在运行") ;

        TimeUnit.MILLISECONDS.sleep(100) ;

        aVolatile.stopThread();

    }

    private void stopThread(){
        flag = false ;
    }
}

Выходной результат:

thread A正在运行。。。
thread A正在运行。。。
thread A正在运行。。。
thread A正在运行。。。
thread A执行完毕

Флаг здесь хранится в основной памяти, поэтому его могут видеть как основной поток, так и поток A.

Флаг изменен с помощью volatile в основном для видимости памяти, более подробную информацию можно просмотретьздесь.

Инструмент параллелизма CountDownLatch

CountDownLatch может выполнять ту же функцию, что и соединение, но более гибко.

    private static void countDownLatch() throws Exception{
        int thread = 3 ;
        long start = System.currentTimeMillis();
        final CountDownLatch countDown = new CountDownLatch(thread);
        for (int i= 0 ;i<thread ; i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("thread run");
                    try {
                        Thread.sleep(2000);
                        countDown.countDown();

                        LOGGER.info("thread end");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        countDown.await();
        long stop = System.currentTimeMillis();
        LOGGER.info("main over total time={}",stop-start);
    }

Выходной результат:

2018-03-16 20:19:44.126 [Thread-0] INFO  c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:44.126 [Thread-2] INFO  c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:44.126 [Thread-1] INFO  c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:46.136 [Thread-2] INFO  c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [Thread-1] INFO  c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [Thread-0] INFO  c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [main] INFO  c.c.actual.ThreadCommunication - main over total time=2012

CountDownLatch также реализован на основе AQS (AbstractQueuedSynchronizer), дополнительные сведения о реализацииПринцип реализации ReentrantLock

  • Сообщите параллельным потокам, когда вы инициализируете CountDownLatch, а затем вызовите метод countDown() после завершения обработки каждого потока.
  • Этот метод устанавливает состояние, встроенное в AQS, в -1 .
  • В конце концов, в основном потоке вызывается метод await(), который блокируется до тех пор, покаstate == 0вернуться, когда.

Инструмент параллелизма CyclicBarrier

    private static void cyclicBarrier() throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3) ;

        new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("thread run");
                try {
                    cyclicBarrier.await() ;
                } catch (Exception e) {
                    e.printStackTrace();
                }

                LOGGER.info("thread end do something");
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("thread run");
                try {
                    cyclicBarrier.await() ;
                } catch (Exception e) {
                    e.printStackTrace();
                }

                LOGGER.info("thread end do something");
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("thread run");
                try {
                    Thread.sleep(5000);
                    cyclicBarrier.await() ;
                } catch (Exception e) {
                    e.printStackTrace();
                }

                LOGGER.info("thread end do something");
            }
        }).start();

        LOGGER.info("main thread");
    }

CyclicBarrier Китайское название называется барьером или забором, а также может использоваться для межпотокового взаимодействия.

Это приводит к ожиданию N потоков, которые достигнут определенного состояния и продолжат работу.

  1. Сначала инициализируйте участника потока.
  2. перечислитьawait()Будет ждать, пока не будут вызваны все потоки-участники.
  3. пока все участники не позовутawait()После этого все темы изawait()Вернитесь, чтобы продолжить последующую логику.

результат операции:

2018-03-18 22:40:00.731 [Thread-0] INFO  c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [Thread-1] INFO  c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [Thread-2] INFO  c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [main] INFO  c.c.actual.ThreadCommunication - main thread
2018-03-18 22:40:05.741 [Thread-0] INFO  c.c.actual.ThreadCommunication - thread end do something
2018-03-18 22:40:05.741 [Thread-1] INFO  c.c.actual.ThreadCommunication - thread end do something
2018-03-18 22:40:05.741 [Thread-2] INFO  c.c.actual.ThreadCommunication - thread end do something

Видно, что поскольку один из потоков спит в течение пяти секунд, все остальные потоки должны ждать вызова этого потока.await().

Этот инструмент может выполнять ту же функцию, что и CountDownLatch, но он более гибкий. можно даже позвонитьreset()Метод сбрасывает CyclicBarrier (вам нужно самостоятельно поймать BrokenBarrierException) и повторно выполнить его.

Поток отвечает на прерывание

public class StopThread implements Runnable {
    @Override
    public void run() {

        while ( !Thread.currentThread().isInterrupted()) {
            // 线程执行具体逻辑
            System.out.println(Thread.currentThread().getName() + "运行中。。");
        }

        System.out.println(Thread.currentThread().getName() + "退出。。");

    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new StopThread(), "thread A");
        thread.start();

        System.out.println("main 线程正在运行") ;

        TimeUnit.MILLISECONDS.sleep(10) ;
        thread.interrupt();
    }


}

Выходной результат:

thread A运行中。。
thread A运行中。。
thread A退出。。

Вы можете общаться, прерывая поток, вызываяthread.interrupt()На самом деле метод заключается в том, чтобы установить для атрибута флага в потоке значение true.

Это не означает, что поток может быть прерван вызовом этого метода, если вы не отреагируете на этот флаг, он не будет иметь никакого эффекта (здесь оценивается этот флаг).

Но если выдается InterruptedException, JVM сбрасывает флаг в false.

Метод пула потоков awaitTermination()

Если вы используете пул потоков для управления потоками, вы можете использовать следующие методы, чтобы основной поток ожидал завершения выполнения всех задач в пуле потоков:

    private static void executorService() throws Exception{
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10) ;
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,5,1, TimeUnit.MILLISECONDS,queue) ;
        poolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        poolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running2");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        poolExecutor.shutdown();
        while (!poolExecutor.awaitTermination(1,TimeUnit.SECONDS)){
            LOGGER.info("线程还在执行。。。");
        }
        LOGGER.info("main over");
    }

Выходной результат:

2018-03-16 20:18:01.273 [pool-1-thread-2] INFO  c.c.actual.ThreadCommunication - running2
2018-03-16 20:18:01.273 [pool-1-thread-1] INFO  c.c.actual.ThreadCommunication - running
2018-03-16 20:18:02.273 [main] INFO  c.c.actual.ThreadCommunication - 线程还在执行。。。
2018-03-16 20:18:03.278 [main] INFO  c.c.actual.ThreadCommunication - 线程还在执行。。。
2018-03-16 20:18:04.278 [main] INFO  c.c.actual.ThreadCommunication - main over

использовать этоawaitTermination()Посылка метода должна закрыть пул потоков, например вызовshutdown()метод.

называетсяshutdown()После этого пул потоков перестанет принимать новые задачи и плавно закроет существующие задачи в пуле потоков.

Трубопроводная связь

    public static void piped() throws IOException {
        //面向于字符 PipedInputStream 面向于字节
        PipedWriter writer = new PipedWriter();
        PipedReader reader = new PipedReader();

        //输入输出流建立连接
        writer.connect(reader);


        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running");
                try {
                    for (int i = 0; i < 10; i++) {

                        writer.write(i+"");
                        Thread.sleep(10);
                    }
                } catch (Exception e) {

                } finally {
                    try {
                        writer.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running2");
                int msg = 0;
                try {
                    while ((msg = reader.read()) != -1) {
                        LOGGER.info("msg={}", (char) msg);
                    }

                } catch (Exception e) {

                }
            }
        });
        t1.start();
        t2.start();
    }

Выходной результат:

2018-03-16 19:56:43.014 [Thread-0] INFO  c.c.actual.ThreadCommunication - running
2018-03-16 19:56:43.014 [Thread-1] INFO  c.c.actual.ThreadCommunication - running2
2018-03-16 19:56:43.130 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=0
2018-03-16 19:56:43.132 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=1
2018-03-16 19:56:43.132 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=2
2018-03-16 19:56:43.133 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=3
2018-03-16 19:56:43.133 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=4
2018-03-16 19:56:43.133 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=5
2018-03-16 19:56:43.133 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=6
2018-03-16 19:56:43.134 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=7
2018-03-16 19:56:43.134 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=8
2018-03-16 19:56:43.134 [Thread-1] INFO  c.c.actual.ThreadCommunication - msg=9

Хотя Java основан на обмене данными с памятью, также можно использовать канальный обмен данными.

Следует отметить, что сначала необходимо соединить входной поток и выходной поток. Таким образом, поток B может получить сообщение, отправленное потоком A.

В реальной разработке вы можете гибко выбирать наиболее подходящий метод связи потоков в соответствии с вашими потребностями.

Дополнительный

Недавно я обобщил некоторые знания, связанные с Java, и заинтересованные друзья могут поддерживать их вместе.

адрес:GitHub.com/crossover J я…