Добавить Автора
Это вторая часть шаблона проектирования, первая показывает наилучший способ разработки шаблона — успешное использование шаблона стратегии.
Шаблон конвейера — это один из распространенных вариантов шаблона цепочки ответственности. В конвейерном режиме конвейер играет роль конвейера, передавая данные в последовательность обработки.После обработки данных на каждом шаге они передаются на следующий шаг для обработки до тех пор, пока не будут обработаны все шаги.
PS: в режиме чистой цепочки ответственности для обработки данных используется только один процессор в цепочке, а в конвейерном режиме данные обрабатывают несколько процессоров.
Когда использовать шаблон конвейера
Код задачи относительно сложен и должен быть разделен на несколько подшагов, особенно когда новые подшаги могут быть добавлены в любом месте, старые подшаги могут быть удалены, а порядок подшагов может быть изменен. можно рассмотреть возможность использования конвейерного режима.
Получайте удовольствие, используя шаблон трубы
Фоновое воспроизведение
Когда я впервые начал работать в качестве платформы моделей, функция создания экземпляров модели включала: «проверка входных данных -> создание экземпляров модели на основе ввода -> сохранение экземпляров модели в соответствующей таблице БД», всего три шага, не слишком сложный, поэтому на тот момент код выглядел так:
public class ModelServiceImpl implements ModelService {
/**
* 提交模型(构建模型实例)
*/
public CommonReponse<Long> buildModelInstance(InstanceBuildRequest request) {
// 输入数据校验
validateInput(request);
// 根据输入创建模型实例
ModelInstance instance = createModelInstance(request);
// 保存实例到相关 DB 表
saveInstance(instance);
}
}
Однако вскоре мы обнаружили, что формат входных данных формы не полностью соответствует входным требованиям модели, поэтому мы добавили «предварительную обработку данных формы». Эта функция еще не реализована, и некоторые бизнес-стороны предложили им также обрабатывать данные (например, в соответствии с формой, введенной продавцом, генерировать некоторые другие бизнес-данные в качестве входных данных модели).
Следовательно, после «проверки входных данных» необходимо добавить «предварительная обработка ввода и вывода формы» и «обработка пользовательских данных на стороне бизнеса (необязательно)». В этот момент я столкнулся с выбором: продолжать ли мне реализовывать эти новые этапы обработки, добавляя новые методы в buildModelInstance? Преимущество в том, что вы можете быть ленивым в данный момент, но недостаток в том, что:
1. ModelService следует использовать только для получения запросов HSF, а не для выполнения бизнес-логики.Если логика отправки модели написана в этом классе, это нарушает единственную ответственность и позже приведет к взрыву кода класса.
2. Каждый раз, когда в будущем добавляется новый шаг обработки или удаляется определенный шаг, я буду модифицировать метод buildModelInstance, который должен быть очень связным, нарушая принцип открытого-закрытого.
Итак, чтобы не копать яму для себя в будущем, я думаю, мне нужно подумать о безошибочном плане. В это время моя головушка начала летать, и вдруг промелькнул ChannelPipeline в Netty — да, режим конвейера, неужели это именно то, что мне нужно!
Существуют также различные способы реализации конвейерного режима.Далее, на основе предыдущего фона, я поделюсь своей текущей «лучшей рутиной» для реализации конвейерного режима на основе Spring (если у вас есть лучшая рутина, пожалуйста, просветите меня и обсудите ее вместе).
Определите контекст для конвейерной обработки
/**
* 传递到管道的上下文
*/
@Getter
@Setter
public class PipelineContext {
/**
* 处理开始时间
*/
private LocalDateTime startTime;
/**
* 处理结束时间
*/
private LocalDateTime endTime;
/**
* 获取数据名称
*/
public String getName() {
return this.getClass().getSimpleName();
}
}
Определить процессор контекста
/**
* 管道中的上下文处理器
*/
public interface ContextHandler<T extends PipelineContext> {
/**
* 处理输入的上下文数据
*
* @param context 处理时的上下文数据
* @return 返回 true 则表示由下一个 ContextHandler 继续处理,返回 false 则表示处理结束
*/
boolean handle(T context);
}
Для удобства объяснения теперь мы определяем контекст и связанные процессоры самой ранней версии [Submit Model Logic]:
/**
* 模型实例构建的上下文
*/
@Getter
@Setter
public class InstanceBuildContext extends PipelineContext {
/**
* 模型 id
*/
private Long modelId;
/**
* 用户 id
*/
private long userId;
/**
* 表单输入
*/
private Map<String, Object> formInput;
/**
* 保存模型实例完成后,记录下 id
*/
private Long instanceId;
/**
* 模型创建出错时的错误信息
*/
private String errorMsg;
// 其他参数
@Override
public String getName() {
return "模型实例构建上下文";
}
}
Процессор — проверка входных данных:
@Component
public class InputDataPreChecker implements ContextHandler<InstanceBuildContext> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public boolean handle(InstanceBuildContext context) {
logger.info("--输入数据校验--");
Map<String, Object> formInput = context.getFormInput();
if (MapUtils.isEmpty(formInput)) {
context.setErrorMsg("表单输入数据不能为空");
return false;
}
String instanceName = (String) formInput.get("instanceName");
if (StringUtils.isBlank(instanceName)) {
context.setErrorMsg("表单输入数据必须包含实例名称");
return false;
}
return true;
}
}
Процессор — создает экземпляр модели на основе ввода:
@Component
public class ModelInstanceCreator implements ContextHandler<InstanceBuildContext> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public boolean handle(InstanceBuildContext context) {
logger.info("--根据输入数据创建模型实例--");
// 假装创建模型实例
return true;
}
}
Процессор — сохраняет экземпляр модели в соответствующую таблицу БД:
@Component
public class ModelInstanceSaver implements ContextHandler<InstanceBuildContext> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public boolean handle(InstanceBuildContext context) {
logger.info("--保存模型实例到相关DB表--");
// 假装保存模型实例
return true;
}
}
На этом этапе возникает вопрос: какой метод следует использовать для конкатенации ContextHandler одного и того же контекста в конвейер? подумайте немного:
Добавьте метод setNext в ContextHandler, каждый реализующий класс должен указать свой следующий обработчик. Недостаток тоже очевиден: если в середине текущего пайплайна добавляется новый ContextHandler, необходимо модифицировать метод setNext предыдущего ContextHandler, кроме того, код написан для чтения, поэтому его невозможно интуитивно знать весь конвейер с первого взгляда.Чтобы обработать ссылку, вы должны перейти к каждому связанному ContextHandler, чтобы увидеть его.
Аннотируйте ContextHandler с помощью @Order и определите последовательность каждого ContextHandler в соответствии с числом, указанным в @Order.В начале интервал между каждым числом может быть больше (например, 10, 20, 30), а при добавлении новых ContextHandlers позже вы можете указать число как (11, 21, 31), тогда вы можете избежать проблемы изменения кода в приведенной выше схеме, но все же неизбежно вводить каждый соответствующий ContextHandler, чтобы проверить, чтобы узнать проблему конвейера ссылка на обработку.
Заранее напишите таблицу маршрутизации, укажите сопоставление «Контекст -> конвейер» (конвейер представлен списком) и порядок процессоров в конвейере. На основе этой таблицы маршрутизации Spring при запуске строит карту, ключ карты — это тип контекста, а значение — конвейер (то есть список). В этом случае, если вы хотите узнать канал обработки каждого конвейера, вы можете просто посмотреть на эту таблицу маршрутизации, которая понятна с первого взгляда. Недостатком является то, что каждый раз, когда добавляется новый ContextHandler, эта таблица маршрутизации также должна вносить небольшие изменения в соответствующий конвейер — но если это может сделать читаемый код более понятным, я думаю, что такие изменения целесообразны и приемлемы~
Создайте таблицу маршрутизации конвейера
Основываясь на конфигурации Java Bean Spring, мы можем легко построить таблицу маршрутизации конвейера:
/**
* 管道路由的配置
*/
@Configuration
public class PipelineRouteConfig implements ApplicationContextAware {
/**
* 数据类型->管道中处理器类型列表 的路由
*/
private static final
Map<Class<? extends PipelineContext>,
List<Class<? extends ContextHandler<? extends PipelineContext>>>> PIPELINE_ROUTE_MAP = new HashMap<>(4);
/*
* 在这里配置各种上下文类型对应的处理管道:键为上下文类型,值为处理器类型的列表
*/
static {
PIPELINE_ROUTE_MAP.put(InstanceBuildContext.class,
Arrays.asList(
InputDataPreChecker.class,
ModelInstanceCreator.class,
ModelInstanceSaver.class
));
// 将来其他 Context 的管道配置
}
/**
* 在 Spring 启动时,根据路由表生成对应的管道映射关系
*/
@Bean("pipelineRouteMap")
public Map<Class<? extends PipelineContext>, List<? extends ContextHandler<? extends PipelineContext>>> getHandlerPipelineMap() {
return PIPELINE_ROUTE_MAP.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, this::toPipeline));
}
/**
* 根据给定的管道中 ContextHandler 的类型的列表,构建管道
*/
private List<? extends ContextHandler<? extends PipelineContext>> toPipeline(
Map.Entry<Class<? extends PipelineContext>, List<Class<? extends ContextHandler<? extends PipelineContext>>>> entry) {
return entry.getValue()
.stream()
.map(appContext::getBean)
.collect(Collectors.toList());
}
private ApplicationContext appContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
appContext = applicationContext;
}
}
Определить исполнителя конвейера
Последним шагом является определение исполнителя конвейера. Исполнитель конвейера находит соответствующий конвейер в соответствии с типом входящих данных контекста, а затем помещает данные контекста в конвейер для обработки.
/**
* 管道执行器
*/
@Component
public class PipelineExecutor {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* 引用 PipelineRouteConfig 中的 pipelineRouteMap
*/
@Resource
private Map<Class<? extends PipelineContext>,
List<? extends ContextHandler<? super PipelineContext>>> pipelineRouteMap;
/**
* 同步处理输入的上下文数据<br/>
* 如果处理时上下文数据流通到最后一个处理器且最后一个处理器返回 true,则返回 true,否则返回 false
*
* @param context 输入的上下文数据
* @return 处理过程中管道是否畅通,畅通返回 true,不畅通返回 false
*/
public boolean acceptSync(PipelineContext context) {
Objects.requireNonNull(context, "上下文数据不能为 null");
// 拿到数据类型
Class<? extends PipelineContext> dataType = context.getClass();
// 获取数据处理管道
List<? extends ContextHandler<? super PipelineContext>> pipeline = pipelineRouteMap.get(dataType);
if (CollectionUtils.isEmpty(pipeline)) {
logger.error("{} 的管道为空", dataType.getSimpleName());
return false;
}
// 管道是否畅通
boolean lastSuccess = true;
for (ContextHandler<? super PipelineContext> handler : pipeline) {
try {
// 当前处理器处理数据,并返回是否继续向下处理
lastSuccess = handler.handle(context);
} catch (Throwable ex) {
lastSuccess = false;
logger.error("[{}] 处理异常,handler={}", context.getName(), handler.getClass().getSimpleName(), ex);
}
// 不再向下处理
if (!lastSuccess) { break; }
}
return lastSuccess;
}
}
Использовать конвейерный режим
На этом этапе мы можем изменить начальный buildModelInstance на:
public CommonResponse<Long> buildModelInstance(InstanceBuildRequest request) {
InstanceBuildContext data = createPipelineData(request);
boolean success = pipelineExecutor.acceptSync(data);
// 创建模型实例成功
if (success) {
return CommonResponse.success(data.getInstanceId());
}
logger.error("创建模式实例失败:{}", data.getErrorMsg());
return CommonResponse.failed(data.getErrorMsg());
}
Смоделируем процесс создания экземпляра модели:
Когда параметры в норме:
Когда параметр неправильный:
В настоящее время мы добавляем два новых ContextHandler в InstanceBuildContext: FormInputPreprocessor (предварительная обработка входных данных формы) и BizSideCustomProcessor (обработка пользовательских данных на стороне бизнеса).
@Component
public class FormInputPreprocessor implements ContextHandler<InstanceBuildContext> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public boolean handle(InstanceBuildContext context) {
logger.info("--表单输入数据预处理--");
// 假装进行表单输入数据预处理
return true;
}
}
@Component
public class BizSideCustomProcessor implements ContextHandler<InstanceBuildContext> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public boolean handle(InstanceBuildContext context) {
logger.info("--业务方自定义数据处理--");
// 先判断是否存在自定义数据处理,如果没有,直接返回 true
// 调用业务方的自定义的表单数据处理
return true;
}
}
В настоящее время buildModelInstance изменять не нужно. Нам нужно только добавить эти два ContextHandler в конвейер, связанный с InstanceBuildContext в «таблице маршрутизации». Когда Spring запустится, он автоматически поможет нам построить конвейер, соответствующий каждому контексту:
Затем смоделируйте процесс создания экземпляра модели:
Асинхронная обработка
В PipelineExecutor acceptSync — это синхронный метод.
Xiaomi: Вы можете сказать, взглянув на имя, которое вы тихо настраиваете.
Для задач с большим количеством шагов нам часто требуется асинхронная обработка, например, определенные запланированные задачи, отнимающие много времени. Асинхронная конвейерная обработка очень проста, мы сначала определяем пул потоков, например:
<!-- 专门用于执行管道任务的线程池 -->
<bean id="pipelineThreadPool"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="4" /> <!-- 核心线程数 -->
<property name="maxPoolSize" value="8" /> <!-- 最大线程数 -->
<property name="keepAliveSeconds" value="960" /> <!-- 线程最大空闲时间/秒(根据管道使用情况指定)-->
<property name="queueCapacity" value="256" /> <!-- 任务队列大小(根据管道使用情况指定)-->
<property name="threadNamePrefix" value="pipelineThreadPool" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy" />
</property>
</bean>
Затем добавьте метод асинхронной обработки в PipelineExecutor:
/**
* 管道线程池
*/
@Resource
private ThreadPoolTaskExecutor pipelineThreadPool;
/**
* 异步处理输入的上下文数据
*
* @param context 上下文数据
* @param callback 处理完成的回调
*/
public void acceptAsync(PipelineContext context, BiConsumer<PipelineContext, Boolean> callback) {
pipelineThreadPool.execute(() -> {
boolean success = acceptSync(context);
if (callback != null) {
callback.accept(context, success);
}
});
}
Общая обработка
Например, мы хотим записать время обработки каждого конвейера и распечатать соответствующие журналы до и после обработки. Затем мы можем предоставить два общих ContextHandler, которые размещаются в начале и в конце каждого конвейера:
@Component
public class CommonHeadHandler implements ContextHandler<PipelineContext> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public boolean handle(PipelineContext context) {
logger.info("管道开始执行:context={}", JSON.toJSONString(context));
// 设置开始时间
context.setStartTime(LocalDateTime.now());
return true;
}
}
@Component
public class CommonTailHandler implements ContextHandler<PipelineContext> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public boolean handle(PipelineContext context) {
// 设置处理结束时间
context.setEndTime(LocalDateTime.now());
logger.info("管道执行完毕:context={}", JSON.toJSONString(context));
return true;
}
}
Общие головной и хвостовой процессоры можно разместить в таблице маршрутизации, но каждый раз при добавлении нового PipelineContext это кажется излишним — модифицируем напрямую метод acceptSync в исполнителе конвейера PipelineExecutor:
@Component
public class PipelineExecutor {
......
@Autowired
private CommonHeadHandler commonHeadHandler;
@Autowired
private CommonTailHandler commonTailHandler;
public boolean acceptSync(PipelineContext context) {
......
// 【通用头处理器】处理
commonHeadHandler.handle(context);
// 管道是否畅通
boolean lastSuccess = true;
for (ContextHandler<? super PipelineContext> handler : pipeline) {
try {
// 当前处理器处理数据,并返回是否继续向下处理
lastSuccess = handler.handle(context);
} catch (Throwable ex) {
lastSuccess = false;
logger.error("[{}] 处理异常,handler={}", context.getName(), handler.getClass().getSimpleName(), ex);
}
// 不再向下处理
if (!lastSuccess) { break; }
}
// 【通用尾处理器】处理
commonTailHandler.handle(context);
return lastSuccess;
}
}
Суммировать
Благодаря конвейерному режиму мы значительно снижаем степень связанности системы и улучшаем степень сплоченности и масштабируемости:
-
ModelService отвечает только за обработку запросов HSF и не заботится о конкретной бизнес-логике.
-
PipelineExecutor выполняет только выполнение работы, не заботясь о конкретных деталях конвейера.
-
Каждый ContextHandler отвечает только за свою часть бизнес-логики, ему не нужно знать структуру конвейера и он отделен от бизнес-логики других ContextHandler.
-
При добавлении, удалении или замене подэтапов вам нужно только управлять конфигурацией таблицы маршрутизации вместо изменения исходного кода вызова.