предисловие
Скоро Национальный день, и вот-вот начнутся каникулы.Автор до сих пор очень рад.С Национальным праздником!
Без лишних слов, давайте сразу к делу.
я верю всемXXL-JOB
Я очень хорошо знаю, поэтому в этой статье не слишком много рассказывается об исходном коде, основное внимание уделяетсяНесколько моментов, которые приходят на ум в процессе просмотра исходного кода, не обязательно все в порядке, пожалуйста, критикуйте и поправляйте меня.
Введение в XXL-JOB
-
XXL-JOB
Это легкая распределенная платформа планирования задач, основными целями которой являются быстрая разработка, простота обучения, малый вес и легкое расширение. Теперь с открытым исходным кодом и подключен к линейкам онлайн-продуктов многих компаний, из коробки. -
XXL-JOB
Он разделен на центр планирования, исполнитель и центр обработки данных.Центр планирования отвечает за управление задачами и планирование, управление исполнителем, управление журналом и т. д. Исполнитель отвечает за выполнение задачи и обратный вызов результата выполнения.
Планирование задач — реализация «Колеса времени»
колесо времени
колесо времени отNetty
серединаHashedWheelTimer
, представляет собой кольцевую структуру, которую можно сравнить с часами.bucket
,Каждыйbucket
Несколько задач могут быть сохранены наList
Сохраняйте все задачи, которые должны быть выполнены в данный момент, и в то же время указатель вращается одна за другой по прошествии времени и выполняет соответствующиеbucket
по всем поставленным задачам. миссия пройденапо модулюРешите, что нужно поставитьbucket
. иHashMap
Принцип аналогичен,newTask
соответствоватьput
,использоватьList
для разрешения хеш-конфликтов.
Возьмите приведенный выше рисунок в качестве примера, предполагая, чтоbucket
Если это 1 секунда, то период времени, представленный одним оборотом указателя, равен 8 с. Предполагая, что текущий указатель указывает на 0, необходимо запланировать выполнение задачи через 3 с. Очевидно, его следует добавить в квадрат (0 + 3 = 3). Это может быть выполнено после прохождения 3 с раз; если задача должна быть выполнена после 10 с, она должна дождаться, пока указатель завершит раунд нуля и 2, прежде чем выполнять ее, поэтому она должна поставить в 2, и в то же времяround(1)
Сохранить в задаче. Выполнять только при проверке сроков выполнения задачround
0,bucket
по другим задачамround
Вычесть 1.
Конечно, есть и оптимизированная реализация «иерархического колеса времени», см. https://cnkirito.moe/timer/.
«Колесо времени» в XXL-JOB
-
Метод планирования в XXL-JOB начинается с
Quartz
Это стало самостоятельно разработанным методом планирования, очень похожим на колесо времени, которое можно понимать как 60bucket
и каждыйbucket
на 1 секунду, но не болееround
Концепция чего-либо. -
Подробности см. на рисунке ниже.
- За планирование задач в XXL-JOB отвечают два потока, а именно
ringThread
иscheduleThread
, который работает следующим образом.
1. ScheduleThread: прочитайте информацию о задаче и заранее прочитайте будущее5sЗадача, которая должна быть запущена, помещается в колесо времени. 2, кольцевая нить: для тока
bucket
и предыдущийbucket
Задачи извлекаются и выполняются.
- Давайте посмотрим на исходный код ниже, почему он называется «колесо времени класса», ключевой код аннотирован, обратите внимание на просмотр.
// 环状结构
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
// 任务下次启动时间(单位为秒) % 60
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 任务放进时间轮
private void pushTimeRing(int ringSecond, int jobId){
// push async ring
List<Integer> ringItemData = ringData.get(ringSecond);
if (ringItemData == null) {
ringItemData = new ArrayList<Integer>();
ringData.put(ringSecond, ringItemData);
}
ringItemData.add(jobId);
}
// 同时取两个时间刻度的任务
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
// 避免处理耗时太长,跨过刻度,向前校验一个刻度;
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// 运行
for (int jobId: ringItemData) {
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
}
Алгоритм хеширования в согласованной хеш-маршрутизации
- Все это знают,
XXL-JOB
При выполнении задачи конкретный исполнитель, на котором выполняется задача, определяется в соответствии со стратегией маршрутизации, одной из стратегий является стратегия консистентного хэширования (исходный код находится в ExecutorRouteConsistentHash.java), что естественно приходит на ум.Последовательный алгоритм хеширования. - Последовательный алгоритм хешированияЧтобы решить проблему балансировки нагрузки в распределенных системах, можно использовать алгоритм хеширования, чтобы фиксированная часть запросов попадала на один и тот же сервер, так что каждый сервер обрабатывает фиксированную часть запросов (и поддерживает информацию о эти запросы), что играет роль в балансировке нагрузки.
- Алгоритм общего остаточного хэша (хеш (например, идентификатор пользователя)% номер сервера) имеет плохую масштабируемость.Когда сервер добавляется или отключается, отношение сопоставления между идентификатором пользователя и сервером часто не работает. Последовательное хеширование использует хеш-кольца для его улучшения.
- Последовательный алгоритм хешированияНа практике, когда число серверных узлов относительно невелико, возникает проблема согласованного перекоса хэшей, упомянутая в предыдущем разделе.Одно из решений — добавить больше машин, но добавление машин стоит дорого, а затем добавить больше машин.виртуальный узел.
- Конкретные принципы см. на странице https://www.jianshu.com/p/e968c081f563.
- На следующем рисунке показано хеш-кольцо с виртуальными узлами, где ip1-1 — виртуальный узел ip1, ip2-1 — виртуальный узел ip2, а ip3-1 — виртуальный узел ip3.
видимый, ключом к согласованному алгоритму хеширования являетсяХэш-алгоритм,гарантироватьвиртуальный узелиРезультат хешированияРавномерность , а равномерность можно понимать какУменьшить конфликт хэшей, пожалуйста, обратитесь к пункту знаний о конфликте хэшейПосмотрите на [Hash] из HashMap, словарь Redis. . ..
- Хеш-функция согласованного хэша в XXL-JOB выглядит следующим образом.
// jobId转换为md5
// 不直接用hashCode() 是因为扩大hash取值范围,减少冲突
byte[] digest = md5.digest();
// 32位hashCode
long hashCode = ((long) (digest[3] & 0xFF) << 24)
| ((long) (digest[2] & 0xFF) << 16)
| ((long) (digest[1] & 0xFF) << 8)
| (digest[0] & 0xFF);
long truncateHashCode = hashCode & 0xffffffffL;
- Увидев хэш-функцию на картинке выше, я задумался
HashMap
Хэш-функция
f(key) = hash(key) & (table.length - 1)
// 使用>>> 16的原因,hashCode()的高位和低位都对f(key)有了一定影响力,使得分布更加均匀,散列冲突的几率就小了。
hash(key) = (h = key.hashCode()) ^ (h >>> 16)
- Точно так же старший и младший биты кода md5 jobId влияют на результат хеширования, что снижает вероятность конфликта хэшей.
Реализация сегментированных задач — поддержание контекста потока
-
Задача шардинга XXL-JOB реализует распределенное выполнение задач, что собственно и является предметом исследования автора.Многие запланированные задачи в ежедневной разработке выполняются на одной машине.Для последующих задач с большими объемами данных лучше всего иметь распределенное решение.
-
Стратегию маршрутизации для шардирования задач предлагает автор исходного кодаТрансляция осколкаПонятие , я сначала немного запутался, но после прочтения исходного кода стало понятнее.
-
Те, кто видел исходный код, наверняка столкнулись с таким небольшим эпизодом, почему не реализована стратегия маршрутизации? Как показано ниже.
public enum ExecutorRouteStrategyEnum {
FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
// 说好的实现呢???竟然是null
SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
- Я продолжил расследование и пришел к выводу.Расскажу потихоньку.Во-первых,какой параметр выполнения задачи шардинга передается? Смотреть
XxlJobTrigger.trigger
Кусок кода в функции.
...
// 如果是分片路由,走的是这段逻辑
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList() != null && !group.getRegistryList().isEmpty()
&& shardingParam == null) {
for (int i = 0; i < group.getRegistryList().size(); i++) {
// 最后两个参数,i是当前机器在执行器集群当中的index,group.getRegistryList().size()为执行器总数
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
}
...
- Параметры передаются исполнителю через RPC собственной разработки, и исполнитель отвечает за выполнение задачи.
JobThread.run
, Вы видите следующий код.
// 分片广播的参数比set进了ShardingUtil
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
...
// 将执行参数传递给jobHandler执行
handler.execute(triggerParamTmp.getExecutorParams())
- см. далее
ShardingUtil
, Я открыл тайну, пожалуйста, посмотрите на код.
public class ShardingUtil {
// 线程上下文
private static InheritableThreadLocal<ShardingVO> contextHolder = new InheritableThreadLocal<ShardingVO>();
// 分片参数对象
public static class ShardingVO {
private int index; // sharding index
private int total; // sharding total
// 次数省略 get/set
}
// 参数对象注入上下文
public static void setShardingVo(ShardingVO shardingVo){
contextHolder.set(shardingVo);
}
// 从上下文中取出参数对象
public static ShardingVO getShardingVo(){
return contextHolder.get();
}
}
- Очевидно, в задаче шардинга
ShardingJobHandler
Параметры фрагментации в контексте потока вынесены, и вот код, который нужно поставить~
@JobHandler(value="shardingJobHandler")
@Service
public class ShardingJobHandler extends IJobHandler {
@Override
public ReturnT<String> execute(String param) throws Exception {
// 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
// 业务逻辑
for (int i = 0; i < shardingVO.getTotal(); i++) {
if (i == shardingVO.getIndex()) {
XxlJobLogger.log("第 {} 片, 命中分片开始处理", i);
} else {
XxlJobLogger.log("第 {} 片, 忽略", i);
}
}
return SUCCESS;
}
}
- Отсюда следует, что распределенная реализация основана на параметрах шардирования
index
иtotal
Проще говоря, эта идентификация дается исполнительным механизмом, дифференцированным в соответствии с идентификатором данных задачи или может быть реализована логическая распределенная операция. - Не по теме: Что касается того, почему используется внешняя инъекция параметров шардинга, то это не напрямую
execute
перечислить?
1. Это может быть связано с тем, что эти два параметра используются только в задачах шардинга. 2. IJobHandler имеет только параметры типа String
Мысли после прочтения исходного кода
- 1. После просмотра исходного кода на этот раз, цели дизайна XXL-JOB действительно соответствуютБыстрая разработка, простота в освоении, легкость и простота расширения.
- 2. Что касается самостоятельно разработанного RPC, особого рассмотрения нет. Конкретный доступ должен учитывать структуру RPC компании.
- 3. Дано автором
Quartz
Отсутствие планирования, я должен продолжать понимать. - 4. Стоит изучить совместимость многих нештатных ситуаций, таких как простои, сбои и тайм-ауты в фреймворке.
- 5. Роллинг журнал и реализация системы журналов требуют дальнейшего понимания.