Elastic-Job: динамическое добавление задач, поддержка динамического сегментирования

Java
Elastic-Job: динамическое добавление задач, поддержка динамического сегментирования

Страстный только весной и при луне, как падающие цветы, когда люди разлучены.

https://p1-jj.byteimg.com/tos-cn-i-t2oaga2asx/gold-user-assets/2019/9/19/16d48304045990f4~tplv-t2oaga2asx-image.image
https://p1-jj.byteimg.com/tos-cn-i-t2oaga2asx/gold-user-assets/2019/9/19/16d48304045990f4~tplv-t2oaga2asx-image.image

Обзор

Поскольку в проекте используются временные задачи и развернуто несколько экземпляров сервиса, необходимо решить проблему многократного выполнения временных задач. То есть в один и тот же момент времени каждое запланированное задание выполняется только на одном узле. Общие решения с открытым исходным кодом, такие какelastic-job,xxl-job,quartz,saturn,opencron,antaresЖдать. окончательное решение об использованииelastic-job.elastic-jobОсновные моменты заключаются в следующем:

  1. Основанный на структуре задачи синхронизации кварца, он имеет большинство функций кварца.
  2. Используйте zookeeper для координации, центра планирования, более легкого
  3. Sharding для поддержки задач
  4. Он поддерживает эластичное расширение и может масштабироваться по горизонтали.При повторном запуске задачи он проверит текущее количество серверов, повторно раздробит и продолжит выполнение задачи после завершения разбиения.
  5. Отработка отказа, отказоустойчивая обработка, когда сервер планирования не работает или отключается от zookeeper, он немедленно останавливает задание, а затем переходит к поиску других простаивающих серверов планирования для выполнения оставшихся задач.
  6. Предоставляет интерфейс эксплуатации и обслуживания для управления заданиями и центрами регистрации.

Но найдено в фактической разработкеelastic-jobФрагментация не поддерживается для динамически добавляемых запланированных задач. То есть в случае нескольких экземпляров, если задача динамически добавляется в экземпляр, задача всегда будет выполняться на этом узле. Если вам нужно запустить на других экземплярах, вам нужно вызвать интерфейс другого экземпляра с теми же параметрами. Ссылаться на:Эластичная работа: динамически добавлять задачи. В нескольких байду +googleнайдено подElastic-Job динамически добавляет задачиБыла такая же проблема с хостом здесь. Тем не менее, сегментация динамически добавляемых задач была протестирована арендодателем, иногда это хорошо или плохо, и до тех пор, покаzookeeperЕсли задача зарегистрирована в системе, то при перезапуске задача будет автоматически инициализирована. (Для описания динамической задачи вы можете обратиться к описанию по ссылке выше, и я не буду здесь слишком много объяснять).

решать

следитьБольшой ИньИдея состоит в том, чтобы централизованно управлять узлами задачи.Независимо от того, на каком узле зарегистрирована динамическая задача, запрос необходимо перенаправить на другие узлы для инициализации, чтобы задача многоузлового шардирования могла быть гарантированно выполнена. обычно.

код показывает, как показано ниже:

/**
     * 开启任务监听,当有任务添加时,监听zk中的数据增加,自动在其他节点也初始化该任务
     */
    public void monitorJobRegister() {
        CuratorFramework client = zookeeperRegistryCenter.getClient();
        @SuppressWarnings("resource")
        PathChildrenCache childrenCache = new PathChildrenCache(client, "/", true);
        PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                ChildData data = event.getData();
                switch (event.getType()) {
                    case CHILD_ADDED:
                        String config = new String(client.getData().forPath(data.getPath() + "/config"));
                        Job job = JsonUtils.toBean(Job.class, config);
                        Object bean = null;
                        // 获取bean失败则添加任务
                        try {
                            bean = ctx.getBean("SpringJobScheduler" + job.getJobName());
                        } catch (BeansException e) {
                            logger.error("ERROR NO BEAN,CREATE BEAN SpringJobScheduler" + job.getJobName());
                        }
                        if (Objects.isNull(bean)) {
                            addJob(job);
                        }
                        break;
                    default:
                        break;
                }
            }
        };
        childrenCache.getListenable().addListener(childrenCacheListener);
        try {
            // https://blog.csdn.net/u010402202/article/details/79581575
            childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

контрольная работа

Тест динамически добавляет запланированные задачи для поддержки аварийного переключения фрагментации.

  1. скачатьelastic-job-spring-boot-starterиспользоватьmavenЗаказinstallк местному
  2. Создайтеdemo-elastic-jobпроект Структура каталогов следующая:
demo-elastic-job
├── mvnw
├── mvnw.cmd
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── example
│   │   │           └── demo
│   │   │               ├── job
│   │   │               │   ├── DynamicJob.java
│   │   │               │   └── TestJob.java
│   │   │               └── DemoApplication.java
│   │   └── resources
│   │       ├── application.yml
│   │       └── application-dev.yml
│   └── test
│       └── java
│           └── com
│               └── example
│                   └── demo
│                       └── DemoApplicationTests.java
├── pom.xml
└── demo.iml

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.cxytiandi</groupId>
            <artifactId>elastic-job-spring-boot-starter</artifactId>
            <version>1.0.4</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

DemoApplication.java

package com.example.demo;

import com.cxytiandi.elasticjob.annotation.EnableElasticJob;
import com.cxytiandi.elasticjob.dynamic.service.JobService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@EnableElasticJob
@ComponentScan(basePackages = {"com.cxytiandi", "com.example.demo"})
public class DemoApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Autowired
    private JobService jobService;

    @Override
    public void run(String... args) throws Exception {
        // 模拟初始化读取数据库 添加任务
//        Job job1 = new Job();
//        job1.setJobName("job1");
//        job1.setCron("0/10 * * * * ? ");
//        job1.setJobType("SIMPLE");
//        job1.setJobClass("com.example.demo.job.DynamicJob");
//        job1.setShardingItemParameters("");
//        job1.setShardingTotalCount(2);
//        jobService.addJob(job1);
//        Job job2 = new Job();
//        job2.setJobName("job2");
//        job2.setCron("0/10 * * * * ? ");
//        job2.setJobType("SIMPLE");
//        job2.setJobClass("com.example.demo.job.DynamicJob");
//        job2.setShardingItemParameters("0=A,1=B");
//        job2.setShardingTotalCount(2);
//        jobService.addJob(job2);
    }
}

TestJob.java

package com.example.demo.job;

import com.cxytiandi.elasticjob.annotation.ElasticJobConf;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

/**
 * Created by zhenglongfei on 2019/7/22
 *
 * @VERSION 1.0
 */
@Component
@Slf4j
@ElasticJobConf(name = "dayJob", cron = "0/10 * * * * ?", shardingTotalCount = 2,
        shardingItemParameters = "0=AAAA,1=BBBB", description = "简单任务", failover = true)
public class TestJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("TestJob任务名:【{}】, 片数:【{}】, param=【{}】", shardingContext.getJobName(), shardingContext.getShardingTotalCount(),
                shardingContext.getShardingParameter());
    }
}

DynamicJob.java

package com.example.demo.job;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * Created by zhenglongfei on 2019/7/24
 *
 * @VERSION 1.0
 */
@Component
@Slf4j
public class DynamicJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {


        switch (shardingContext.getShardingItem()) {
            case 0:
                log.info("【0】 is running");
                break;
            case 1:
                log.info("【1】 is running");
                break;
        }
    }
}

application.yml

elastic:
  job:
    zk:
      serverLists: 172.25.66.137:2181
      namespace: demo_test
server:
  port: 8082
spring:
  redis:
    host: 127.0.0.1
    port: 6379

Результаты теста

Начать два проекта соответственно8081и8082порт, использоватьREST APIдля динамической регистрации задач.

http://localhost:8081/jobПочта Параметры следующие:

{
  "jobName": "DynamicJob01",
  "cron": "0/3 * * * * ?",
  "jobType": "SIMPLE",
  "jobClass": "com.example.demo.job.DynamicJob",
  "jobParameter": "test",
  "shardingTotalCount": 2,
  "shardingItemParameters": "0=AAAA,1=BBBB"
}

https://p1-jj.byteimg.com/tos-cn-i-t2oaga2asx/gold-user-assets/2019/9/19/16d483a9c926830e~tplv-t2oaga2asx-image.image
https://p1-jj.byteimg.com/tos-cn-i-t2oaga2asx/gold-user-assets/2019/9/19/16d483a9c926830e~tplv-t2oaga2asx-image.image

загрузка кода


https://p1-jj.byteimg.com/tos-cn-i-t2oaga2asx/gold-user-assets/2019/9/18/16d42fc88345bad5~tplv-t2oaga2asx-image.image
https://p1-jj.byteimg.com/tos-cn-i-t2oaga2asx/gold-user-assets/2019/9/18/16d42fc88345bad5~tplv-t2oaga2asx-image.image

🙂 🙂 🙂 Подпишитесь на публичный аккаунт WeChatява галантерейные товарыДелитесь информацией о галантерее время от времени

Ссылка на ссылку: