1. Общий дизайн
Целое разделено на две части: бизнес-обслуживание и управление, включая три основных функциональных пункта:
(1) Упростить конфигурацию пула потоков: corePoolSize, maxPoolSize, workQueue, которые в наибольшей степени определяют распределение задач и стратегию распределения потоков пула потоков. Учитывая, что существует два основных сценария получения параллелизма в практических приложениях: (1) Выполнение подзадач параллельно для повышения скорости отклика. В этом случае следует использовать синхронную очередь, задачи не должны кэшироваться, а выполняться немедленно. (2) Выполняйте большие пакеты задач параллельно, чтобы повысить пропускную способность. В этом случае следует использовать ограниченную очередь, и очередь должна использоваться для буферизации больших пакетов задач, емкость очереди должна быть объявлена для предотвращения неограниченного накопления задач. Таким образом, пул потоков должен только обеспечивать настройку этих трех ключевых параметров и выбор двух очередей для удовлетворения подавляющего большинства потребностей бизнеса.
(2) параметры могут быть динамически настроены: на основе пула потоков Java инкапсулировать пул потоков, разрешить пулу потоков изменять конфигурацию в соответствии с внешними изменениями конфигурации и разработать интерфейс управления на стороне управления для облегчения разработки;
(3) Расширение мониторинга пула потоков: добавьте возможности мониторинга на протяжении всего жизненного цикла задач выполнения пула потоков, например текущее количество активных задач, количество исключений, возникающих при выполнении задач, и размер очереди задач для облегчения соответствующие разработки для понимания состояния пула потоков;
2. Общий архитектурный дизайн
Динамическая настройка параметров: предоставляет интерфейс управления и поддерживает динамическую настройку параметров пула потоков, включая количество основных потоков в пуле потоков, максимальное количество потоков, длину буферной очереди и т. д.; параметры вступают в силу со временем. после модификации
Мониторинг задач: поддерживает мониторинг транзакций для детализации приложений, детализации пула потоков и детализации задач.Вы можете просматривать состояние выполнения задачи пула потоков, максимальное время выполнения задачи и среднее время выполнения задачи.
Загрузить сигнал тревоги: поддержка конфигурации правил сигналов тревоги, при превышении порогового значения соответствующий специалист по разработке будет уведомлен
Мониторинг и ведение журнала операций: журнал аудита доступа к конфигурации сегмента управления
Проверка разрешений: разные пользователи приложения
3. Основной бизнес-процесс
В основном его можно разделить на две части: приложение (клиент) и сторона управления, использующая базы данных mysql и es, в которых страница мониторинга настраивается с помощью инструмента визуализации kibana, а страница конфигурации пула потоков заполняется с помощью простого фронта. -конец;
4. Часть основного кода
4.1 Реализация клиента
4.1.1 Определение и реализация динамического пула потоков
Поскольку ThreadPoolExecutor открывает только maxCorePoolSize/corePoolSize/
Только интерфейс BlockingQueue может быть реализован для реализации очереди блокировки, а метод изменения емкости очереди может быть открыт, чтобы установленный метод можно было динамически модифицировать, а другие части могли напрямую копировать реализацию ArrayBlockingQueue для реализации динамической блокировки. очередь;
//自定义链表阻塞队列
public class PhxResizeLinkedBlockingQueue extends AbstractQueue
implements BlockingQueue, java.io.Serializable {
private volatile int capacity;
//增加设置队列容量接口
public void setCapacity(int capacity) {
this.capacity = capacity;
}
}
В дополнение к динамической модификации он также должен поддерживать отчеты о состоянии, отчеты об исключениях перегрузки, статистику выполнения задач, занимающую много времени, и т. д., поэтому также необходимо переписать метод afterExecute для отслеживания состояния выполнения задачи и настроить RejectHandler. для отслеживания действия отклонения, как показано ниже.
//重写方法,监听任务运行状态
public class PhxThreadPool extends ThreadPoolExecutor {
@Override
protected void afterExecute(Runnable r, Throwable throwable) {
LocalDateTime startTime = LocalDateTime.now();
super.afterExecute(r, throwable);
LocalDateTime endTime = LocalDateTime.now();
if (throwable == null && r instanceof Future) {
try {
((Future) r).get();
} catch (CancellationException ce) {
throwable = ce;
} catch (ExecutionException ee) {
throwable = ee.getCause();
} catch(InterruptedException ie){
Thread.currentThread().interrupt();
}
}
try {
if (throwable != null) {
publishEvent(EventEnums.POOL_RUNNABLE_EXECUTE_ERROR.name(), null);
} else {
Duration duration = Duration.between(startTime, endTime);
Integer costTime = Integer.parseInt(String.valueOf(duration.getSeconds()));
publishEvent(EventEnums.POOL_RUNTIME_STATISTICS.name(), costTime);
}
} catch (Exception ex){
}
}
//自定义拒绝策略,监听并上报
public class PhxRejectHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
PhxThreadPool threadPool = (PhxThreadPool)executor;
List metaDataDTOS = new ArrayList<>();
...
publisher.publishEvent(metaDataDTOS);
}
}
4.1.2 Регистрация динамического пула потоков
Для облегчения управления экземплярами пула потоков определяется контейнер пула потоков.
public class ThreadPoolContainer {
private ConcurrentMap container = new ConcurrentHashMap<>();
public void put(String poolName, PhxThreadPool threadPool) {
container.putIfAbsent(poolName, threadPool);
}
public PhxThreadPool get(String poolName) {
return container.getOrDefault(poolName, null);
}
}
При сканировании и регистрации потоков используется постпроцессор бина, а определенные экземпляры пула потоков сканируются один за другим при запуске контейнера spring.Чтобы не блокировать процесс запуска, для регистрации создается отдельный поток . Вам также необходимо установить пользовательскую политику отказа в экземпляре пула потоков.
public class PhxThreadPoolBeanPostProcessor implements BeanPostProcessor {
private final String url;
private PhxThreadPoolConfig config;
private ThreadPoolContainer container;
private RejectedExecutionHandler rejectHandler;
private ExecutorService executorService = Executors.newSingleThreadExecutor();
public PhxThreadPoolBeanPostProcessor(PhxThreadPoolConfig config, ThreadPoolContainer container) {
this.config = config;
url = config.getAdminUrl() + "/meta/register";
this.container = container;
}
@Override
public Object postProcessBeforeInitialization(final Object bean, final String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(final Object poolBean, final String beanName) throws BeansException {
if (poolBean instanceof PhxThreadPool) {
executorService.execute(() -> handler((PhxThreadPool) poolBean));
}
return poolBean;
}
private void handler(final PhxThreadPool poolBean) {
String poolName = poolBean.getName();
poolBean.setRejectedExecutionHandler(rejectHandler);
container.put(poolName, poolBean);
post(buildJsonParams(poolBean));
}
}
4.1.3 Отчет о состоянии работы пула потоков
Чтобы увеличить пропускную способность сообщений и максимально сэкономить ресурсы приложения, очереди сообщений используются для буферизации в клиентских компонентах, включая события/публикаторы событий/очереди сообщений/обработчики событий.
Основной код реализации выглядит следующим образом:
//监控事件包装类
public class MonitorEvent implements Serializable {
private List eventDTOs;
public void clear () {
eventDTOs = null; //使用后让jvm主动gc回收
}
}
//事件
public class MonitorEventDTO {
private String appName;
private String ip;
private String poolName;
private Integer corePoolSize;
private Integer maxPoolSize;
private Integer queueSize;
private Integer queueCapacity;
}
//事件生成工厂类
public class MonitorEventFactory implements EventFactory {
@Override
public MonitorEvent newInstance() {
return new MonitorEvent();
}
}
//事件处理器
public class MonitorEventHandler implements EventHandler {
private PhxThreadPoolConfig config;
public MonitorEventHandler(PhxThreadPoolConfig config) {
this.config = config;
}
@Override
public void onEvent(MonitorEvent monitorEvent, long l, boolean b) throws Exception {
if (monitorEvent == null || monitorEvent.getEventDTOs() == null
|| monitorEvent.getEventDTOs().size() == 0) {
return;
}
String path = config.getAdminUrl() + "/meta/upload";
List eventDTOs = monitorEvent.getEventDTOs();
PhxSender.upload(path, eventDTOs);
monitorEvent.clear();
}
}
//消息发布器
public class MonitorEventPublisher {
private Disruptor disruptor;
private MonitorEventHandler eventHandler;
private PhxThreadPoolConfig config;
public void publishEvent(final List events) {
final RingBuffer ringBuffer = disruptor.getRingBuffer();
ringBuffer.publishEvent(new EventTranslator(), events);
}
public void destroy() {
disruptor.shutdown();
}
}
//消息转换
public class EventTranslator implements EventTranslatorOneArg> {
@Override
public void translateTo(MonitorEvent event, long l, List monitorEventDTOS) {
event.setEventDTOs(monitorEventDTOS);
}
}
//发送
public class PhxSender {
public static void upload(String path, List metaDataDTOS) throws IOException {
if (metaDataDTOS != null && metaDataDTOS.size() > 0) {
String jsonBody = OkHttpTools.getInstance().getGosn().toJson(metaDataDTOS);
OkHttpTools.getInstance().post(path, jsonBody);
}
}
}
4.1.4 Динамическая конфигурация пула потоков
Существует два способа динамического изменения конфигурации пула потоков.Первый способ заключается в использовании клиента apollo для прослушивания события изменения конфигурации, отправленного управляющим терминалом, а затем ответа в соответствии с событием, аналогично «режиму отправки». "; второй способ - использовать клиент. Запустите поток, продолжайте запрашивать службу на стороне управления, а затем обновите конфигурацию локального пула потоков, аналогично "режиму извлечения". Здесь используется второй метод.
public class PhxClient {
public void startPullPoolConfig() {
pullConfigThread = new Thread(() -> {
while(!toStop) {
try {
String path = config.getAdminUrl() + "/meta/" + config.getAppName();
String response = OkHttpTools.getInstance().get(path);
//解析控制端返回的线程池配置信息
List pools = JSON.parseArray(response, ThreadPoolConfigDto.class);
if (pools != null && pools.size() > 0) {
pools.forEach(pool -> {
PhxThreadPool threadPool = container.get(pool.getPoolName());
threadPool.setCorePoolSize(pool.getCorePoolSize());
threadPool.setMaximumPoolSize(pool.getMaxPoolSize());
PhxResizeLinkedBlockingQueue queue = (PhxResizeLinkedBlockingQueue)threadPool.getQueue();
queue.setCapacity(pool.getQueueCapacity());
container.put(pool.getPoolName(), threadPool);
});
}
} catch (Exception ex) {
log.error("【线程池控制客户端-拉取配置线程发生异常】exception:", ex);
if (!toStop) {
}
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}