Пул потоков визуального мониторинга JMX

Java

Я прочитал код компании два дня назад и увидел использование JMX для мониторинга информации и состояния временных задач.Слово JMX было очень знакомым, поэтому я пошел проверить его и написал демонстрацию для мониторинга пула потоков.

Прочитав эту статью, вы узнаете:

  • Введение в JMX
  • Введение в пулы потоков
  • Приложение JMX для мониторинга пула потоков

Что такое JMX

Введение в JMX

JMX(Java Management Extensions), мониторинг структуры управления с помощьюJMXПриложения можно отслеживать и управлять ими.JMXНаиболее распространенный сценарий — мониторингJavaОсновная информация и работа с программой, любыеJavaпрограммы можно открыватьJMX, затем используйтеJConsoleилиVisual VMсделать предварительный просмотр

Архитектура JMX

Он разделен на три уровня: уровень распространения, уровень агента и уровень устройства.
слой распределения: Интерфейс управления различными операциями на уровне агента определяется по разным протоколам.Проще говоря, это способ просмотра индикаторов мониторинга, который можноHTTPсоединять,RMIсоединять,SNMPсоединять
прокси-уровень:управлятьMBean, поставивMBeanЗарегистрируйтесь в реализации прокси-уровняMBeanуправление, помимо регистрацииMBean, вы также можете зарегистрироватьсяAdapter, уровень прокси обычно используется в приложенииMBeanService
Уровень устройства: классы, абстрагированные от индикаторов мониторинга, можно разделить на следующие категории:

  • Standard MBean
  • Dynamic MBean
  • Open MBean
  • Model MBean
  • MXBean

Общее использование в приложенияхStandard MBeanЕсть еще, поэтому я только представляю здесьStandard MBean,использоватьStandard MBeanНеобходимо соблюдать определенные правила, правила следующие:

  • Определите интерфейс, имя интерфейса должно бытьXXXXMBeanформат,долженотMBeanконец
  • Если интерфейсXXXXMBean, класс реализации интерфейса должен бытьMBean, иначе программа сообщит об ошибке
  • через интерфейсgetиsetМетод указывает, доступен ли индикатор мониторинга для чтения и записи. НапримерgetXXX()абстрактный метод, тоXXXявляется индикатором мониторинга.getXXX()выражатьXXXИндикаторы производительности читабельны,setXXX()Метод указывает, что индикатор мониторинга может быть записан
  • Типы параметров и возвращаемых значений могут быть только простыми ссылочными типами (например,String) и базовые типы данных, а не пользовательские типы, если возвращаемое значение является настраиваемым типом, вы можете выбратьMXBean

Краткое введение в пулы потоков

Пул потоков — это инструмент управления потоками. С помощью пула потоков можно повторно использовать потоки, чтобы снизить потребление ресурсов, повысить скорость отклика и улучшить управляемость потоками. Если в системе используется большое количество пулов потоков, необходимо отслеживать пул потоков, чтобы обнаруживать проблемы при возникновении ошибок. Это можно отслеживать с помощью параметров, предоставляемых пулом потоков.Параметры, предоставляемые пулом потоков, следующие:

метод значение
getActiveCount Количество потоков, выполняющих задачи в пуле потоков
getCompletedTaskCount Количество задач, выполненных пулом потоков
getCorePoolSize Количество основных потоков в пуле потоков
getLargestPoolSize Максимальное количество потоков, когда-либо созданных пулом потоков
getMaximumPoolSize Максимальное количество потоков в пуле потоков
getPoolSize Текущее количество потоков в пуле потоков
getTaskCount Количество задач, которые должен выполнить пул потоков

применение

ВведениеJMXИ после пула потоков напишитеJMXконтролировать пул потоковDemo, вы не можете говорить об этом на бумаге.

  • Определите класс мониторинга пула потоков:ThreadPoolMonitor.java

    public class ThreadPoolMonitor extends ThreadPoolExecutor {
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        /**
         * ActiveCount
         * */
        int ac = 0;
    
        /**
         * 当前所有线程消耗的时间
         * */
        private AtomicLong totalCostTime = new AtomicLong();
    
        /**
         * 当前执行的线程总数
         * */
        private AtomicLong totalTasks = new AtomicLong();
    
        /**
         * 线程池名称
         */
        private String poolName;
    
        /**
         * 最短 执行时间
         * */
        private long minCostTime;
    
        /**
         * 最长执行时间
         * */
        private long maxCostTime;
    
    
        /**
         * 保存任务开始执行的时间
         */
        private ThreadLocal<Long> startTime = new ThreadLocal<>();
    
        public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
              Executors.defaultThreadFactory(), poolName);
        }
    
        public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue,
                               ThreadFactory threadFactory, String poolName) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
            this.poolName = poolName;
        }
    
        public static ExecutorService newFixedThreadPool(int nThreads, String poolName) {
            return new ThreadPoolMonitor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
        }
    
        public static ExecutorService newCachedThreadPool(String poolName) {
            return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), poolName);
        }
    
        public static ExecutorService newSingleThreadExecutor(String poolName) {
            return new ThreadPoolMonitor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
        }
    
        /**
         * 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况
         */
        @Override
        public void shutdown() {
            // 统计已执行任务、正在执行任务、未执行任务数量
            logger.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
              this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
            super.shutdown();
        }
    
        @Override
        public List<Runnable> shutdownNow() {
            // 统计已执行任务、正在执行任务、未执行任务数量
            logger.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
              this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
            return super.shutdownNow();
        }
    
        /**
         * 任务执行之前,记录任务开始时间
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            startTime.set(System.currentTimeMillis());
        }
    
        /**
         * 任务执行之后,计算任务结束时间
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            long costTime = System.currentTimeMillis() - startTime.get();
            startTime.remove();  //删除,避免占用太多内存
            //设置最大最小执行时间
            maxCostTime = maxCostTime > costTime ? maxCostTime : costTime;
            if (totalTasks.get() == 0) {
                minCostTime = costTime;
            }
            minCostTime = minCostTime < costTime ? minCostTime : costTime;
            totalCostTime.addAndGet(costTime);
            totalTasks.incrementAndGet();
    
            logger.info("{}-pool-monitor: " +
                          "Duration: {} ms, PoolSize: {}, CorePoolSize: {}, ActiveCount: {}, " +
                          "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
                          "MaximumPoolSize: {},  KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
                    this.poolName,
                    costTime, this.getPoolSize(), this.getCorePoolSize(), super.getActiveCount(),
                    this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
                    this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
        }
    
        public int getAc() {
            return ac;
        }
    
        /**
         * 线程平均耗时
         *
         * @return
         * */
        public float getAverageCostTime() {
            return totalCostTime.get() / totalTasks.get();
        }
    
        /**
         * 线程最大耗时
         * */
        public long getMaxCostTime() {
            return maxCostTime;
        }
    
        /**
         * 线程最小耗时
         * */
        public long getMinCostTime() {
            return minCostTime;
        }
    
        /**
         * 生成线程池所用的线程,改写了线程池默认的线程工厂
         */
        static class EventThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;
    
            /**
             * 初始化线程工厂
             *
             * @param poolName 线程池名称
             */
            EventThreadFactory(String poolName) {
                SecurityManager s = System.getSecurityManager();
                group = Objects.nonNull(s) ? s.getThreadGroup() :   Thread.currentThread().getThreadGroup();
                namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
            }
    
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    }
    

    Настройте пул потоков, унаследовав пул потоков и добавив его в конструктор.poolNameУкажите, какой это пул потоков, и заодно перепишите егоbeforeExecute,afterExecute,terminatedи так далее, вbeforeExecuteМетод записывает время выполнения пула потоков, вafterExecuteМетод вычисляет времязатратность, максимальное времязатратность, минимальное времязатратность и среднее времязатратность выполнения потока. Переписать метод генерации потока пула потоков, указав имя сгенерированного потока.

  • определитьMBean:ThreadPoolParamMBean.java
    MBeanПо сути, это не класс сущностей, а интерфейс, определяющий индикаторы мониторинга.

    public interface ThreadPoolParamMBean {
        /**
         * 线程池中正在执行任务的线程数量
         *
         * @return
         */
        int getActiveCount();
    
        /**
         * 线程池已完成的任务数量
         *
         * @return
         */
        long getCompletedTaskCount();
    
        /**
         * 线程池的核心线程数量
         *
         * @return
         */
        int getCorePoolSize();
    
        /**
         * 线程池曾经创建过的最大线程数量
         *
         * @return
         */
        int getLargestPoolSize();
    
        /**
         * 线程池的最大线程数量
         *
         * @return
         */
        int getMaximumPoolSize();
    
        /**
         * 线程池当前的线程数量
         *
         * @return
         */
        int getPoolSize();
    
        /**
         * 线程池需要执行的任务数量
         *
         * @return
         */
        long getTaskCount();
    
        /**
         * 线程最大耗时
         *
         * @return
         * */
        long getMaxCostTime();
    
        /**
         * 线程最小耗时
         *
         * @return
         * */
        long getMinCostTime();
    
        /**
         * 线程平均耗时
         *
         * @return
         * */
        float getAverageCostTime();
    }
    
  • определитьMBeanКласс реализации:ThreadPoolParam.java
    Определение является статическим MBean, поэтому класс реализации интерфейса должен соответствовать требованиям, т.е.xxxMBean, класс реализацииxxx

    public class ThreadPoolParam implements ThreadPoolParamMBean  {
        private ThreadPoolMonitor threadPoolMonitor;
    
        public ThreadPoolParam(ExecutorService es) {
            this.threadPoolMonitor = (ThreadPoolMonitor) es;
        }
    
        /**
         * 线程池中正在执行任务的线程数量
         *
         * @return
         */
        @Override
        public int getActiveCount() {
            return threadPoolMonitor.getAc();
        }
    
        /**
         * 线程池已完成的任务数量
         *
         * @return
         */
        @Override
        public long getCompletedTaskCount() {
            return threadPoolMonitor.getCompletedTaskCount();
        }
    
        /**
         * 线程池的核心线程数量
         *
         * @return
         */
        @Override
        public int getCorePoolSize() {
            return threadPoolMonitor.getCorePoolSize();
        }
    
        /**
         * 线程池曾经创建过的最大线程数量
         *
         * @return
         */
        @Override
        public int getLargestPoolSize() {
            return threadPoolMonitor.getLargestPoolSize();
        }
    
        /**
         * 线程池的最大线程数量
         *
         * @return
         */
        @Override
        public int getMaximumPoolSize() {
            return threadPoolMonitor.getMaximumPoolSize();
        }
    
        /**
         * 线程池当前的线程数量
         *
         * @return
         */
        @Override
        public int getPoolSize() {
            return threadPoolMonitor.getPoolSize();
        }
    
        /**
         * 线程池需要执行的任务数量
         *
         * @return
         */
        @Override
        public long getTaskCount() {
            return threadPoolMonitor.getTaskCount();
        }
    
        /**
         * 线程最大耗时
         *
         * @return
         * */
        @Override
        public long getMaxCostTime() {
            return threadPoolMonitor.getMaxCostTime();
        }
    
        /**
         * 线程最小耗时
         *
         * @return
         * */
        @Override
        public long getMinCostTime() {
            return threadPoolMonitor.getMinCostTime();
        }
    
        /**
         * 线程平均耗时
         *
         * @return
         * */
        @Override
        public float getAverageCostTime() {
            return threadPoolMonitor.getAverageCostTime();
        }
    }
    

    Показатели отслеживаемых параметров получаются через пул потоков

  • Тестовый класс:Test.java

    public class Test {
        private static Random random = new Random();
        public static void main(String[] args) throws MalformedObjectNameException,  InterruptedException {
            ExecutorService es1 = ThreadPoolMonitor.newCachedThreadPool("test-pool-1");
            ThreadPoolParam threadPoolParam1 = new ThreadPoolParam(es1);
    
            ExecutorService es2 = ThreadPoolMonitor.newCachedThreadPool("test-pool-2");
            ThreadPoolParam threadPoolParam2 = new ThreadPoolParam(es2);
    
            MBeanServerUtil.registerMBean(threadPoolParam1, new ObjectName("test-pool-1:type=threadPoolParam"));
            MBeanServerUtil.registerMBean(threadPoolParam2, new ObjectName("test-pool-2:type=threadPoolParam"));
    
            //http连接的方式查看监控任务
            HtmlAdaptor.start();
    
            executeTask(es1);
            executeTask(es2);
            Thread.sleep(1000 * 60 * 60);
        }
    
        private static void executeTask(ExecutorService es) {
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    int temp = i;
                    es.submit(() -> {
                        //随机睡眠时间
                        try {
                            Thread.sleep(random.nextInt(60) * 1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(temp);
                    });
                }
            }).start();
        }
    }
    

    инструкция:

    • MBeanServerUtil.registerMBean()Зарегистрировать отслеживаемый класс
    • HtmlAdaptor.start()включиHTTPПросмотр задач мониторинга путем подключения

    Открыть после запуска программыhttp://localhost:8082/Как показано ниже:

нажмитеtest-pool-1внизtype=threadPoolParam

Получите последние показатели мониторинга пула потоков, обновивtest-pool-1иtype=threadPoolParamЭти свойства находятся вObjectNameзначения свойств, определенные в

Суммировать

использоватьJMXМониторинг пула потоков простоJMXФункция, эта статья предназначена только для того, чтобы применить то, что вы узнали, больше информации о JMX и пулах потоков можно найти в других материалах. Если в статье есть ошибки, поправьте меня

Наконец прикрепил:код проекта,Добро пожаловатьforkиstar, [Я сделал точкуstarнемного】