Little Leopard показывает исходный код: Пул потоков Java (3) Отправка задач

Java задняя часть исходный код Безопасность

Связывание:предыдущий постМаленький леопард рассказал о процессе создания экземпляра пула потоков и кратко представил переход состояния пула потоков; в этой статье в основном рассказывается о небольших проблемах, с которыми я столкнулся при запуске пула потоков, иexecuteПонимание исходного кода метода.

4 не сложная ошибка

Согласно нашему плану, следующим шагом будет отправка задач для изучения внутреннего поведения пула потоков при выполнении задач, но сначала я хочу отправить задачу. Итак, следуя коду предыдущей статьи, я отправил задачу:

@Test
public void submitTest() {
    // 创建线程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, 
        new LinkedBlockingQueue<Runnable>(), 
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread();
            }
        }, new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("拒绝服务");
        }
    });
    // 提交任务,该任务为睡眠 1 秒后打印 Hello
    threadPoolExecutor.submit(new Callable<String>() {
        @Override
        public String call() throws InterruptedException {
            Thread.sleep(1000L);
            System.out.println("Hello");
            return null;
        }
    });
}

И я не вижу никакого вывода, и программа не засыпает ни на секунду, а сразу завершается. Ах да, я вспомнил, что поток по умолчанию, который мы создаем, этоНить демона, когда завершатся все пользовательские потоки, программа завершится, независимо от того, запущены ли потоки демона. Тогда воспользуемся простым и легким способом решения этой проблемы — не даем завершиться пользовательскому потоку, даем ему поспать какое-то время:

@Test
public void submitTest() throws InterruptedException {
    // 创建线程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, 
        new LinkedBlockingQueue<Runnable>(), 
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread();
            }
        }, new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("拒绝服务");
        }
    });
    // 提交任务,该任务为睡眠 1 秒后打印 Hello
    threadPoolExecutor.submit(new Callable<String>() {
        @Override
        public String call() throws InterruptedException {
            Thread.sleep(1000L);
            System.out.println("Hello");
            return null;
        }
    });
    // 使主线程休眠 5 秒,防止守护线程意外退出
    Thread.sleep(5000L);
}

Однако после того, как программа подождет 5 секунд, вывода по-прежнему нет. Моей первой реакцией было то, что я неправильно использую пул потоков. Мне все еще нужно вызвать какой-то метод для «активации» или «запуска» пула потоков? И ни в документации, ни в примерах различных блогов я не нашел подобного способа. Давайте внимательно подумаем об этой ошибке.Есть три возможные причины этой проблемы:

  1. ThreadPoolExecutorПроблема с внутренним кодом
  2. я правThreadPoolExecutorиспользуется неправильно
  3. я разработалThreadFactoryилиRejectedExecutionHandlerЧто-то не так

Причина 1, вероятность слишком мала, почти нет. Итак, по причинам 2 и 3 мы не можем исключить их прямо сейчас, поэтому я пытаюсь создать минимальную воспроизводимую ошибку, котораяThreadPoolExecutorРазденьте его и посмотрите, воспроизводится ли ошибка:

Идея минимальной воспроизводимости - это то, что я перевожу«Разработка простого веб-приложения на Rust, часть 4 — разбор параметров CLI», идеи, использованные автором. То есть, когда мы не можем найти ошибку, мы удаляем ту часть текущего кода, которую мы считаем нерелевантной, наблюдаем, повторяется ли ошибка после удаления, и постепенно сужаем масштаб ошибки. С точки зрения непрофессионала, это метод исключения.

private class MyThreadFactory implements ThreadFactory{
    @Override
    public Thread newThread(Runnable r) {
        return new Thread();
    }
}

@Test
public void reproducibleTest() throws InterruptedException {
    new MyThreadFactory().newThread(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Hello");
        }
    }).start();
    Thread.sleep(5000L);
}

По-прежнему нет вывода, но это хорошая новость, это означает, что мы обнаружили проблему: теперь проблема может возникнуть только вMyThreadFactoryЧто не так с 6 строками кода? Упс (хлопает по бедру), я не ставилRunnable rперейти кnew Thread()Ах, я выполнял пустой поток, какой может быть выход! тогда:return new Thread(r);Просто измените это.

5 Рефакторинг

Вышеупомянутая проблема кажется простой, но над такой низкоуровневой ошибкой стоит задуматься. Я получил эту ошибку по двум причинам:

  1. Я не понимаюThreadPoolExecutorПринцип с грамматической точки зренияThreadFactoryКласс реализации должен только передатьThreadПример подойдет, но я не знаюRunnable rНезаменим.
  2. Структура тестового кода беспорядочна. Даже тестовый код не надо писать как лапшу, Вам непонятно, а что же читателям?

Итак, я решил провести рефакторинг тестового кода. В этом рефакторинге первое, что нужно сделать, это заставить фабрику потоков генерироватьпоток без демона, чтобы предотвратить неожиданное завершение всех потоков в пуле потоков из-за выхода основного процесса; во-вторых, для логирования каждой операции мы должны иметь возможность интуитивно наблюдать за тем, что делает пул потоков. лог работы очереди блокировки, я использовалДинамический проксиСпособ логировать каждый метод, детская обувь, кто не знаком с динамическими агентами может тыкнуть то, что я писал ранееLittle Leopard показывает исходный код: динамический прокси JDK.

// import...

public class ThreadPoolExecutorTest {
    /**
     * 记录启动时间
     */
    private final static long START_TIME = System.currentTimeMillis();

    /**
     * 自定义线程工厂,产生非守护线程,并打印日志
     */
    private class MyThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(false);
            debug("创建线程 - %s", thread.getName());
            return thread;
        }
    }

    /**
     * 自定义拒绝服务异常处理器,打印拒绝服务信息
     */
    private class MyRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            debug("拒绝请求,Runnable:%s,ThreadPoolExecutor:%s", r, executor);
        }
    }

    /**
     * 自定义任务,休眠 1 秒后打印当前线程名,并返回线程名
     */
    private class MyTask implements Callable<String> {

        @Override
        public String call() throws InterruptedException {
            Thread.sleep(1000L);
            String threadName = Thread.currentThread().getName();
            debug("MyTask - %s", threadName);
            return threadName;
        }
    }

    /**
     * 对 BlockingQueue 的动态代理,实现对 BlockingQueue 的所有方法调用打 Log
     */
    private class PrintInvocationHandler implements InvocationHandler {
        private final BlockingQueue<?> blockingQueue;

        private PrintInvocationHandler(BlockingQueue<?> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            debug("BlockingQueue - %s,参数为:%s", method.getName(), Arrays.toString(args));
            Object result = method.invoke(blockingQueue, args);
            debug("BlockingQueue - %s 执行完毕,返回值为:%s", method.getName(), String.valueOf(result));
            return result;
        }
    }

    /**
     * 产生 BlockingQueue 代理类
     * @param blockingQueue 原 BlockingQueue
     * @param <E> 任意类型
     * @return 动态代理 BlockingQueue,执行任何方法时会打 Log
     */
    @SuppressWarnings("unchecked")
    private <E> BlockingQueue<E> debugQueue(BlockingQueue<E> blockingQueue) {
        return (BlockingQueue<E>) Proxy.newProxyInstance(this.getClass().getClassLoader(),
                new Class<?>[]{BlockingQueue.class},
                new PrintInvocationHandler(blockingQueue));
    }

    /**
     * 实例化一个 核心池为 3,最大池为 5,存活时间为 20s,利用上述阻塞队列、线程工厂、拒绝服务处理器的线程池实例
     * @return 返回 ThreadPoolExecutor 实例
     */
    private ThreadPoolExecutor newTestPoolInstance() {
        return new ThreadPoolExecutor(3, 5, 20,
                TimeUnit.SECONDS, debugQueue(new LinkedBlockingQueue<>()),
                new MyThreadFactory(), new MyRejectedExecutionHandler());
    }

    /**
     * 向控制台打印日志,自动输出时间,线程等信息
     * @param info
     * @param arg
     */
    private void debug(String info, Object... arg) {
        long time = System.currentTimeMillis() - START_TIME;
        System.out.println(String.format(((double) time / 1000) + "-" + Thread.currentThread().getName() + "-" + info, arg));
    }

    /**
     * 测试实例化操作
     */
    private void newInstanceTest() {
        newTestPoolInstance();
    }

    /**
     * 测试提交操作,提交 10 次任务
     */
    private void submitTest() {
        ThreadPoolExecutor threadPool = newTestPoolInstance();
        for (int i = 0; i < 10; i++) {
            threadPool.submit(new MyTask());
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();
        test.submitTest();
    }
}

скомпилировать, запустить =>

0.047-main-创建线程 - Thread-0
0.064-main-创建线程 - Thread-1
0.064-main-创建线程 - Thread-2
0.064-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@4d7e1886]
0.064-main-BlockingQueue - offer 执行完毕,返回值为:true
0.064-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@3cd1a2f1]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@2f0e140b]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@7440e464]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@49476842]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@78308db1]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@27c170f0]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
1.065-Thread-1-MyTask - Thread-1
1.065-Thread-0-MyTask - Thread-0
1.065-Thread-2-MyTask - Thread-2
1.065-Thread-1-BlockingQueue - take,参数为:null
1.065-Thread-0-BlockingQueue - take,参数为:null
1.065-Thread-2-BlockingQueue - take,参数为:null
1.065-Thread-0-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@3cd1a2f1
1.065-Thread-2-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@2f0e140b
1.065-Thread-1-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@4d7e1886
2.065-Thread-1-MyTask - Thread-1
2.065-Thread-2-MyTask - Thread-2
2.065-Thread-0-MyTask - Thread-0
2.065-Thread-1-BlockingQueue - take,参数为:null
2.065-Thread-2-BlockingQueue - take,参数为:null
2.065-Thread-0-BlockingQueue - take,参数为:null
2.065-Thread-1-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@7440e464
2.065-Thread-2-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@49476842
2.065-Thread-0-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@78308db1
3.066-Thread-1-MyTask - Thread-1
3.066-Thread-2-MyTask - Thread-2
3.066-Thread-0-MyTask - Thread-0
3.066-Thread-2-BlockingQueue - take,参数为:null
3.066-Thread-1-BlockingQueue - take,参数为:null
3.066-Thread-0-BlockingQueue - take,参数为:null
3.066-Thread-2-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@27c170f0
4.067-Thread-2-MyTask - Thread-2
4.067-Thread-2-BlockingQueue - take,参数为:null

Формат лога: время (секунды) - имя потока - информация

Из вывода журнала мы можем узнать:

  • Когда очередь пуста и количество потоков меньше количества основных потоков, отправка задачи вызовет создание потока и немедленное выполнение задачи.
  • Когда основные потоки заняты, повторно отправленный запрос будет сохранен в очереди блокировки, а задачи в очереди будут выполняться после ожидания бездействия потока.
  • Всегда есть только три рабочих потока в дополнение к основному потоку.
  • Когда очередь пуста, а рабочий поток все еще выполняется, рабочий поток заблокирует очередь, потому чтоtakeМетод заблокирован (это видно по следующим строкам лога, только лог вызовов, лог завершенных вызовов нет)

Из этого я создаюсомневаться: Почему всегда только три потока? Разве моя установка не «основной пул — 3, максимальный пул — 5»? Почему работают только три потока?

6 отправить задачи

Наконец-то начали смотреть исходники, используемsubmitВ качестве точки входа изучите, что делает пул потоков, когда мы отправляем задачу,submitСам метод очень прост, он заключается в том, чтобы инкапсулировать входящие параметры какRunnableFutureэкземпляр, а затем вызовитеexecuteметод, приведенный нижеsubmitОдин из нескольких перегруженных методов:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

Итак, давайте двигаться дальшеexecuteкод:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

Давайте сначала объяснимaddWorkerметод, на данный момент нам нужно только понять несколько вещей, чтобы понятьexecuteкод вверх:

  • Этот метод используется для создания нового рабочего потока.
  • Этот метод является потокобезопасным
  • Первый параметр этого метода — это первая задача, которую должен выполнить новый поток, а второй параметр — создать ли новый основной поток.
  • Этот метод возвращает значение, если новый поток выполнен успешноtrue, иначе возвратfalse

Итак, давайте вернемся и поймемexecuteКод:

Чтобы помочь понять, я нарисовал блок-схему, основанную на логике кода:

execute 方法流程图

Теперь я понимаю, что он будет создан только в том случае, если ожидание вставки в очередь не удастся (например, при достижении предела емкости и т. д.).непрофильныйпотоки для обработки задач, то есть мы используемLinkedBlockingQueueОчередь используется как очередь ожидания, которая не виднанепрофильныйФеномен создаваемого потока.

Внимательные читатели могли заметить, что весь процесс не заблокирован.Как обеспечить безопасность параллелизма? Соблюдаем этот код, на самом деле блокировать все не обязательно, нужно только убедитьсяaddWorker,removeиworkQueue.offerПотокобезопасность трех методов, этот метод не нужно блокировать. Фактически, вaddWorkerВ нем происходит перепроверка состояния пула потоков, если создание не удастся, он вернет false.

серия статей

Маленький Леопард учится еще на третьем курсе. Маленький Леопард надеется, что вы сможете прочитать эту статью "критически" и покритиковать неточность и неуместное содержание этой статьи. Маленький Леопард благодарен.