Компонент динамического пула потоков — проектирование и реализация

Java

1. Общий дизайн

Целое разделено на две части: бизнес-обслуживание и управление, включая три основных функциональных пункта:

(1) Упростить конфигурацию пула потоков: corePoolSize, maxPoolSize, workQueue, которые в наибольшей степени определяют распределение задач и стратегию распределения потоков пула потоков. Учитывая, что существует два основных сценария получения параллелизма в практических приложениях: (1) Выполнение подзадач параллельно для повышения скорости отклика. В этом случае следует использовать синхронную очередь, задачи не должны кэшироваться, а выполняться немедленно. (2) Выполняйте большие пакеты задач параллельно, чтобы повысить пропускную способность. В этом случае следует использовать ограниченную очередь, и очередь должна использоваться для буферизации больших пакетов задач, емкость очереди должна быть объявлена ​​для предотвращения неограниченного накопления задач. Таким образом, пул потоков должен только обеспечивать настройку этих трех ключевых параметров и выбор двух очередей для удовлетворения подавляющего большинства потребностей бизнеса.

(2) параметры могут быть динамически настроены: на основе пула потоков Java инкапсулировать пул потоков, разрешить пулу потоков изменять конфигурацию в соответствии с внешними изменениями конфигурации и разработать интерфейс управления на стороне управления для облегчения разработки;

(3) Расширение мониторинга пула потоков: добавьте возможности мониторинга на протяжении всего жизненного цикла задач выполнения пула потоков, например текущее количество активных задач, количество исключений, возникающих при выполнении задач, и размер очереди задач для облегчения соответствующие разработки для понимания состояния пула потоков;

2. Общий архитектурный дизайн

Динамическая настройка параметров: предоставляет интерфейс управления и поддерживает динамическую настройку параметров пула потоков, включая количество основных потоков в пуле потоков, максимальное количество потоков, длину буферной очереди и т. д.; параметры вступают в силу со временем. после модификации

Мониторинг задач: поддерживает мониторинг транзакций для детализации приложений, детализации пула потоков и детализации задач.Вы можете просматривать состояние выполнения задачи пула потоков, максимальное время выполнения задачи и среднее время выполнения задачи.

Загрузить сигнал тревоги: поддержка конфигурации правил сигналов тревоги, при превышении порогового значения соответствующий специалист по разработке будет уведомлен

Мониторинг и ведение журнала операций: журнал аудита доступа к конфигурации сегмента управления

Проверка разрешений: разные пользователи приложения

3. Основной бизнес-процесс

В основном его можно разделить на две части: приложение (клиент) и сторона управления, использующая базы данных mysql и es, в которых страница мониторинга настраивается с помощью инструмента визуализации kibana, а страница конфигурации пула потоков заполняется с помощью простого фронта. -конец;

4. Часть основного кода

4.1 Реализация клиента

4.1.1 Определение и реализация динамического пула потоков

Поскольку ThreadPoolExecutor открывает только maxCorePoolSize/corePoolSize/

Только интерфейс BlockingQueue может быть реализован для реализации очереди блокировки, а метод изменения емкости очереди может быть открыт, чтобы установленный метод можно было динамически модифицировать, а другие части могли напрямую копировать реализацию ArrayBlockingQueue для реализации динамической блокировки. очередь;

//自定义链表阻塞队列
public class PhxResizeLinkedBlockingQueue extends AbstractQueue
        implements BlockingQueue, java.io.Serializable {
            
    private volatile int capacity;
    
    //增加设置队列容量接口
    public void setCapacity(int capacity) {
        this.capacity = capacity;
    }
}

В дополнение к динамической модификации он также должен поддерживать отчеты о состоянии, отчеты об исключениях перегрузки, статистику выполнения задач, занимающую много времени, и т. д., поэтому также необходимо переписать метод afterExecute для отслеживания состояния выполнения задачи и настроить RejectHandler. для отслеживания действия отклонения, как показано ниже.

//重写方法,监听任务运行状态
 public class PhxThreadPool extends ThreadPoolExecutor {
    
    @Override
    protected void afterExecute(Runnable r, Throwable throwable) {
        LocalDateTime startTime = LocalDateTime.now();
        super.afterExecute(r, throwable);
        LocalDateTime endTime = LocalDateTime.now();
        
        if (throwable == null && r instanceof Future) {
            try {
                ((Future) r).get();
            } catch (CancellationException ce) {
                throwable = ce;
            } catch (ExecutionException ee) {
                throwable = ee.getCause();
            } catch(InterruptedException ie){
                Thread.currentThread().interrupt();
            }
        }
        
        try {
            if (throwable != null) {
                publishEvent(EventEnums.POOL_RUNNABLE_EXECUTE_ERROR.name(), null);
            } else {
                Duration duration = Duration.between(startTime, endTime);
                Integer costTime = Integer.parseInt(String.valueOf(duration.getSeconds()));
                publishEvent(EventEnums.POOL_RUNTIME_STATISTICS.name(), costTime);
            }
        } catch (Exception ex){
         
        }
}
    
 //自定义拒绝策略,监听并上报
 public class PhxRejectHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        PhxThreadPool threadPool = (PhxThreadPool)executor;
        List metaDataDTOS = new ArrayList<>();
        ...
        publisher.publishEvent(metaDataDTOS);
    }
}

4.1.2 Регистрация динамического пула потоков

Для облегчения управления экземплярами пула потоков определяется контейнер пула потоков.

public class ThreadPoolContainer {
    private ConcurrentMap container = new ConcurrentHashMap<>();
    public void put(String poolName, PhxThreadPool threadPool) {
        container.putIfAbsent(poolName, threadPool);
    }
    public PhxThreadPool get(String poolName) {
        return container.getOrDefault(poolName, null);
    }   
}

При сканировании и регистрации потоков используется постпроцессор бина, а определенные экземпляры пула потоков сканируются один за другим при запуске контейнера spring.Чтобы не блокировать процесс запуска, для регистрации создается отдельный поток . Вам также необходимо установить пользовательскую политику отказа в экземпляре пула потоков.

public class PhxThreadPoolBeanPostProcessor implements BeanPostProcessor {
    
     private final String url;
     private PhxThreadPoolConfig config;
     private ThreadPoolContainer container;
     private RejectedExecutionHandler rejectHandler;
     private ExecutorService executorService = Executors.newSingleThreadExecutor();
     public PhxThreadPoolBeanPostProcessor(PhxThreadPoolConfig config, ThreadPoolContainer container) {
        this.config = config;
        url = config.getAdminUrl() + "/meta/register";
        this.container = container;
    }

    @Override
    public Object postProcessBeforeInitialization(final Object bean, final String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(final Object poolBean, final String beanName) throws BeansException {
        if (poolBean instanceof PhxThreadPool) {
            executorService.execute(() -> handler((PhxThreadPool) poolBean));
        }
        return poolBean;
    }
    
    private void handler(final PhxThreadPool poolBean) {
        String poolName = poolBean.getName();
        poolBean.setRejectedExecutionHandler(rejectHandler);
        container.put(poolName, poolBean);
        post(buildJsonParams(poolBean));
    }
    
}

4.1.3 Отчет о состоянии работы пула потоков

Чтобы увеличить пропускную способность сообщений и максимально сэкономить ресурсы приложения, очереди сообщений используются для буферизации в клиентских компонентах, включая события/публикаторы событий/очереди сообщений/обработчики событий.

Основной код реализации выглядит следующим образом:

//监控事件包装类
public class MonitorEvent implements Serializable {
    private List eventDTOs;  
    public void clear () {
        eventDTOs = null; //使用后让jvm主动gc回收
    }
}

//事件
public class MonitorEventDTO {
    private String appName;
    private String ip;
    private String poolName;
    private Integer corePoolSize;
    private Integer maxPoolSize;
    private Integer queueSize;
    private Integer queueCapacity;
}

//事件生成工厂类
public class MonitorEventFactory implements EventFactory {
    @Override
    public MonitorEvent newInstance() {
        return new MonitorEvent();
    }
}

//事件处理器
public class MonitorEventHandler implements EventHandler {
    private PhxThreadPoolConfig config;
    public MonitorEventHandler(PhxThreadPoolConfig config) {
        this.config = config;
    }
    @Override
    public void onEvent(MonitorEvent monitorEvent, long l, boolean b) throws Exception {
        if (monitorEvent == null || monitorEvent.getEventDTOs() == null
                || monitorEvent.getEventDTOs().size() == 0) {
            return;
        }
        String path = config.getAdminUrl() + "/meta/upload";
        List eventDTOs = monitorEvent.getEventDTOs();
        PhxSender.upload(path, eventDTOs);        
        monitorEvent.clear();
    }
}

//消息发布器
public class MonitorEventPublisher {    
    private Disruptor disruptor;
    private MonitorEventHandler eventHandler;
    private PhxThreadPoolConfig config;
    public void publishEvent(final List events) {
        final RingBuffer ringBuffer = disruptor.getRingBuffer();
        ringBuffer.publishEvent(new EventTranslator(), events);
    }
    public void destroy() {
        disruptor.shutdown();
    }
}

//消息转换
public class EventTranslator implements EventTranslatorOneArg> {
    @Override
    public void translateTo(MonitorEvent event, long l, List monitorEventDTOS) {
        event.setEventDTOs(monitorEventDTOS);
    }
}

//发送
public class PhxSender {
    public static void upload(String path, List metaDataDTOS) throws IOException {
        if (metaDataDTOS != null && metaDataDTOS.size() > 0) {
            String jsonBody = OkHttpTools.getInstance().getGosn().toJson(metaDataDTOS);
            OkHttpTools.getInstance().post(path, jsonBody);
        }
    }
}

4.1.4 Динамическая конфигурация пула потоков

Существует два способа динамического изменения конфигурации пула потоков.Первый способ заключается в использовании клиента apollo для прослушивания события изменения конфигурации, отправленного управляющим терминалом, а затем ответа в соответствии с событием, аналогично «режиму отправки». "; второй способ - использовать клиент. Запустите поток, продолжайте запрашивать службу на стороне управления, а затем обновите конфигурацию локального пула потоков, аналогично "режиму извлечения". Здесь используется второй метод.

public class PhxClient {
    public void startPullPoolConfig() {
        pullConfigThread = new Thread(() -> {
            while(!toStop) {
                try {
    
                    String path = config.getAdminUrl() + "/meta/" + config.getAppName();
                    String response = OkHttpTools.getInstance().get(path);
                    //解析控制端返回的线程池配置信息
                    List pools = JSON.parseArray(response, ThreadPoolConfigDto.class);
                    if (pools != null && pools.size() > 0) {
                        pools.forEach(pool -> {
                            PhxThreadPool threadPool = container.get(pool.getPoolName());
                            threadPool.setCorePoolSize(pool.getCorePoolSize());
                            threadPool.setMaximumPoolSize(pool.getMaxPoolSize());
                            PhxResizeLinkedBlockingQueue queue = (PhxResizeLinkedBlockingQueue)threadPool.getQueue();
                            queue.setCapacity(pool.getQueueCapacity());
                            container.put(pool.getPoolName(), threadPool);
                        });
                    }
    
                } catch (Exception ex) {
                    log.error("【线程池控制客户端-拉取配置线程发生异常】exception:", ex);
                    if (!toStop) {
                    }
                }
    
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

5. Применение эффектов

5.1 Контроль нагрузки и сигнализация