Как сеть цен на жилье использует структуру распределенной работы эластичной работы

задняя часть GitHub Spring открытый источник

Что такое эластичная работа?

Elastic-Job — это решение для распределенного планирования, состоящее из двух независимых подпроектов: Elastic-Job-Lite и Elastic-Job-Cloud.

Elastic-Job-Lite позиционируется как легкое и децентрализованное решение, предоставляющее услуги по координации распределенных задач в виде jar-пакетов; Elastic-Job-Cloud адаптирует решение Mesos Framework собственной разработки, которое дополнительно обеспечивает управление ресурсами и распространение приложений. и изоляция процесса.

Адрес официального сайта:elasticjob.io/

Гитхаб:GitHub.com/эластик-работа/…

Зачем использовать Elastic-Job

В настоящее время наша компания использует временный исполнитель задач на базе Linux Crontab.

Существуют следующие проблемы:

  • Невозможность централизованно управлять задачами
  • Невозможно масштабировать по горизонтали
  • Нет работы с визуальным интерфейсом
  • Есть единая точка отказа

Помимо решения Linux Crontab на java, есть еще Quartz, но в Quartz отсутствует функция распределенного параллельного планирования.

Проблемы тоже очевидны:

  • Когда мой проект представляет собой одно приложение, я запускаю запланированную задачу на основе Quartz, которая может успешно работать.
  • Когда мой проект загружается и расширяется до 3 узлов, задачи на 3 узлах будут выполняться одновременно, и данные будут перепутаны.
  • В то же время, чтобы не было проблем с данными, необходимо вводить распределенные блокировки для планирования, что увеличивает сложность.

Как с этим бороться?

1. Самостоятельно разработанный фреймворк

В этом случае вам может понадобиться разработать структуру планирования, которая может удовлетворить бизнес-потребности компании. Стоимость высока и не рекомендуется.

Я также подумывал о том, чтобы написать его раньше. У меня есть идея, но она еще не началась. Пока структура планирования является проблемой планирования, как Elastic-Job, она делает очень хорошую работу. Она позволяет вам определить правила сегментирования, а затем в соответствии с вами. Данные данного слайса планируются для вас, и вы контролируете, какие данные обрабатывает каждый узел.

Если отдел примет этот метод и не будет писать распределение данных, то я думаю, что самый простой способ — использовать для этого очереди сообщений.

Zookeeper используется для планирования, хранения данных задач и определения общего интерфейса, который разделен на две части следующим образом:

public interface Job {
    void read();
    void process(Object data);
}

Затем пользователь читает данные, которые будут обработаны путем реализации вышеуказанного интерфейса и обрабатывает распределенные данные в процессе

Что касается распределения, задача может быть помечена аннотацией для использования очереди, или может быть использована общая, чтобы несколько потребителей могли потреблять одновременно, даже если один из них умрет, это не повлияет на всю задачу, и нет необходимости рассматривать отказоустойчивость.

Но что нужно контролировать, так это метод чтения, который должен выполняться только одним узлом, иначе данные будут распространяться повторно.

Это всего лишь простая идея.Конечно, есть задачи управления веб-страницей, а также задачи, которые можно выполнять вручную.

2. Выберите решение с открытым исходным кодом

TBSchedule: ранняя распределенная система планирования задач с открытым исходным кодом Али. Код немного устарел и использует таймер вместо пула потоков для выполнения планирования задач. Хорошо известно, что таймеры плохо обрабатывают исключительные ситуации. Более того, тип задания TBSchedule относительно прост и может быть только режимом получения/обработки данных. Серьезный недостаток документации.

Spring Batch: Spring Batch — это легкая, полностью ориентированная на Spring среда пакетной обработки, которую можно применять к крупномасштабным системам обработки данных корпоративного уровня. Spring Batch основан на POJO и хорошо известной среде Spring, что упрощает для разработчиков доступ к службам корпоративного уровня и их использование. Spring Batch может предоставить большое количество функций повторяемой обработки данных, включая важные функции, такие как ведение журнала/отслеживание, управление транзакциями, перезапуск задания статистики обработки заданий, пропуск и управление ресурсами.

Elastic-Job: внутренние продукты с открытым исходным кодом, документация на китайском языке, быстрая, простая в использовании, полные функции, активное сообщество, активен архитектор Dangang Zhang Liang, в настоящее время уделяющий больше времени открытому исходному коду.

Почему стоит выбрать Elastic-Job?

  • Координация распределенного планирования
  • Упругое расширение и сжатие
  • отказоустойчивость
  • Повторный запуск пропущенного задания
  • Согласованность сегментов заданий гарантирует, что в распределенной среде существует только один исполняемый экземпляр одного и того же сегмента.
  • Самодиагностика и устранение проблем, вызванных распределенной нестабильностью
  • Поддержка параллельного планирования
  • Поддержка операций жизненного цикла задания
  • Богатые типы заданий
  • Интеграция с Spring и предоставление пространства имен
  • Платформа для эксплуатации и обслуживания

Знакомство с типами работ

Простой: простая работа, обычно используемая, что означает простую реализацию без какой-либо инкапсуляции. Необходимо реализовать интерфейс SimpleJob. Этот интерфейс предоставляет только один метод для переопределения, и этот метод будет выполняться периодически. Аналогичен собственному интерфейсу Quartz, но предоставляет такие функции, как эластичное масштабирование и сегментирование.

public class MyElasticJob implements SimpleJob {
    
    @Override
    public void execute(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0: 
                // do something by sharding item 0
                break;
            case 1: 
                // do something by sharding item 1
                break;
            case 2: 
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}

DataFlow: тип Dataflow используется для обработки потока данных и требует реализации интерфейса DataflowJob. Этот интерфейс предоставляет два метода переопределения, которые используются для выборки (fetchData) и обработки (processData) данных соответственно.

public class MyElasticJob implements DataflowJob<Foo> {
    
    @Override
    public List<Foo> fetchData(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0: 
                List<Foo> data = // get data from database by sharding item 0
                return data;
            case 1: 
                List<Foo> data = // get data from database by sharding item 1
                return data;
            case 2: 
                List<Foo> data = // get data from database by sharding item 2
                return data;
            // case n: ...
        }
    }
    
    @Override
    public void processData(ShardingContext shardingContext, List<Foo> data) {
        // process data
        // ...
    }
}

Сценарий: задание типа сценария означает задание типа сценария и поддерживает все типы сценариев, такие как оболочка, python и perl. Просто настройте scriptCommandLine через консоль или код, кодирование не требуется. Путь к сценарию выполнения может содержать параметры. После передачи параметров среда задания автоматически добавит последний параметр в качестве информации о времени выполнения задания.

На самом деле, я предлагаю добавить тип задачи, которая является конвейерной задачей, поэтому я также специально упомянул проблемы:

GitHub.com/эластик-работа/…

在特定的业务需求下,A任务执行完之后,需要执行B任务,以此类推,这种具有依赖性的流水式的任务。

在目前可以将这些任务合在一起,通过代码调用的方式来达到效果。

但我希望能增加这样一个功能,比如加一个配置,job-after="com.xxx.job.XXXJob" 在执行完这个任务之后,自动调用另一个任务BB,BB任务只需要配置任务信息,把cron去掉就可以,因为BB是依靠别的任务触发执行的。

当然这些任务必须在同一个zk的命名空间下,如果能支持夸命名空间就更好了

这样就能达到,流水式的任务操作了,并且每个任务可以用不同的分片key

начать использовать

1. Я не буду объяснять, как собрать фреймворк и как его настроить, официальная документация на сайте однозначно лучше, чем то, что я написал, вообще у фреймворков с открытым исходным кодом есть демки, их можно скачать и импортировать в IDE для запуска.

демонстрационный адрес:GitHub.com/эластик-работа/…

2. Введите некоторый опыт использования

  • Рекомендуется делить задачи по продуктам.Один продукт соответствует проекту одной задачи.Когда команда относительно большая, одна команда может отвечать за один продукт, чтобы он не смешивался с другими.
  • В описании задания должно быть написано понятно, для чего оно используется, в задании на настройку описана конфигурация, заполните понятно
/**
 * 用户维度统计任务<br>统计出用户的房产,置换,贷款等信息
 * @author yinjihuan
 */
public class UserStatJob implements SimpleJob {

	private Logger logger = LoggerFactory.getLogger(UserStatJob.class);
	
	@Autowired
	private EnterpriseProductUserService enterpriseProductUserService;
	
	@Autowired
	private UserStatService userStatService;
	
	@Autowired
	private HouseInfoService houseInfoService;
	
	@Autowired
	private HouseSubstitutionService houseSubstitutionService;
	
	@Autowired
	private LoanApplyService loanApplyService;
	
	@Override
	public void execute(ShardingContext shardingContext) {
		logger.info("开始执行UserStatJob");
		long total = enterpriseProductUserService.queryCount();
		int pages = PageBean.calcPages(total, 1000);
		for (int i = 1; i <= pages; i++) {
			List<EnterpriseProductUser> users = enterpriseProductUserService.queryByPage(i, 1000);
			for (EnterpriseProductUser user : users) {
				try {
					processStat(user);
				} catch (Exception e) {
					logger.error("用户维度统计任务异常", e);
					DingDingMessageUtil.sendTextMessage("用户维度统计任务异常:" + e.getMessage());
				}
			}
		}
		logger.info("UserStatJob执行结束");
	}
	
	private void processStat(EnterpriseProductUser user) {
		UserStat stat = userStatService.getByUid(user.getEid(), user.getUid());
		Long eid = user.getEid();
		String uid = user.getUid();
		if (stat == null) {
			stat = new UserStat();
			stat.setEid(eid);
			stat.setUid(uid);
			stat.setUserAddTime(user.getAddTime());
			stat.setCity(user.getCity());
			stat.setRegion(user.getRegion());
		}
		stat.setHouseCount(houseInfoService.queryCountByEidAndUid(eid, uid));
		stat.setHousePrice(houseInfoService.querySumMoneyByEidAndUid(eid, uid));
		stat.setSubstitutionCount(houseSubstitutionService.queryCount(eid, uid));
		stat.setSubstitutionMaxPrice(houseSubstitutionService.queryMaxBudget(eid, uid));
		stat.setLoanEvalCount(loanApplyService.queryUserCountByType(eid, uid, 2));
		stat.setLoanEvalMaxPrice(loanApplyService.queryMaxEvalMoney(eid, uid));
		stat.setLoanCount(loanApplyService.queryUserCountByType(eid, uid, 1));
		stat.setModifyDate(new Date());
		userStatService.save(stat);
	}

}
 <!-- 用户统计任务 每天1点10分执行 -->
 <job:simple id="userStatJob" class="com.fangjia.job.fsh.job.UserStatJob" registry-center-ref="regCenter"
    	 sharding-total-count="1" cron="0 10 1 * * ?" sharding-item-parameters=""
    	 failover="true" description="【房生活】用户维度统计任务,统计出用户的房产,置换,贷款等信息 UserStatJob"
    	 overwrite="true" event-trace-rdb-data-source="elasticJobLog" job-exception-handler="com.fangjia.job.fsh.handler.CustomJobExceptionHandler">
    	 
    	  <job:listener class="com.fangjia.job.fsh.listener.MessageElasticJobListener"></job:listener>
    	  
 </job:simple>
  • Настройте единый прослушиватель для каждой задачи, чтобы уведомлять о выполнении и завершении задачи, что может быть текстовыми сообщениями, электронными письмами и т. д. Я использую робота DingTalk для отправки сообщений в DingTalk.
/**
 * 作业监听器, 执行前后发送钉钉消息进行通知
 * @author yinjihuan
 */
public class MessageElasticJobListener implements ElasticJobListener {
    
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
    	String date = DateUtils.date2Str(new Date());
    	String msg = date + " 【FSH-" + shardingContexts.getJobName() + "】任务开始执行====" + JsonUtils.toJson(shardingContexts);
    	DingDingMessageUtil.sendTextMessage(msg);
    }
    
    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
    	String date = DateUtils.date2Str(new Date());
    	String msg = date + " 【FSH-" + shardingContexts.getJobName() + "】任务执行结束====" + JsonUtils.toJson(shardingContexts);
    	DingDingMessageUtil.sendTextMessage(msg);
    }

}
  • Вы можете определить аннотацию для каждого класса задачи.Аннотация используется для определения того, кто разработал задачу, а затем кому будет отправлено соответствующее сообщение DingTalk.Я лично рекомендую вам создать группу и все будут в ней, потому что если вы один Отправьте разработчику, если только его инициатива не очень высока, иначе бесполезно.Лично рекомендую выложить в группу, чтобы руководитель сказал кто есть кто, ваша задача неверна, и пошли разбираться причина. Я разместил его здесь единообразно, и нет определенных аннотаций.

  • Обработка исключений задач может обрабатывать исключения в задачах.Помимо записи журналов, также уведомляет, отправляя сообщения DingTalk в едином пакете.Вы можете узнать, есть ли исключение в задаче в режиме реального времени.Вы можете проверить мой код выше .

  • Существует также неперехваченное исключение, как уведомить группу, вы можете настроить класс обработки исключений для достижения, по конфигурацииjob-exception-handler="com.fangjia.job.fsh.handler.CustomJobExceptionHandler"

/**
 * 自定义异常处理,在任务异常时使用钉钉发送通知
 * @author yinjihuan
 */
public class CustomJobExceptionHandler implements JobExceptionHandler {
	
	private Logger logger = LoggerFactory.getLogger(CustomJobExceptionHandler.class);
	
	@Override
	public void handleException(String jobName, Throwable cause) {
		logger.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
		DingDingMessageUtil.sendTextMessage("【"+jobName+"】任务异常。" + cause.getMessage());
	}

}
  • Вы можете определить, не работает ли узел задания, проверив, существует ли узел имя_задания\экземпляры\ИД_экземпляра_задания. Этот узел является временным узлом. Если сервер заданий отключится, узел будет удален. Конечно, для мониторинга можно использовать и другие инструменты.

  • Написание задачи должно максимально учитывать горизонтальную масштабируемость.Пример, который я выложил выше, фактически не рассматривается.Это просто простая задача, так как я не использовал shardingParameter для обработки данных соответствующего слайса. На самом деле, рекомендуется рассмотреть его, если время выполнения задачи невелико. Данные для обработки небольшие, и их можно записать, как я. Если можно предсказать, что в будущем будет обрабатываться большой объем данных, а время долгое, лучше всего настроить правила сегментирования и написать код для обработки с помощью сегментирования, чтобы вы могли непосредственно изменить конфигурацию и добавить следующий узел позже.

Для большего обмена технологиями, пожалуйста, обратите внимание на общедоступную учетную запись WeChat: Yuantiandi

image.png