Я прочитал код компании два дня назад и увидел использование JMX для мониторинга информации и состояния временных задач.Слово JMX было очень знакомым, поэтому я пошел проверить его и написал демонстрацию для мониторинга пула потоков.
Прочитав эту статью, вы узнаете:
- Введение в JMX
- Введение в пулы потоков
- Приложение JMX для мониторинга пула потоков
Что такое JMX
Введение в JMX
JMX(Java Management Extensions)
, мониторинг структуры управления с помощьюJMX
Приложения можно отслеживать и управлять ими.JMX
Наиболее распространенный сценарий — мониторингJava
Основная информация и работа с программой, любыеJava
программы можно открыватьJMX
, затем используйтеJConsole
илиVisual VM
сделать предварительный просмотр
Архитектура JMX
слой распределения: Интерфейс управления различными операциями на уровне агента определяется по разным протоколам.Проще говоря, это способ просмотра индикаторов мониторинга, который можно
HTTP
соединять,RMI
соединять,SNMP
соединятьпрокси-уровень:управлять
MBean
, поставивMBean
Зарегистрируйтесь в реализации прокси-уровняMBean
управление, помимо регистрацииMBean
, вы также можете зарегистрироватьсяAdapter
, уровень прокси обычно используется в приложенииMBeanService
Уровень устройства: классы, абстрагированные от индикаторов мониторинга, можно разделить на следующие категории:
- Standard MBean
- Dynamic MBean
- Open MBean
- Model MBean
- MXBean
Общее использование в приложенияхStandard MBean
Есть еще, поэтому я только представляю здесьStandard MBean
,использоватьStandard MBean
Необходимо соблюдать определенные правила, правила следующие:
- Определите интерфейс, имя интерфейса должно быть
XXXXMBean
формат,долженотMBean
конец - Если интерфейс
XXXXMBean
, класс реализации интерфейса должен бытьMBean
, иначе программа сообщит об ошибке - через интерфейс
get
иset
Метод указывает, доступен ли индикатор мониторинга для чтения и записи. НапримерgetXXX()
абстрактный метод, тоXXX
является индикатором мониторинга.getXXX()
выражатьXXX
Индикаторы производительности читабельны,setXXX()
Метод указывает, что индикатор мониторинга может быть записан - Типы параметров и возвращаемых значений могут быть только простыми ссылочными типами (например,
String
) и базовые типы данных, а не пользовательские типы, если возвращаемое значение является настраиваемым типом, вы можете выбратьMXBean
Краткое введение в пулы потоков
Пул потоков — это инструмент управления потоками. С помощью пула потоков можно повторно использовать потоки, чтобы снизить потребление ресурсов, повысить скорость отклика и улучшить управляемость потоками. Если в системе используется большое количество пулов потоков, необходимо отслеживать пул потоков, чтобы обнаруживать проблемы при возникновении ошибок. Это можно отслеживать с помощью параметров, предоставляемых пулом потоков.Параметры, предоставляемые пулом потоков, следующие:
метод | значение |
---|---|
getActiveCount | Количество потоков, выполняющих задачи в пуле потоков |
getCompletedTaskCount | Количество задач, выполненных пулом потоков |
getCorePoolSize | Количество основных потоков в пуле потоков |
getLargestPoolSize | Максимальное количество потоков, когда-либо созданных пулом потоков |
getMaximumPoolSize | Максимальное количество потоков в пуле потоков |
getPoolSize | Текущее количество потоков в пуле потоков |
getTaskCount | Количество задач, которые должен выполнить пул потоков |
применение
ВведениеJMX
И после пула потоков напишитеJMX
контролировать пул потоковDemo
, вы не можете говорить об этом на бумаге.
-
Определите класс мониторинга пула потоков:
ThreadPoolMonitor.java
public class ThreadPoolMonitor extends ThreadPoolExecutor { private final Logger logger = LoggerFactory.getLogger(getClass()); /** * ActiveCount * */ int ac = 0; /** * 当前所有线程消耗的时间 * */ private AtomicLong totalCostTime = new AtomicLong(); /** * 当前执行的线程总数 * */ private AtomicLong totalTasks = new AtomicLong(); /** * 线程池名称 */ private String poolName; /** * 最短 执行时间 * */ private long minCostTime; /** * 最长执行时间 * */ private long maxCostTime; /** * 保存任务开始执行的时间 */ private ThreadLocal<Long> startTime = new ThreadLocal<>(); public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), poolName); } public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, String poolName) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); this.poolName = poolName; } public static ExecutorService newFixedThreadPool(int nThreads, String poolName) { return new ThreadPoolMonitor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName); } public static ExecutorService newCachedThreadPool(String poolName) { return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), poolName); } public static ExecutorService newSingleThreadExecutor(String poolName) { return new ThreadPoolMonitor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName); } /** * 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况 */ @Override public void shutdown() { // 统计已执行任务、正在执行任务、未执行任务数量 logger.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); super.shutdown(); } @Override public List<Runnable> shutdownNow() { // 统计已执行任务、正在执行任务、未执行任务数量 logger.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); return super.shutdownNow(); } /** * 任务执行之前,记录任务开始时间 */ @Override protected void beforeExecute(Thread t, Runnable r) { startTime.set(System.currentTimeMillis()); } /** * 任务执行之后,计算任务结束时间 */ @Override protected void afterExecute(Runnable r, Throwable t) { long costTime = System.currentTimeMillis() - startTime.get(); startTime.remove(); //删除,避免占用太多内存 //设置最大最小执行时间 maxCostTime = maxCostTime > costTime ? maxCostTime : costTime; if (totalTasks.get() == 0) { minCostTime = costTime; } minCostTime = minCostTime < costTime ? minCostTime : costTime; totalCostTime.addAndGet(costTime); totalTasks.incrementAndGet(); logger.info("{}-pool-monitor: " + "Duration: {} ms, PoolSize: {}, CorePoolSize: {}, ActiveCount: {}, " + "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " + "MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}", this.poolName, costTime, this.getPoolSize(), this.getCorePoolSize(), super.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(), this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated()); } public int getAc() { return ac; } /** * 线程平均耗时 * * @return * */ public float getAverageCostTime() { return totalCostTime.get() / totalTasks.get(); } /** * 线程最大耗时 * */ public long getMaxCostTime() { return maxCostTime; } /** * 线程最小耗时 * */ public long getMinCostTime() { return minCostTime; } /** * 生成线程池所用的线程,改写了线程池默认的线程工厂 */ static class EventThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; /** * 初始化线程工厂 * * @param poolName 线程池名称 */ EventThreadFactory(String poolName) { SecurityManager s = System.getSecurityManager(); group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-"; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } }
Настройте пул потоков, унаследовав пул потоков и добавив его в конструктор.
poolName
Укажите, какой это пул потоков, и заодно перепишите егоbeforeExecute
,afterExecute
,terminated
и так далее, вbeforeExecute
Метод записывает время выполнения пула потоков, вafterExecute
Метод вычисляет времязатратность, максимальное времязатратность, минимальное времязатратность и среднее времязатратность выполнения потока. Переписать метод генерации потока пула потоков, указав имя сгенерированного потока. -
определить
MBean
:ThreadPoolParamMBean.java
MBean
По сути, это не класс сущностей, а интерфейс, определяющий индикаторы мониторинга.public interface ThreadPoolParamMBean { /** * 线程池中正在执行任务的线程数量 * * @return */ int getActiveCount(); /** * 线程池已完成的任务数量 * * @return */ long getCompletedTaskCount(); /** * 线程池的核心线程数量 * * @return */ int getCorePoolSize(); /** * 线程池曾经创建过的最大线程数量 * * @return */ int getLargestPoolSize(); /** * 线程池的最大线程数量 * * @return */ int getMaximumPoolSize(); /** * 线程池当前的线程数量 * * @return */ int getPoolSize(); /** * 线程池需要执行的任务数量 * * @return */ long getTaskCount(); /** * 线程最大耗时 * * @return * */ long getMaxCostTime(); /** * 线程最小耗时 * * @return * */ long getMinCostTime(); /** * 线程平均耗时 * * @return * */ float getAverageCostTime(); }
-
определить
MBean
Класс реализации:ThreadPoolParam.java
Определение является статическим MBean, поэтому класс реализации интерфейса должен соответствовать требованиям, т.е.xxxMBean
, класс реализацииxxx
public class ThreadPoolParam implements ThreadPoolParamMBean { private ThreadPoolMonitor threadPoolMonitor; public ThreadPoolParam(ExecutorService es) { this.threadPoolMonitor = (ThreadPoolMonitor) es; } /** * 线程池中正在执行任务的线程数量 * * @return */ @Override public int getActiveCount() { return threadPoolMonitor.getAc(); } /** * 线程池已完成的任务数量 * * @return */ @Override public long getCompletedTaskCount() { return threadPoolMonitor.getCompletedTaskCount(); } /** * 线程池的核心线程数量 * * @return */ @Override public int getCorePoolSize() { return threadPoolMonitor.getCorePoolSize(); } /** * 线程池曾经创建过的最大线程数量 * * @return */ @Override public int getLargestPoolSize() { return threadPoolMonitor.getLargestPoolSize(); } /** * 线程池的最大线程数量 * * @return */ @Override public int getMaximumPoolSize() { return threadPoolMonitor.getMaximumPoolSize(); } /** * 线程池当前的线程数量 * * @return */ @Override public int getPoolSize() { return threadPoolMonitor.getPoolSize(); } /** * 线程池需要执行的任务数量 * * @return */ @Override public long getTaskCount() { return threadPoolMonitor.getTaskCount(); } /** * 线程最大耗时 * * @return * */ @Override public long getMaxCostTime() { return threadPoolMonitor.getMaxCostTime(); } /** * 线程最小耗时 * * @return * */ @Override public long getMinCostTime() { return threadPoolMonitor.getMinCostTime(); } /** * 线程平均耗时 * * @return * */ @Override public float getAverageCostTime() { return threadPoolMonitor.getAverageCostTime(); } }
Показатели отслеживаемых параметров получаются через пул потоков
-
Тестовый класс:
Test.java
public class Test { private static Random random = new Random(); public static void main(String[] args) throws MalformedObjectNameException, InterruptedException { ExecutorService es1 = ThreadPoolMonitor.newCachedThreadPool("test-pool-1"); ThreadPoolParam threadPoolParam1 = new ThreadPoolParam(es1); ExecutorService es2 = ThreadPoolMonitor.newCachedThreadPool("test-pool-2"); ThreadPoolParam threadPoolParam2 = new ThreadPoolParam(es2); MBeanServerUtil.registerMBean(threadPoolParam1, new ObjectName("test-pool-1:type=threadPoolParam")); MBeanServerUtil.registerMBean(threadPoolParam2, new ObjectName("test-pool-2:type=threadPoolParam")); //http连接的方式查看监控任务 HtmlAdaptor.start(); executeTask(es1); executeTask(es2); Thread.sleep(1000 * 60 * 60); } private static void executeTask(ExecutorService es) { new Thread(() -> { for (int i = 0; i < 10; i++) { int temp = i; es.submit(() -> { //随机睡眠时间 try { Thread.sleep(random.nextInt(60) * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(temp); }); } }).start(); } }
инструкция:
-
MBeanServerUtil.registerMBean()
Зарегистрировать отслеживаемый класс -
HtmlAdaptor.start()
включиHTTP
Просмотр задач мониторинга путем подключения
Открыть после запуска программы
http://localhost:8082/
Как показано ниже: -
нажмитеtest-pool-1
внизtype=threadPoolParam
Получите последние показатели мониторинга пула потоков, обновивtest-pool-1
иtype=threadPoolParam
Эти свойства находятся вObjectName
значения свойств, определенные в
Суммировать
использоватьJMX
Мониторинг пула потоков простоJMX
Функция, эта статья предназначена только для того, чтобы применить то, что вы узнали, больше информации о JMX и пулах потоков можно найти в других материалах. Если в статье есть ошибки, поправьте меня
Наконец прикрепил:код проекта,Добро пожаловатьforkиstar, [Я сделал точкуstarнемного】