Как изящно закрыть службу

Java

задний план

Во многих случаях службы должны завершаться плавно.Например, после остановки службы RPC узел необходимо удалить из службы регистрации, а сообщения, полученные из очереди сообщений, необходимо обработать в обычном режиме. Как правило, мы надеемся, что служба сможет завершить текущие выполняемые задачи перед выходом.На данный момент нам нужно запустить некоторый код для очистки сцены при выключении JVM.

план

ShutdownHook

JDK предоставляет метод Java.Runtime.addShutdownHook (перехватчик потока), который позволяет пользователям зарегистрировать перехватчик завершения работы JVM. Этот хук можно вызвать в следующих сценариях:

  • Программа завершается нормально;
  • использовать System.exit();
  • Терминал использует терминалы, активируемые Ctrl + C;
  • система выключена;
  • Используйте команду kill pid, чтобы убить процесс;

Как правило, система выпуска останавливает службу с помощью команды kill. В это время служба может получить сигнал завершения работы и выполнить программу-ловушку для очистки.

Пример сценария

Предположим следующий сценарий: производитель отправляет сообщение во внутреннюю очередь, а потребитель читает сообщение очереди и выполняет его. Когда мы останавливаем службу, мы надеемся, что сообщения в очереди могут нормально обрабатываться Пример кода выглядит следующим образом:

/**
 * 服务关闭测试
 */
public class ShutDownTest {

    private static BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(50);

    private static AtomicLong taskId = new AtomicLong(0);

    // 生产任务
    private static class ProduceTask implements Runnable {

        private AtomicBoolean stopped = new AtomicBoolean(false);

        @Override
        public void run() {
            while (!stopped.get()) {
                long element = taskId.incrementAndGet();
                queue.add(element);
                System.out.println("add element : " + element);
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                }
            }
        }

        public void setStopped() {
            stopped.compareAndSet(false, true);
            System.out.println("stop producer.");
        }
    }

    // 消费任务
    private static class ConsumeTask implements Runnable {

        private AtomicBoolean stopped = new AtomicBoolean(false);

        @Override
        public void run() {
            while (!stopped.get() || queue.size() > 0) {
                try {
                    long element = queue.take();
                    System.out.println("consume element : " + element);
                    doWork();
                } catch (InterruptedException e) {
                }
            }
        }

        private void doWork() {
            try {
                // 消费速度比生产速度稍慢,模拟积压情况
                Thread.sleep(60);
            } catch (InterruptedException e) {
            }
        }

        public void setStopped() {
            stopped.compareAndSet(false, true);
            System.out.println("stop consumer.");
        }
    }

    public static void main(String[] args) {
        final ProduceTask producerTask = new ProduceTask();
        final Thread producerThread = new Thread(producerTask);

        final ConsumeTask consumeTask = new ConsumeTask();
        Thread consumeThread = new Thread(consumeTask);

        // 先启动消费
        consumeThread.start();
        // 再启动生产
        producerThread.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("try close...");
            // 先关闭生产
            producerTask.setStopped();
            // 再关闭消费
            consumeTask.setStopped();
            try {
                System.out.println("close wait...");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
            }
            System.out.println("close finished...");
        }));
    }
}

Результат выполнения выглядит следующим образом: вы можете видеть, что программа-ловушка успешно выполняется, когда служба закрывается, и завершается после ожидания завершения обработки сообщения.

钩子示例结果

потенциальные проблемы

При использовании ShutdownHook мы часто не можем контролировать порядок выполнения хуков. Java.Runtime.addShutdownHook — это интерфейс API, доступный для внешнего мира. В приведенном выше сценарии, если хуки регистрируются независимо, нельзя ли гарантировать порядок выполнения в более сложных проектах? Мы столкнулись с такой проблемой в реальных сценариях.Сообщения потребляются из очереди kafka и передаются во внутренний пул потоков для обработки.Мы настроили политику отклонения пула потоков, чтобы ждать все время (чтобы гарантировать, что сообщение действительно обрабатывается ), а затем иногда возникает проблема, что служба не может быть отключена. Причина в том, что сначала закрывается пул потоков, но очередь Kafka все еще потребляет сообщения, в результате чего потребляющий поток все время ожидает.

Signal

Java также предоставляет механизм сигнального сигнала, и наш сервис также может получать сигнал завершения работы.

Механизм Signal используется по следующим причинам:

  • Порядок выполнения ShutdownHook не может быть гарантирован, и сторонние компоненты также могут быть зарегистрированы, в результате чего ресурсы, от которых зависит определяемый бизнесом процесс выхода, будут закрыты и очищены заранее;
  • Signal — это непубличный API, сторонние компоненты используются редко Мы можем разместить порядок выполнения отключения службы внутри себя;
  • После завершения работы по очистке вы можете выполнить вызов выхода, чтобы гарантировать, что очистка ресурсов не повлияет на логику очистки выхода ShutdownHook;

Основная причина здесь состоит в том, чтобы полностью гарантировать порядок, в котором сервисы закрываются, чтобы избежать проблем. Поддерживаем задачи выключения в порядке внутри службы, а приведенный выше код корректируется следующим образом:

public class TermHelper {

    private static AtomicBoolean signalTriggered = new AtomicBoolean(false);
    private static AtomicBoolean stopping = new AtomicBoolean(false);
    private static AtomicBoolean registeredHolder = new AtomicBoolean(false);

    private static Deque<Runnable> terms = new ConcurrentLinkedDeque<>();

    private static void tryRegisterOnlyOnce() {
        boolean previousRegistered = registeredHolder.getAndSet(true);
        if (!previousRegistered) {
            registerTermSignal();
        }
    }

    private static void registerTermSignal() {
        Signal.handle(new Signal("TERM"), signal -> {
            boolean previous = signalTriggered.getAndSet(true);
            if (previous) {
                System.out.println("Term has been triggered.");
                return;
            }
            termAndExit();
        });
    }

    public static void addTerm(Runnable runnable) {
        tryRegisterOnlyOnce();
        terms.addLast(runnable);
    }

    public static void addFirstTerm(Runnable runnable) {
        tryRegisterOnlyOnce();
        terms.addFirst(runnable);
    }

    private static void termAndExit() {
        try {
            Thread current = Thread.currentThread();
            current.setName(current.getName() + "(退出线程)");
            System.out.println("do term cleanup....");
            doTerm();
            System.out.println("exit success.");
            System.exit(0);
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public static void doTerm() {
        boolean previousStopping = stopping.getAndSet(true);
        if (previousStopping) {
            System.out.println("Term routine already running, wait until done!");
            return;
        }
        for (Runnable runnable : terms) {
            try {
                System.out.println("execute term runnable : " + runnable);
                runnable.run();
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

TermHelper внутренне использует очередь для обслуживания задач выключения и последовательно выполняет связанные задачи, когда служба закрывается, чтобы обеспечить ее порядок. Мы также можем поддерживать приоритет задачи выключения на этой основе и реализовывать задачу выключения последовательно в соответствии с приоритетом.

public class ShutDownTest {

    private static BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(50);

    private static AtomicLong taskId = new AtomicLong(0);

    // 生产任务
    private static class ProduceTask implements Runnable {

        private AtomicBoolean stopped = new AtomicBoolean(false);

        @Override
        public void run() {
            while (!stopped.get()) {
                long element = taskId.incrementAndGet();
                queue.add(element);
                System.out.println("add element : " + element);
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                }
            }
        }

        public void setStopped() {
            stopped.compareAndSet(false, true);
            System.out.println("stop producer.");
        }
    }

    // 消费任务
    private static class ConsumeTask implements Runnable {

        private AtomicBoolean stopped = new AtomicBoolean(false);

        @Override
        public void run() {
            while (!stopped.get() || queue.size() > 0) {
                try {
                    long element = queue.take();
                    System.out.println("consume element : " + element);
                    doWork();
                } catch (InterruptedException e) {
                }
            }
        }

        private void doWork() {
            try {
                // 消费速度比生产速度稍慢,模拟积压情况
                Thread.sleep(60);
            } catch (InterruptedException e) {
            }
        }

        public void setStopped() {
            stopped.compareAndSet(false, true);
            System.out.println("stop consumer.");
        }
    }

    public static void main(String[] args) {
        final ProduceTask producerTask = new ProduceTask();
        final Thread producerThread = new Thread(producerTask);

        final ConsumeTask consumeTask = new ConsumeTask();
        Thread consumeThread = new Thread(consumeTask);

        // 先启动消费
        consumeThread.start();
        // 再启动生产
        producerThread.start();

        TermHelper.addFirstTerm(() -> {
            // 关闭生产
            producerTask.setStopped();
        });

        TermHelper.addTerm(() -> {
            // 再关闭消费
            consumeTask.setStopped();
        });

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("shut down hook...");
        }));
    }
}

Результат выполнения показан ниже. Следует отметить, что мы зарегистрировали только сигнал TERM, поэтому нам нужно отключить службу, уничтожив -TERM. Из рисунка видно, что протестированные нами производители и потребители завершили работу нормально, а внутренние сообщения были наконец обработаны.

image-20190407212611347

резюме

Если вам нужно плавно остановить службу, мы обычно можем добиться этого с помощью ShutdownHook и Signal. ShutdownHook, как правило, трудно гарантировать порядок выполнения задач завершения работы.В настоящее время мы можем рассмотреть возможность использования механизма Signal для полного управления порядком выполнения наших служб отключения.