Можно сказать, что концепция микросервисов открывает новый мир для программирования, дает много преимуществ, но также усложняет некоторые проблемы, с которыми было легко справиться в прошлом, такие как кэширование, транзакции и запланированные задачи. Кэш можно использовать для промежуточного ПО, такого как redis, memcached и т. д. Существует множество распределенных транзакционных фреймворков для решения транзакций, а также есть распределенные решения для запланированных задач, таких как кварц, эластичная работа и т. д. Сегодня я хочу поговорить о запланированных задания.
Теперь, когда существует зрелая структура распределенных задач с временными параметрами, я хочу поговорить не об использовании другого дизайна для достижения той же функции, а о решении проблемы распределенных задач с временными параметрами с другой точки зрения.
источник проблемы
Эта проблема возникает из-за небольшой функции. У нас есть микросервис, который отправляет короткие сообщения, и нам нужно получить отчет о состоянии короткого сообщения. Отчет о состоянии не синхронен для отправки короткого сообщения. Короткое сообщение отправляется поставщику услуг. , и поставщик услуг может сгенерировать его только после отправки оператору для отправки.Отчет о состоянии, поэтому есть определенная задержка, его нужно получать асинхронно, а интерфейс, предоставляемый поставщиком услуг, имеет ограничение по частоте, поэтому необходимо выполнить запланированное задание, и оно должно быть выполнено в одной точке, тогда возникает проблема, потому что эту функцию мне нужно ввести в структуру задач с расписанием?Это всегда кажется немного излишним.
Раньше мы использовали и кварц, и эластичный джоб для обработки задач по расписанию, но вводить фреймворк только для такой маленькой функции не стоит, да и настройка занимает много времени.
Например, для использования кварца нужно создать кучу таблиц базы данных, но в таблице хранится информация только об одной задаче.
Давайте использовать эластичную работу, но также использовать zookeeper, Даже использование облегченной версии требует много настроек, что намного дольше, чем время, которое я потратил на написание бизнеса.
Я просто хочу просто написать логику! ! !
решение
Разговор о распределенных решениях, как правило, неотделим от промежуточного программного обеспечения, подумайте о распределенном решении для решения проблемы веб-сокета в прошлый раз (см.Решение WebSocket в микросервисной архитектуре Spring Cloud) Используемый Spring Cloud Stream, вероятно, имеет идею:
- Мне нужен центр распределения задач, предназначенный для запуска запланированных задач
- Если другим службам необходимо запускать запланированные задачи, получайте определенные триггерные сообщения.
- Когда выполнение задачи будет завершено, отправьте сообщение с подтверждением завершения задачи в центр распределения задач.
- Обеспечьте общедоступный загрузочный стартер Spring для стороны выполнения задачи.Шаги 2 и 3 ночью, почти все, что нужно закодировать, — это сама бизнес-логика.
детальный дизайн
Согласно решению на предыдущем шаге, некоторые детали должны быть подтверждены, а также некоторые особые случаи, например, запланированные задачи могут выполняться одним экземпляром в микросервисном кластере, или может быть коллективное выполнение (например, обновление кэш в памяти), и может быть выполнение раздела.
Клиент (сервер, которому нужны запланированные задачи) должен установить следующие очереди сообщений:
- Очередь, полученная кластером, по одной на каждый экземпляр микрослужбы, и каждый экземпляр микрослужбы получит одно и то же сообщение.
- Отдельно полученные очереди, по одной для каждого кластера приложений, чтобы гарантировать, что сообщения потребляются только одним экземпляром.
- Очереди, полученные разделом, по одному для каждого раздела, чтобы гарантировать, что только один экземпляр в разделе потребляет
Клиент и сервер должны подтвердить выполнение запланированной задачи с помощью уникального идентификатора задачи.
Сервер (микросервис распределения задач) должен отправлять сообщения в разные очереди в зависимости от ситуации. Spring Cloud Stream нельзя использовать напрямую, и необходимо использовать rabbitmq.
Сам сервер тоже распределенный, поэтому для запуска задач нужен таймфреймворк, здесь я выбрал кварц.
Код
Не буду повторять базовые знания Spring Cloud Stream,Решение WebSocket в микросервисной архитектуре Spring CloudЕсть пояснения.
Сервис распределения задач по расписанию
Определение запланированных задач
data class ScheduleTask(
/** 任务的id,全局唯一,与客户端的taskId完全匹配 */
var taskId: String = "",
/** 定时任务的cron 表达式 */
var cron: String = "",
/** 关联应用 */
var appId: Int = 0,
/** 任务描述 */
var description: String = "",
/** 接收任务的分区 */
var zone: String? = null,
/** 调度方式,广播到集群或单例执行,默认单例 */
var dispatchMode: DispatchMode = DispatchMode.Singleton,
/** 是否启用 */
var enabled: Boolean = true,
/** 任务的数据库记录 id,自增 */
var id: Int = -1)
планирование задач
Планирование задач с кварцем
private fun scheduleJob(task: ScheduleTask) {
val job = JobBuilder.newJob(TaskEmitterJob::class.java)
.withIdentity(task.taskId, task.appId.toString())
.withDescription(task.description)
.storeDurably()
.requestRecovery()
.usingJobData("id", task.id)
.usingJobData("taskId", task.taskId)
.build()
val trigger = TriggerBuilder.newTrigger()
.withIdentity(task.taskId, task.appId.toString())
.withSchedule(CronScheduleBuilder.cronSchedule(task.cron))
.forJob(job)
.build()
scheduler.addJob(job, true, true)
if (scheduler.checkExists(trigger.key)) {
scheduler.rescheduleJob(trigger.key, trigger)
} else {
scheduler.scheduleJob(trigger)
}
}
ScheduleTask является постоянным, вставляйте задачи в кварц одновременно при вставке, обновляйте кварц при обновлении и удаляйте одновременно при удалении
Задание триггера кварца
class TaskEmitterJob : Job {
companion object {
private val log = LogFactory.getLog(TaskEmitterJob::class.java)
}
override fun execute(context: JobExecutionContext) {
try {
val taskId = context.jobDetail.jobDataMap["taskId"] as String
log.info("任务分发:$taskId")
val service = ScheduleCenterApplication.context.getBean(ScheduleTaskService::class.java)
service.launch(taskId)
} catch (e: Exception) {
log.error("任务失败$[taskId]", e)
}
}
}
отправить логику rabbitmq
/**
* 发布定时任务事件
*/
fun launch(task: ScheduleTask) {
val exchange = when (task.dispatchMode) {
Cluster -> "aegisScheduleCluster"
Singleton -> "aegisScheduleSingleton"
}
val routingKey = when (task.dispatchMode) {
Cluster -> exchange
Singleton -> "$exchange.${task.appName}"
}
val executeTaskInfo = ScheduleTaskInfo(task.taskId, task.appName!!)
amqpTemplate.convertAndSend(exchange, routingKey,
executeTaskInfo)
taskExecuteRecordDAO.save(
TaskExecuteRecord(executeTaskInfo.uid, task.id, Date())
)
}
Реализация весеннего загрузчика на стороне клиента
Определите интерфейс временной задачи, если интерфейс реализован в проекте и реализация объявлена как bean-компонент, определение временной задачи может быть завершено.
@FunctionalInterface
interface ScheduledJob {
/**
* 执行定时任务
*/
fun execute(properties: Map<String, Any>)
/**
* 获取定时任务id
* @return 定时任务id,对应任务分发中心ScheduleTask的taskId
*/
fun getId(): String
}
получить задание
/**
* 接收单例任务
*/
@StreamListener(SINGLETON_INPUT)
fun acceptGroupTask(taskInfo: ScheduleTaskInfo) {
if (taskInfo.app == application) {
val receivedTime = Date()
val job = jobsProvider.ifAvailable?.firstOrNull {
it.getId() == taskInfo.id
}
job?.execute(taskInfo.properties ?: mapOf())
singletonOutput.send(GenericMessage(
ConfirmInfo(taskInfo.id, taskInfo.uid, job != null, receivedTime, Date())
))
}
}
Разница между общей задачей выполнения кластера и задачей singleton только в настройке потока, в одном нужно объявить группу привязки, а в другом нет, это относится к категории знаний Spring Cloud Stream. Вы можете прочитать официальные документы самостоятельно или проверить документы, о которых я упоминал ранее.Если вам что-то непонятно, вы можете пообщаться со мной в частном порядке.
объявление потока потокового события
/**
* 定时任务信息的事件流接口
* @author 吴昊
* @since 0.1.0
*/
interface AegisScheduleClient {
companion object {
const val CLUSTER_INPUT = "aegisScheduleClusterInput"
const val SINGLETON_INPUT = "aegisScheduleSingletonInput"
const val CONFIRM_OUTPUT = "aegisScheduleGroupOutput"
}
/**
*
* @return
*/
@Input(CLUSTER_INPUT)
fun scheduleInput(): SubscribableChannel
/**
*
* @return
*/
@Input(SINGLETON_INPUT)
fun singletonScheduleInput(): SubscribableChannel
/**
*
* @return
*/
@Output(CONFIRM_OUTPUT)
fun confirmOutput(): MessageChannel
}
Наконец, добавьте код получения сообщения подтверждения сервера:
@StreamListener(CONFIRM_INPUT)
fun acceptGroupTask(confirmInfo: ConfirmInfo) {
LOG.info("接收到确认消息:$confirmInfo")
scheduleTaskService.confirm(confirmInfo)
}
Основной код выложен, да и общая идея тоже очень проста, есть еще много областей, которые нужно оптимизировать, например сбой пуша сообщения, или подтверждение того, что сообщение не доставлено и т.д., чего мало влияние на общий дизайн.
Таким образом, если вам нужно добавить запланированные задачи на стороне микросервера, вам нужно только
- Ввести стартер
- Реализовать интерфейс ScheduledJob
- Добавить задачу в центр планирования задач
Что касается добавления задач в таск-центр, то с кодом темы легко реализовать простой интерфейс управления, ведь это просто ввод нескольких полей.
Напоследок скриншот интерфейса управления прилагается:
список заданий
Детали миссии
Другие мои статьи:
Решение WebSocket в микросервисной архитектуре Spring Cloud
Mybatis de-xml: я больше не хочу писать xml
Кэш Spring Security OAuth2 использует обработку сериализации Джексона