Узнайте подробности в XXL-JOB

Java

предисловие

Скоро Национальный день, и вот-вот начнутся каникулы.Автор до сих пор очень рад.С Национальным праздником!

Без лишних слов, давайте сразу к делу.

я верю всемXXL-JOBЯ очень хорошо знаю, поэтому в этой статье не слишком много рассказывается об исходном коде, основное внимание уделяетсяНесколько моментов, которые приходят на ум в процессе просмотра исходного кода, не обязательно все в порядке, пожалуйста, критикуйте и поправляйте меня.

Введение в XXL-JOB

  • XXL-JOBЭто легкая распределенная платформа планирования задач, основными целями которой являются быстрая разработка, простота обучения, малый вес и легкое расширение. Теперь с открытым исходным кодом и подключен к линейкам онлайн-продуктов многих компаний, из коробки.
  • XXL-JOBОн разделен на центр планирования, исполнитель и центр обработки данных.Центр планирования отвечает за управление задачами и планирование, управление исполнителем, управление журналом и т. д. Исполнитель отвечает за выполнение задачи и обратный вызов результата выполнения.

Планирование задач — реализация «Колеса времени»

колесо времени

колесо времени отNettyсерединаHashedWheelTimer, представляет собой кольцевую структуру, которую можно сравнить с часами.bucket,КаждыйbucketНесколько задач могут быть сохранены наListСохраняйте все задачи, которые должны быть выполнены в данный момент, и в то же время указатель вращается одна за другой по прошествии времени и выполняет соответствующиеbucketпо всем поставленным задачам. миссия пройденапо модулюРешите, что нужно поставитьbucket. иHashMapПринцип аналогичен,newTaskсоответствоватьput,использоватьListдля разрешения хеш-конфликтов.

Возьмите приведенный выше рисунок в качестве примера, предполагая, чтоbucketЕсли это 1 секунда, то период времени, представленный одним оборотом указателя, равен 8 с. Предполагая, что текущий указатель указывает на 0, необходимо запланировать выполнение задачи через 3 с. Очевидно, его следует добавить в квадрат (0 + 3 = 3). Это может быть выполнено после прохождения 3 с раз; если задача должна быть выполнена после 10 с, она должна дождаться, пока указатель завершит раунд нуля и 2, прежде чем выполнять ее, поэтому она должна поставить в 2, и в то же времяround(1)Сохранить в задаче. Выполнять только при проверке сроков выполнения задачround0,bucketпо другим задачамroundВычесть 1.

Конечно, есть и оптимизированная реализация «иерархического колеса времени», см. https://cnkirito.moe/timer/.

«Колесо времени» в XXL-JOB

  • Метод планирования в XXL-JOB начинается сQuartzЭто стало самостоятельно разработанным методом планирования, очень похожим на колесо времени, которое можно понимать как 60bucketи каждыйbucketна 1 секунду, но не болееroundКонцепция чего-либо.

  • Подробности см. на рисунке ниже.

  • За планирование задач в XXL-JOB отвечают два потока, а именноringThreadиscheduleThread, который работает следующим образом.

1. ScheduleThread: прочитайте информацию о задаче и заранее прочитайте будущее5sЗадача, которая должна быть запущена, помещается в колесо времени. 2, кольцевая нить: для токаbucketи предыдущийbucketЗадачи извлекаются и выполняются.

  • Давайте посмотрим на исходный код ниже, почему он называется «колесо времени класса», ключевой код аннотирован, обратите внимание на просмотр.
// 环状结构
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

// 任务下次启动时间(单位为秒) % 60
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

// 任务放进时间轮
private void pushTimeRing(int ringSecond, int jobId){
        // push async ring
        List<Integer> ringItemData = ringData.get(ringSecond);
        if (ringItemData == null) {
            ringItemData = new ArrayList<Integer>();
            ringData.put(ringSecond, ringItemData);
        }
        ringItemData.add(jobId);
    }
// 同时取两个时间刻度的任务
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);  
// 避免处理耗时太长,跨过刻度,向前校验一个刻度;
for (int i = 0; i < 2; i++) {
	List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
	if (tmpData != null) {
		ringItemData.addAll(tmpData);
	}
}
// 运行
for (int jobId: ringItemData) {
	JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
}

Алгоритм хеширования в согласованной хеш-маршрутизации

  • Все это знают,XXL-JOBПри выполнении задачи конкретный исполнитель, на котором выполняется задача, определяется в соответствии со стратегией маршрутизации, одной из стратегий является стратегия консистентного хэширования (исходный код находится в ExecutorRouteConsistentHash.java), что естественно приходит на ум.Последовательный алгоритм хеширования.
  • Последовательный алгоритм хешированияЧтобы решить проблему балансировки нагрузки в распределенных системах, можно использовать алгоритм хеширования, чтобы фиксированная часть запросов попадала на один и тот же сервер, так что каждый сервер обрабатывает фиксированную часть запросов (и поддерживает информацию о эти запросы), что играет роль в балансировке нагрузки.
  • Алгоритм общего остаточного хэша (хеш (например, идентификатор пользователя)% номер сервера) имеет плохую масштабируемость.Когда сервер добавляется или отключается, отношение сопоставления между идентификатором пользователя и сервером часто не работает. Последовательное хеширование использует хеш-кольца для его улучшения.
  • Последовательный алгоритм хешированияНа практике, когда число серверных узлов относительно невелико, возникает проблема согласованного перекоса хэшей, упомянутая в предыдущем разделе.Одно из решений — добавить больше машин, но добавление машин стоит дорого, а затем добавить больше машин.виртуальный узел.
  • Конкретные принципы см. на странице https://www.jianshu.com/p/e968c081f563.
  • На следующем рисунке показано хеш-кольцо с виртуальными узлами, где ip1-1 — виртуальный узел ip1, ip2-1 — виртуальный узел ip2, а ip3-1 — виртуальный узел ip3.

видимый, ключом к согласованному алгоритму хеширования являетсяХэш-алгоритм,гарантироватьвиртуальный узелиРезультат хешированияРавномерность , а равномерность можно понимать какУменьшить конфликт хэшей, пожалуйста, обратитесь к пункту знаний о конфликте хэшейПосмотрите на [Hash] из HashMap, словарь Redis. . ..

  • Хеш-функция согласованного хэша в XXL-JOB выглядит следующим образом.
// jobId转换为md5
// 不直接用hashCode() 是因为扩大hash取值范围,减少冲突
byte[] digest = md5.digest();

// 32位hashCode
long hashCode = ((long) (digest[3] & 0xFF) << 24)
	| ((long) (digest[2] & 0xFF) << 16)
	| ((long) (digest[1] & 0xFF) << 8)
	| (digest[0] & 0xFF);

long truncateHashCode = hashCode & 0xffffffffL;
  • Увидев хэш-функцию на картинке выше, я задумалсяHashMapХэш-функция
f(key) = hash(key) & (table.length - 1) 
// 使用>>> 16的原因,hashCode()的高位和低位都对f(key)有了一定影响力,使得分布更加均匀,散列冲突的几率就小了。
hash(key) = (h = key.hashCode()) ^ (h >>> 16)
  • Точно так же старший и младший биты кода md5 jobId влияют на результат хеширования, что снижает вероятность конфликта хэшей.

Реализация сегментированных задач — поддержание контекста потока

  • Задача шардинга XXL-JOB реализует распределенное выполнение задач, что собственно и является предметом исследования автора.Многие запланированные задачи в ежедневной разработке выполняются на одной машине.Для последующих задач с большими объемами данных лучше всего иметь распределенное решение.

  • Стратегию маршрутизации для шардирования задач предлагает автор исходного кодаТрансляция осколкаПонятие , я сначала немного запутался, но после прочтения исходного кода стало понятнее.

  • Те, кто видел исходный код, наверняка столкнулись с таким небольшим эпизодом, почему не реализована стратегия маршрутизации? Как показано ниже.

public enum ExecutorRouteStrategyEnum {

    FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
    LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
    ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
    RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
    CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
    LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
    LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
    FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
    BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
    // 说好的实现呢???竟然是null
    SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
  • Я продолжил расследование и пришел к выводу.Расскажу потихоньку.Во-первых,какой параметр выполнения задачи шардинга передается? СмотретьXxlJobTrigger.triggerКусок кода в функции.
...
// 如果是分片路由,走的是这段逻辑
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
                && group.getRegistryList() != null && !group.getRegistryList().isEmpty()
                && shardingParam == null) {
            for (int i = 0; i < group.getRegistryList().size(); i++) {
	            // 最后两个参数,i是当前机器在执行器集群当中的index,group.getRegistryList().size()为执行器总数
                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
            }
        } 
...
  • Параметры передаются исполнителю через RPC собственной разработки, и исполнитель отвечает за выполнение задачи.JobThread.run, Вы видите следующий код.
// 分片广播的参数比set进了ShardingUtil
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
...
// 将执行参数传递给jobHandler执行
handler.execute(triggerParamTmp.getExecutorParams())
  • см. далееShardingUtil, Я открыл тайну, пожалуйста, посмотрите на код.
public class ShardingUtil {
	// 线程上下文
    private static InheritableThreadLocal<ShardingVO> contextHolder = new InheritableThreadLocal<ShardingVO>();
	// 分片参数对象
    public static class ShardingVO {

        private int index;  // sharding index
        private int total;  // sharding total
		// 次数省略 get/set
    }
	// 参数对象注入上下文
    public static void setShardingVo(ShardingVO shardingVo){
        contextHolder.set(shardingVo);
    }
	// 从上下文中取出参数对象
    public static ShardingVO getShardingVo(){
        return contextHolder.get();
    }

}
  • Очевидно, в задаче шардингаShardingJobHandlerПараметры фрагментации в контексте потока вынесены, и вот код, который нужно поставить~
@JobHandler(value="shardingJobHandler")
@Service
public class ShardingJobHandler extends IJobHandler {

	@Override
	public ReturnT<String> execute(String param) throws Exception {

		// 分片参数
		ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
		XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());

		// 业务逻辑
		for (int i = 0; i < shardingVO.getTotal(); i++) {
			if (i == shardingVO.getIndex()) {
				XxlJobLogger.log("第 {} 片, 命中分片开始处理", i);
			} else {
				XxlJobLogger.log("第 {} 片, 忽略", i);
			}
		}

		return SUCCESS;
	}

}
  • Отсюда следует, что распределенная реализация основана на параметрах шардированияindexиtotalПроще говоря, эта идентификация дается исполнительным механизмом, дифференцированным в соответствии с идентификатором данных задачи или может быть реализована логическая распределенная операция.
  • Не по теме: Что касается того, почему используется внешняя инъекция параметров шардинга, то это не напрямуюexecuteперечислить?

1. Это может быть связано с тем, что эти два параметра используются только в задачах шардинга. 2. IJobHandler имеет только параметры типа String

Мысли после прочтения исходного кода

  • 1. После просмотра исходного кода на этот раз, цели дизайна XXL-JOB действительно соответствуютБыстрая разработка, простота в освоении, легкость и простота расширения.
  • 2. Что касается самостоятельно разработанного RPC, особого рассмотрения нет. Конкретный доступ должен учитывать структуру RPC компании.
  • 3. Дано авторомQuartzОтсутствие планирования, я должен продолжать понимать.
  • 4. Стоит изучить совместимость многих нештатных ситуаций, таких как простои, сбои и тайм-ауты в фреймворке.
  • 5. Роллинг журнал и реализация системы журналов требуют дальнейшего понимания.

использованная литература