содержание
- Повторите сценарии использования
- Как элегантно спроектировать повторную реализацию
- guava-повторная попытка базового использования
- Принцип реализации Guava-retry
- Расширенное использование guava-retrying
- Проблемы, возникающие при использовании
Повторите сценарии использования
Во многих бизнес-сценариях для устранения различных нестабильных факторов и логических ошибок в системе и обеспечения ожидаемых результатов с наибольшей вероятностью необходим механизм повторных попыток.
Особенно при вызове удаленных служб в сценариях с высокой степенью параллелизма очень вероятно, что мы не сможем получить желаемые результаты или вообще не получим ответа из-за задержек ответа сервера или сетевых причин. В настоящее время элегантный механизм повторных вызовов может повысить вероятность получения ожидаемого ответа.
Обычно мы проходили бызадача на времяПопробуй снова. Например, если операция не удалась, запишите ее.Когда запланированная задача запустится снова, поместите данные в метод запланированной задачи и снова запустите ее. Наконец, пока вы не получите желаемый результат.
Будь то механизм повтора, основанный на задачах с заданным временем, или простой повторитель, написанный нами, недостатком является то, что механизм повтора слишком прост и неудобен для реализации.
Как элегантно спроектировать повторную реализацию
Полная реализация повторных попыток должна решить следующие проблемы:
- Повторить при каких условиях
- остановиться при каких условиях
- Как остановить повторы
- Как долго ждать, чтобы прекратить повторную попытку
- как ждать
- срок запроса
- как закончить
- Как прослушать весь процесс повтора
И, для лучшей инкапсуляции, реализация retry обычно делится на два шага:
- Постройте повторитель, используя заводской шаблон
- Выполните метод повторной попытки и получите результат
Полный процесс повторной попытки может быть просто показан как:
guava-повторная попытка базового использования
Повторные попытки Guava основаны на реализации механизма повторных попыток основной библиотеки классов Google guava, которую можно назвать инструментом повторных попыток.
Вот краткий обзор его использования.
1. Конфигурация Maven
<!-- https://mvnrepository.com/artifact/com.github.rholder/guava-retrying -->
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
</dependency>
Обратите внимание, что эта версия полагается на 27.0.1 версия Guava. Если низкая гуава несколько версий вашего проекта не проблема, но не слишком низкая совместима. На этот раз вам нужно обновить вашу версию вашего проекта Guava, или просто избавиться от собственной зависимости ГУВА, используя Guava-Proidering Pass по зависимости Гуавы.
2. Реализовать вызываемый
Callable<Boolean> callable = new Callable<Boolean>() {
public Boolean call() throws Exception {
return true; // do something useful here
}
};
Метод вызова Callable — это ваш собственный фактический деловой вызов.
- Построить Retryer с помощью RetryerBuilder
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
.retryIfResult(Predicates.<Boolean>isNull())
.retryIfExceptionOfType(IOException.class)
.retryIfRuntimeException()
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();
- Делайте свой бизнес с ретриверами
retryer.call(callable);
Ниже приведена полная эталонная реализация.
public Boolean test() throws Exception {
//定义重试机制
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
//retryIf 重试条件
.retryIfException()
.retryIfRuntimeException()
.retryIfExceptionOfType(Exception.class)
.retryIfException(Predicates.equalTo(new Exception()))
.retryIfResult(Predicates.equalTo(false))
//等待策略:每次请求间隔1s
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
//停止策略 : 尝试请求6次
.withStopStrategy(StopStrategies.stopAfterAttempt(6))
//时间限制 : 某次请求不得超过2s , 类似: TimeLimiter timeLimiter = new SimpleTimeLimiter();
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(2, TimeUnit.SECONDS))
.build();
//定义请求实现
Callable<Boolean> callable = new Callable<Boolean>() {
int times = 1;
@Override
public Boolean call() throws Exception {
log.info("call times={}", times);
times++;
if (times == 2) {
throw new NullPointerException();
} else if (times == 3) {
throw new Exception();
} else if (times == 4) {
throw new RuntimeException();
} else if (times == 5) {
return false;
} else {
return true;
}
}
};
//利用重试器调用请求
return retryer.call(callable);
}
Принцип реализации Guava-retry
Ядром guava-retry является класс Attempt, класс Retryer и некоторые классы, связанные со стратегией.
- Attempt
Попытка является как повторным запросом (вызовом), так и результатом запроса, и записывает количество текущих запросов, содержат ли они исключения, а также возвращаемое значение запроса.
/**
* An attempt of a call, which resulted either in a result returned by the call,
* or in a Throwable thrown by the call.
*
* @param <V> The type returned by the wrapped callable.
* @author JB
*/
public interface Attempt<V>
- Retryer
Retryer создается с помощью фабричного класса RetryerBuilder. RetryerBuilder отвечает за назначение определенной стратегии повтора объекту Retryer.
Когда Retryer выполняет метод вызова, эти стратегии повтора используются одна за другой.
Давайте взглянем на конкретную реализацию метода call Retryer.
/**
* Executes the given callable. If the rejection predicate
* accepts the attempt, the stop strategy is used to decide if a new attempt
* must be made. Then the wait strategy is used to decide how much time to sleep
* and a new attempt is made.
*
* @param callable the callable task to be executed
* @return the computed result of the given callable
* @throws ExecutionException if the given callable throws an exception, and the
* rejection predicate considers the attempt as successful. The original exception
* is wrapped into an ExecutionException.
* @throws RetryException if all the attempts failed before the stop strategy decided
* to abort, or the thread was interrupted. Note that if the thread is interrupted,
* this exception is thrown and the thread's interrupt status is set.
*/
public V call(Callable<V> callable) throws ExecutionException, RetryException {
long startTime = System.nanoTime();
//说明: 根据attemptNumber进行循环——也就是重试多少次
for (int attemptNumber = 1; ; attemptNumber++) {
//说明:进入方法不等待,立即执行一次
Attempt<V> attempt;
try {
//说明:执行callable中的具体业务
//attemptTimeLimiter限制了每次尝试等待的时常
V result = attemptTimeLimiter.call(callable);
//利用调用结果构造新的attempt
attempt = new ResultAttempt<V>(result, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
} catch (Throwable t) {
attempt = new ExceptionAttempt<V>(t, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
}
//说明:遍历自定义的监听器
for (RetryListener listener : listeners) {
listener.onRetry(attempt);
}
//说明:判断是否满足重试条件,来决定是否继续等待并进行重试
if (!rejectionPredicate.apply(attempt)) {
return attempt.get();
}
//说明:此时满足停止策略,因为还没有得到想要的结果,因此抛出异常
if (stopStrategy.shouldStop(attempt)) {
throw new RetryException(attemptNumber, attempt);
} else {
//说明:执行默认的停止策略——线程休眠
long sleepTime = waitStrategy.computeSleepTime(attempt);
try {
//说明:也可以执行定义的停止策略
blockStrategy.block(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RetryException(attemptNumber, attempt);
}
}
}
}
Процесс выполнения Retryer выглядит следующим образом.
Расширенное использование guava-retrying
Основываясь на принципе реализации guava-retry, мы можем определить нашу собственную стратегию повторных попыток в соответствии с реальным бизнесом.
Ниже с数据同步
Возьмите этот обычный системный бизнес в качестве примера и настройте стратегию повторных попыток.
Следующая реализация основана на версии Spring Boot 2.1.2.RELEASE.
И используйте Lombok, чтобы упростить бин.
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
Описание деятельности
Когда продукт создан, цена продукта должна быть установлена отдельно. Поскольку две операции выполняются двумя людьми, возникнет следующая проблема, то есть товар не создан, а данные о цене созданы. В этом случае данные о цене должны дождаться нормального создания продукта, а затем продолжить синхронизацию.
Создаем товар через http запрос, а цену товара изменяем через таймер.
Если товара нет или количество товара меньше 1, цена товара не может быть установлена. Вам нужно дождаться, пока продукт будет успешно создан, а количество больше 0, чтобы успешно установить цену продукта.
Процесс реализации
- Пользовательская стратегия блокировки повторных попыток
Стратегия блокировки по умолчанию — это спящий режим потока, который реализован здесь с использованием циклических блокировок без блокировки потоков.
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.strategy;
import com.github.rholder.retry.BlockStrategy;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.LocalDateTime;
/**
* 自旋锁的实现, 不响应线程中断
*/
@Slf4j
@NoArgsConstructor
public class SpinBlockStrategy implements BlockStrategy {
@Override
public void block(long sleepTime) throws InterruptedException {
LocalDateTime startTime = LocalDateTime.now();
long start = System.currentTimeMillis();
long end = start;
log.info("[SpinBlockStrategy]...begin wait.");
while (end - start <= sleepTime) {
end = System.currentTimeMillis();
}
//使用Java8新增的Duration计算时间间隔
Duration duration = Duration.between(startTime, LocalDateTime.now());
log.info("[SpinBlockStrategy]...end wait.duration={}", duration.toMillis());
}
}
- Пользовательский прослушиватель повторных попыток
RetryListener может отслеживать процесс множественных повторных попыток и может использоватьattempt
Сделайте что-нибудь дополнительно.
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.listener;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryListener;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RetryLogListener implements RetryListener {
@Override
public <V> void onRetry(Attempt<V> attempt) {
// 第几次重试,(注意:第一次重试其实是第一次调用)
log.info("retry time : [{}]", attempt.getAttemptNumber());
// 距离第一次重试的延迟
log.info("retry delay : [{}]", attempt.getDelaySinceFirstAttempt());
// 重试结果: 是异常终止, 还是正常返回
log.info("hasException={}", attempt.hasException());
log.info("hasResult={}", attempt.hasResult());
// 是什么原因导致异常
if (attempt.hasException()) {
log.info("causeBy={}" , attempt.getExceptionCause().toString());
} else {
// 正常返回时的结果
log.info("result={}" , attempt.getResult());
}
log.info("log listen over.");
}
}
- Пользовательское исключение
Для некоторых исключений требуется повторная попытка, для некоторых нет.
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception;
/**
* 当抛出这个异常的时候,表示需要重试
*/
public class NeedRetryException extends Exception {
public NeedRetryException(String message) {
super("NeedRetryException can retry."+message);
}
}
- Реализовать конкретный бизнес повторных попыток и вызываемый интерфейс
Используйте метод call, чтобы позвонить в свою компанию.
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.math.BigDecimal;
/**
* 商品model
*/
@Data
@AllArgsConstructor
public class Product {
private Long id;
private String name;
private Integer count;
private BigDecimal price;
}
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.repository;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model.Product;
import org.springframework.stereotype.Repository;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* 商品DAO
*/
@Repository
public class ProductRepository {
private static ConcurrentHashMap<Long,Product> products=new ConcurrentHashMap();
private static AtomicLong ids=new AtomicLong(0);
public List<Product> findAll(){
return new ArrayList<>(products.values());
}
public Product findById(Long id){
return products.get(id);
}
public Product updatePrice(Long id, BigDecimal price){
Product p=products.get(id);
if (null==p){
return p;
}
p.setPrice(price);
return p;
}
public Product addProduct(Product product){
Long id=ids.addAndGet(1);
product.setId(id);
products.put(id,product);
return product;
}
}
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service;
import lombok.extern.slf4j.Slf4j;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception.NeedRetryException;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model.Product;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.repository.ProductRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
/**
* 业务方法实现
*/
@Component
@Slf4j
public class ProductInformationHander implements Callable<Boolean> {
@Autowired
private ProductRepository pRepo;
private static Map<Long, BigDecimal> prices = new HashMap<>();
static {
prices.put(1L, new BigDecimal(100));
prices.put(2L, new BigDecimal(200));
prices.put(3L, new BigDecimal(300));
prices.put(4L, new BigDecimal(400));
prices.put(8L, new BigDecimal(800));
prices.put(9L, new BigDecimal(900));
}
@Override
public Boolean call() throws Exception {
log.info("sync price begin,prices size={}", prices.size());
for (Long id : prices.keySet()) {
Product product = pRepo.findById(id);
if (null == product) {
throw new NeedRetryException("can not find product by id=" + id);
}
if (null == product.getCount() || product.getCount() < 1) {
throw new NeedRetryException("product count is less than 1, id=" + id);
}
Product updatedP = pRepo.updatePrice(id, prices.get(id));
if (null == updatedP) {
return false;
}
prices.remove(id);
}
log.info("sync price over,prices size={}", prices.size());
return true;
}
}
- Построить ретриер
Создайте Retryer с приведенной выше реализацией в качестве параметра.
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service;
import com.github.rholder.retry.*;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception.NeedRetryException;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.listener.RetryLogListener;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.strategy.SpinBlockStrategy;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 构造重试器
*/
@Component
public class ProductRetryerBuilder {
public Retryer build() {
//定义重试机制
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
//retryIf 重试条件
//.retryIfException()
//.retryIfRuntimeException()
//.retryIfExceptionOfType(Exception.class)
//.retryIfException(Predicates.equalTo(new Exception()))
//.retryIfResult(Predicates.equalTo(false))
.retryIfExceptionOfType(NeedRetryException.class)
//等待策略:每次请求间隔1s
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
//停止策略 : 尝试请求3次
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
//时间限制 : 某次请求不得超过2s , 类似: TimeLimiter timeLimiter = new SimpleTimeLimiter();
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(2, TimeUnit.SECONDS))
//默认的阻塞策略:线程睡眠
//.withBlockStrategy(BlockStrategies.threadSleepStrategy())
//自定义阻塞策略:自旋锁
.withBlockStrategy(new SpinBlockStrategy())
//自定义重试监听器
.withRetryListener(new RetryLogListener())
.build();
return retryer;
}
}
- Выполнение Retryer в сочетании с задачами на время
Временную задачу нужно запустить только один раз, но все стратегии повтора фактически реализованы. Это значительно упрощает конструкцию таймера.
первое использование@EnableScheduling
Объявляет, что элемент поддерживает аннотации таймера.
@SpringBootApplication
@EnableScheduling
public class DemoRetryerApplication {
public static void main(String[] args) {
SpringApplication.run(DemoRetryerApplication.class, args);
}
}
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.task;
import com.github.rholder.retry.Retryer;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service.ProductInformationHander;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service.ProductRetryerBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* 商品信息定时器
*/
@Component
public class ProductScheduledTasks {
@Autowired
private ProductRetryerBuilder builder;
@Autowired
private ProductInformationHander hander;
/**
* 同步商品价格定时任务
* @Scheduled(fixedDelay = 30000) :上一次执行完毕时间点之后30秒再执行
*/
@Scheduled(fixedDelay = 30*1000)
public void syncPrice() throws Exception{
Retryer retryer=builder.build();
retryer.call(hander);
}
}
Результат выполнения: поскольку продукта нет, после повторной попытки возникает исключение.
2019-二月-28 14:37:52.667 INFO [scheduling-1] n.i.t.f.s.i.d.r.g.l.RetryLogListener - log listen over.
2019-二月-28 14:37:52.672 ERROR [scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler - Unexpected error occurred in scheduled task.
com.github.rholder.retry.RetryException: Retrying failed to complete successfully after 3 attempts.
at com.github.rholder.retry.Retryer.call(Retryer.java:174)
Вы также можете добавить некоторые данные о продукте, чтобы увидеть результат успешной повторной попытки.
Полный пример кода находится по адресуздесь.
Проблемы, возникающие при использовании
Конфликт версий Гуавы
Поскольку версия guava, от которой зависит проект, слишком низкая, при запуске проекта возникает следующее исключение.
java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor()Lcom/google/common/util/concurrent/ListeningExecutorService;
at org.apache.curator.framework.listen.ListenerContainer.addListener(ListenerContainer.java:41)
at com.bzn.curator.ZkOperator.getZkClient(ZkOperator.java:207)
at com.bzn.curator.ZkOperator.checkExists(ZkOperator.java:346)
at com.bzn.curator.watcher.AbstractWatcher.initListen(AbstractWatcher.java:87)
at com.bzn.web.listener.NebulaSystemInitListener.initZkWatcher(NebulaSystemInitListener.java:84)
at com.bzn.web.listener.NebulaSystemInitListener.contextInitialized(NebulaSystemInitListener.java:33)
at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4939)
at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5434)
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150)
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1559)
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1549)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Поэтому необходимо исключить в проекте младшие версии guava-зависимостей.
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
Между тем с тех пор, как в новой версии удалили ГуавуsameThreadExecutor
метод, но ЗК в текущем проекте требует этот метод, поэтому нужно вручную установить соответствующую версию гуавы.
Конечно, этот метод MoreExecutors все еще существует в версии 19.0, но он помечен как просроченный.
@Deprecated
@GwtIncompatible("TODO")
public static ListeningExecutorService sameThreadExecutor() {
return new DirectExecutorService();
}
Версия гуавы, которая объявляет зависимость, может быть изменена на 19.0.0.
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
Динамическая настройка стратегии повторных попыток
В процессе фактического использования часто необходимо настроить стратегию повторных попыток, например количество повторных попыток и время ожидания, поэтому конфигурация стратегии повторных попыток параметризуется и сохраняется, что может быть динамически изменено.
Например, увеличьте время ожидания и количество повторных попыток во время seckill, торгового фестиваля Double Eleven и других периодов, чтобы обеспечить непиковые запросы. В обычное время время ожидания и количество повторных попыток могут быть соответствующим образом сокращены.
Для важных для системы служб, если несколько повторных попыток успешны, RetryListener можно использовать для мониторинга и оповещения.
Что касается «стратегии повторных попыток динамической корректировки», ниже приведена эталонная реализация:
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.WaitStrategy;
/**
* 自定义等待策略:根据重试次数动态调节等待时间,第一次请求间隔1s,第二次间隔10s,第三次及以后都是20s。
*
*
* 在创建Retryer的时候通过withWaitStrategy将该等待策略生效即可。
*
* RetryerBuilder.<Boolean>newBuilder()
* .withWaitStrategy(new AlipayWaitStrategy())
*
* 类似的效果也可以通过自定义 BlockStrategy 来实现,你可以写一下试试。
*
*/
public class AlipayWaitStrategy implements WaitStrategy {
@Override
public long computeSleepTime(Attempt failedAttempt) {
long number = failedAttempt.getAttemptNumber();
if (number==1){
return 1*1000;
}
if (number==2){
return 10*1000;
}
return 20*1000;
}
}