Облегченное решение запланированных задач в микросервисной архитектуре

Микросервисы

Можно сказать, что концепция микросервисов открывает новый мир для программирования, дает много преимуществ, но также усложняет некоторые проблемы, с которыми было легко справиться в прошлом, такие как кэширование, транзакции и запланированные задачи. Кэш можно использовать для промежуточного ПО, такого как redis, memcached и т. д. Существует множество распределенных транзакционных фреймворков для решения транзакций, а также есть распределенные решения для запланированных задач, таких как кварц, эластичная работа и т. д. Сегодня я хочу поговорить о запланированных задания.

Теперь, когда существует зрелая структура распределенных задач с временными параметрами, я хочу поговорить не об использовании другого дизайна для достижения той же функции, а о решении проблемы распределенных задач с временными параметрами с другой точки зрения.

источник проблемы

Эта проблема возникает из-за небольшой функции. У нас есть микросервис, который отправляет короткие сообщения, и нам нужно получить отчет о состоянии короткого сообщения. Отчет о состоянии не синхронен для отправки короткого сообщения. Короткое сообщение отправляется поставщику услуг. , и поставщик услуг может сгенерировать его только после отправки оператору для отправки.Отчет о состоянии, поэтому есть определенная задержка, его нужно получать асинхронно, а интерфейс, предоставляемый поставщиком услуг, имеет ограничение по частоте, поэтому необходимо выполнить запланированное задание, и оно должно быть выполнено в одной точке, тогда возникает проблема, потому что эту функцию мне нужно ввести в структуру задач с расписанием?Это всегда кажется немного излишним.

Раньше мы использовали и кварц, и эластичный джоб для обработки задач по расписанию, но вводить фреймворк только для такой маленькой функции не стоит, да и настройка занимает много времени.

Например, для использования кварца нужно создать кучу таблиц базы данных, но в таблице хранится информация только об одной задаче.

Давайте использовать эластичную работу, но также использовать zookeeper, Даже использование облегченной версии требует много настроек, что намного дольше, чем время, которое я потратил на написание бизнеса.

Я просто хочу просто написать логику! ! !

решение

Разговор о распределенных решениях, как правило, неотделим от промежуточного программного обеспечения, подумайте о распределенном решении для решения проблемы веб-сокета в прошлый раз (см.Решение WebSocket в микросервисной архитектуре Spring Cloud) Используемый Spring Cloud Stream, вероятно, имеет идею:

  1. Мне нужен центр распределения задач, предназначенный для запуска запланированных задач
  2. Если другим службам необходимо запускать запланированные задачи, получайте определенные триггерные сообщения.
  3. Когда выполнение задачи будет завершено, отправьте сообщение с подтверждением завершения задачи в центр распределения задач.
  4. Обеспечьте общедоступный загрузочный стартер Spring для стороны выполнения задачи.Шаги 2 и 3 ночью, почти все, что нужно закодировать, — это сама бизнес-логика.

детальный дизайн

Согласно решению на предыдущем шаге, некоторые детали должны быть подтверждены, а также некоторые особые случаи, например, запланированные задачи могут выполняться одним экземпляром в микросервисном кластере, или может быть коллективное выполнение (например, обновление кэш в памяти), и может быть выполнение раздела.

Клиент (сервер, которому нужны запланированные задачи) должен установить следующие очереди сообщений:

  1. Очередь, полученная кластером, по одной на каждый экземпляр микрослужбы, и каждый экземпляр микрослужбы получит одно и то же сообщение.
  2. Отдельно полученные очереди, по одной для каждого кластера приложений, чтобы гарантировать, что сообщения потребляются только одним экземпляром.
  3. Очереди, полученные разделом, по одному для каждого раздела, чтобы гарантировать, что только один экземпляр в разделе потребляет

Клиент и сервер должны подтвердить выполнение запланированной задачи с помощью уникального идентификатора задачи.

Сервер (микросервис распределения задач) должен отправлять сообщения в разные очереди в зависимости от ситуации. Spring Cloud Stream нельзя использовать напрямую, и необходимо использовать rabbitmq.

Сам сервер тоже распределенный, поэтому для запуска задач нужен таймфреймворк, здесь я выбрал кварц.

Код

Не буду повторять базовые знания Spring Cloud Stream,Решение WebSocket в микросервисной архитектуре Spring CloudЕсть пояснения.

Сервис распределения задач по расписанию

Определение запланированных задач

data class ScheduleTask(
    /** 任务的id,全局唯一,与客户端的taskId完全匹配 */
    var taskId: String = "",
    /** 定时任务的cron 表达式 */
    var cron: String = "",
    /** 关联应用 */
    var appId: Int = 0,
    /** 任务描述 */
    var description: String = "",
    /** 接收任务的分区 */
    var zone: String? = null,
    /**  调度方式,广播到集群或单例执行,默认单例 */
    var dispatchMode: DispatchMode = DispatchMode.Singleton,
    /**  是否启用 */
    var enabled: Boolean = true,
    /** 任务的数据库记录 id,自增 */
    var id: Int = -1) 

планирование задач

Планирование задач с кварцем

private fun scheduleJob(task: ScheduleTask) {
    val job = JobBuilder.newJob(TaskEmitterJob::class.java)
        .withIdentity(task.taskId, task.appId.toString())
        .withDescription(task.description)
        .storeDurably()
        .requestRecovery()
        .usingJobData("id", task.id)
        .usingJobData("taskId", task.taskId)
        .build()
    val trigger = TriggerBuilder.newTrigger()
        .withIdentity(task.taskId, task.appId.toString())
        .withSchedule(CronScheduleBuilder.cronSchedule(task.cron))
        .forJob(job)
        .build()
    scheduler.addJob(job, true, true)
    if (scheduler.checkExists(trigger.key)) {
      scheduler.rescheduleJob(trigger.key, trigger)
    } else {
      scheduler.scheduleJob(trigger)
    }
  }

ScheduleTask является постоянным, вставляйте задачи в кварц одновременно при вставке, обновляйте кварц при обновлении и удаляйте одновременно при удалении

Задание триггера кварца

class TaskEmitterJob : Job {

  companion object {
    private val log = LogFactory.getLog(TaskEmitterJob::class.java)
  }

  override fun execute(context: JobExecutionContext) {
    try {
      val taskId = context.jobDetail.jobDataMap["taskId"] as String
      log.info("任务分发:$taskId")
      val service = ScheduleCenterApplication.context.getBean(ScheduleTaskService::class.java)
      service.launch(taskId)
    } catch (e: Exception) {
      log.error("任务失败$[taskId]", e)
    }
  }

}

отправить логику rabbitmq

/**
   * 发布定时任务事件
   */
  fun launch(task: ScheduleTask) {
    val exchange = when (task.dispatchMode) {
      Cluster   -> "aegisScheduleCluster"
      Singleton -> "aegisScheduleSingleton"
    }
    val routingKey = when (task.dispatchMode) {
      Cluster   -> exchange
      Singleton -> "$exchange.${task.appName}"
    }
    val executeTaskInfo = ScheduleTaskInfo(task.taskId, task.appName!!)
    amqpTemplate.convertAndSend(exchange, routingKey,
        executeTaskInfo)
    taskExecuteRecordDAO.save(
        TaskExecuteRecord(executeTaskInfo.uid, task.id, Date())
    )
  }

Реализация весеннего загрузчика на стороне клиента

Определите интерфейс временной задачи, если интерфейс реализован в проекте и реализация объявлена ​​как bean-компонент, определение временной задачи может быть завершено.

@FunctionalInterface
interface ScheduledJob {

  /**
   * 执行定时任务
   */
  fun execute(properties: Map<String, Any>)

  /**
   * 获取定时任务id
   * @return 定时任务id,对应任务分发中心ScheduleTask的taskId
   */
  fun getId(): String

}

получить задание

/**
   * 接收单例任务
   */
  @StreamListener(SINGLETON_INPUT)
  fun acceptGroupTask(taskInfo: ScheduleTaskInfo) {
    if (taskInfo.app == application) {
      val receivedTime = Date()
      val job = jobsProvider.ifAvailable?.firstOrNull {
        it.getId() == taskInfo.id
      }
      job?.execute(taskInfo.properties ?: mapOf())
      singletonOutput.send(GenericMessage(
          ConfirmInfo(taskInfo.id, taskInfo.uid, job != null, receivedTime, Date())
      ))
    }
  }

Разница между общей задачей выполнения кластера и задачей singleton только в настройке потока, в одном нужно объявить группу привязки, а в другом нет, это относится к категории знаний Spring Cloud Stream. Вы можете прочитать официальные документы самостоятельно или проверить документы, о которых я упоминал ранее.Если вам что-то непонятно, вы можете пообщаться со мной в частном порядке.

объявление потока потокового события

/**
 * 定时任务信息的事件流接口
 * @author 吴昊
 * @since 0.1.0
 */
interface AegisScheduleClient {

  companion object {
    const val CLUSTER_INPUT = "aegisScheduleClusterInput"
    const val SINGLETON_INPUT = "aegisScheduleSingletonInput"
    const val CONFIRM_OUTPUT = "aegisScheduleGroupOutput"
  }

  /**
   *
   * @return
   */
  @Input(CLUSTER_INPUT)
  fun scheduleInput(): SubscribableChannel

  /**
   *
   * @return
   */
  @Input(SINGLETON_INPUT)
  fun singletonScheduleInput(): SubscribableChannel

  /**
   *
   * @return
   */
  @Output(CONFIRM_OUTPUT)
  fun confirmOutput(): MessageChannel

}

Наконец, добавьте код получения сообщения подтверждения сервера:

  @StreamListener(CONFIRM_INPUT)
  fun acceptGroupTask(confirmInfo: ConfirmInfo) {
    LOG.info("接收到确认消息:$confirmInfo")
    scheduleTaskService.confirm(confirmInfo)
  }

Основной код выложен, да и общая идея тоже очень проста, есть еще много областей, которые нужно оптимизировать, например сбой пуша сообщения, или подтверждение того, что сообщение не доставлено и т.д., чего мало влияние на общий дизайн.

Таким образом, если вам нужно добавить запланированные задачи на стороне микросервера, вам нужно только

  1. Ввести стартер
  2. Реализовать интерфейс ScheduledJob
  3. Добавить задачу в центр планирования задач

Что касается добавления задач в таск-центр, то с кодом темы легко реализовать простой интерфейс управления, ведь это просто ввод нескольких полей.

Напоследок скриншот интерфейса управления прилагается:

список заданий

Детали миссии

Другие мои статьи:

Решение WebSocket в микросервисной архитектуре Spring Cloud

Mybatis de-xml: я больше не хочу писать xml

Кэш Spring Security OAuth2 использует обработку сериализации Джексона