Объединение запросов Hystrix и кэширование запросов (1): кэширование запросов

Java

предисловие

После праздника Национального дня автор боролся с праздничным синдромом, особенно выходя на работу по субботам. . .

Я думаю, что все знакомы с Hystrix, в его исходном коде много используется RxJava, так получилось, что автор является инженером-разработчиком Android в своем старом бизнесе.Объединение и кэширование запросов HystixНекоторый урожай некоторого исходного кода.

Введение в Хайстрикс

  • Hystrix имеет открытый исходный код Netflix и официально определяется следующим образом:

Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.

  • Автор понимает: в распределенной среде ошибки неизбежны, а Hystrix предоставляет механизм изоляции, деградации и прерывания цепи.

1. Изоляция: Изоляция позволяет избежать взаимного влияния между службами.Если одна служба недоступна, это не повлияет на другие службы.Сервис Лавина. 2. Деградация: В распределенной среде невозможно избежать ситуации, когда служба недоступна, и механизм деградации может обеспечить более дружественное взаимодействие (значение по умолчанию, аварийный возврат). 3. Прерыватель цепи. Механизм прерывателя цепи позволяет избежать ситуации, когда служба недоступна, вызывающая служба по-прежнему вызывает недоступную службу, что приводит к повышенному потреблению ресурсов и затрат времени. 4. Обеспечьте визуальный мониторинг, Hystrix Dashboard. 4. Конечно, есть и то, о чем сегодня хочет рассказать авторЗапросить слияние и запросить кэширование.

  • Запрос на слияние и запрос на кеширование, соответствующий п.3 официального **Что он делает?**:

Parallel execution. Concurrency aware request caching. Automated batching through request collapsing.

  • Ниже приведены все официальные тестовые примеры в качестве записи, чтобы найти исходный код и проанализировать его.

1. Кэш запроса: CommandUsingRequestCache 2. Запросить слияние: CommandCollaperGetValueForKey

кеш запросов

  • Пример кэширования запросов находится вCommandUsingRequestCache, унаследовано отHystrixCommand, и общееCommandПоследовательный.
  • Итак, какая разница на уровне кода с кэшированием и без него?

1. ИнициализацияHystrixRequestContext2. ПереписатьgetCacheKey

HystrixRequestContext

  • HystrixRequestContext.initializeContextкод находится вHystrixRequestContext, по названию класса видно, что это контекст запроса, в котором сохраняется некоторая информация о запросе.

  • Как видно из исходного кода, new производитHystrixRequestContext, подключенныйThreadLocalв переменной.

	
    private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<HystrixRequestContext>();

/**
     * Call this at the beginning of each request (from parent thread)
     * to initialize the underlying context so that {@link HystrixRequestVariableDefault} can be used on any children threads and be accessible from
     * the parent thread.
     * <p>
     * <b>NOTE: If this method is called then <code>shutdown()</code> must also be called or a memory leak will occur.</b>
     * <p>
     * See class header JavaDoc for example Servlet Filter implementation that initializes and shuts down the context.
     */
    public static HystrixRequestContext initializeContext() {
        HystrixRequestContext state = new HystrixRequestContext();
        requestVariables.set(state);
        return state;
    }
  • Так,HystrixRequestContextКакова структура данных контекста хранения?
// 每个HystrixRequestContext实例,都会有一个ConcurrentMap
ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>>();

/**
     删除ConcurrentMap中存储的所有键值对,如果初始化了HystrixRequestContext对象,没有调用shutdown方法,确实会导致内存泄漏,因为state还在。
     */
    public void shutdown() {
        if (state != null) {
            for (HystrixRequestVariableDefault<?> v : state.keySet()) {
                // for each RequestVariable we call 'remove' which performs the shutdown logic
                try {
                    HystrixRequestVariableDefault.remove(this, v);
                } catch (Throwable t) {
                    HystrixRequestVariableDefault.logger.error("Error in shutdown, will continue with shutdown of other variables", t);
                }
            }
            // null out so it can be garbage collected even if the containing object is still
            // being held in ThreadLocals on threads that weren't cleaned up
            state = null;
        }
    }
  • этоConcurrentHashMapХранится вHystrixRequestVariableDefaultи статические внутренние классыHystrixRequestVariableDefault.LazyInitializerЧто это?

HystrixRequestVariableDefault

  • HystrixRequestVariableDefaultНа самом деле общий тип хранитсяTизvalue, и инкапсулируетinitialValue,get,setметод.
  • LazyInitializerКак следует из названия, это для ленивой инициализации.value, при разработке внутреннего класса.
// 作用一:作为内部类调用HystrixRequestVariableDefault.initialValue方法,通过维护initialized布尔值,使HystrixRequestVariableDefault.initialValue方法只调用一次。
// 作用二:new一个LazyInitializer对象或LazyInitializer被垃圾回收时不会调用HystrixRequestVariableDefault.initialValue方法,也就是说对于业务初始化逻辑的影响被排除。
// 作用三:调用get方法时,可以通过CAS乐观锁的方式实现value的获取,具体请参照get方法。
static final class LazyInitializer<T> {
        // @GuardedBy("synchronization on get() or construction")
        private T value;

        /*
         * Boolean to ensure only-once initialValue() execution instead of using
         * a null check in case initialValue() returns null
         */
        // @GuardedBy("synchronization on get() or construction")
        private boolean initialized = false;

        private final HystrixRequestVariableDefault<T> rv;

		// 不会调用HystrixRequestVariableDefault.initialValue,不会更新initialized值
        private LazyInitializer(HystrixRequestVariableDefault<T> rv) {
            this.rv = rv;
        }

		// 不会调用HystrixRequestVariableDefault.initialValue,只能通过set方式调用
        private LazyInitializer(HystrixRequestVariableDefault<T> rv, T value) {
            this.rv = rv;
            this.value = value;
            this.initialized = true;
        }
		// 如果未初始化(没有调用过set方法)过,则返回HystrixRequestVariableDefault.initialValue的值,初始化过则返回初始化的值
        public synchronized T get() {
            if (!initialized) {
                value = rv.initialValue();
                initialized = true;
            }
            return value;
        }
    }
  • получить метод, начните сConcurrentHashMapвынуть соответствующийLazyInitializer, если он пуст, используйте метод оптимистичной блокировки CAS, новыйLazyInitializerи депозитConcurrentHashMap, и, наконец, перезвонитьLazyInitializer.get()и вернуться
public T get() {
		// 当前线程的HystrixRequestContext为null 或 ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> 为null
        if (HystrixRequestContext.getContextForCurrentThread() == null) {
            throw new IllegalStateException(HystrixRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
        }
        ConcurrentHashMap<HystrixRequestVariableDefault<?>, LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state;

        // short-circuit the synchronized path below if we already have the value in the ConcurrentHashMap
        LazyInitializer<?> v = variableMap.get(this);
        if (v != null) {
            return (T) v.get();
        }

        /*
         * 乐观锁方式(CAS)new一个LazyInitializer,放进ConcurrentHashMap 
         * 这里值得注意的是,不调用LazyInitializer.get方法是不会执行HystrixRequestVariableDefault.initialValue,故当putIfAbsent失败时,可以乐观地放弃该实例,使该实例被GC。
         * 不管哪个LazyInitializer实例的get方法被调用,HystrixRequestVariableDefault.initialValue也只会被调用一次。
         */
        LazyInitializer<T> l = new LazyInitializer<T>(this);
        LazyInitializer<?> existing = variableMap.putIfAbsent(this, l);
        if (existing == null) {
            /*
             * We won the thread-race so can use 'l' that we just created.
             */
            return l.get();
        } else {
            /*
             * We lost the thread-race so let 'l' be garbage collected and instead return 'existing'
             */
            return (T) existing.get();
        }
    }

отношения между типами

  • Один запрос (не ограничиваясь одним потоком) -> HystrixRequestContext -> ConcurrentHashMap
  • Другими словами, каждый запрос имеет карту ConcurrentHashMap.

получить кеш

  • getCacheKeyпереписанныйAbstractCommand.getCacheKeyметод,AbstractCommandзаHystrixCommandбазовый класс.
    enter image description here
  • Из приведенного выше рисунка мы видим, чтоexecuteметод, который в итоге вызываетtoObservableметод, в то время какtoObservableметод вAbstractCommand, поэтому мы можем сделать предварительный вывод, чтоAbstractCommand.toObservableметод, сHystrixRequestVariableDefaultИли интерфейс, который он реализует, связан с чтением и записью кеша.

*AbstractCommand.toObservableКод ключа следующий:

 final String cacheKey = getCacheKey();

                /* 如果开启了缓存功能,从缓存读取 */
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }

				// 缓存对象
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // 放进缓存
                if (requestCacheEnabled && cacheKey != null) {
                    // 包装成缓存Observable对象
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
  • Далее просто ищемHystrixRequestCacheа такжеHystrixRequestVariableDefaultсвязь между,AbstractCommandчерез конструкторHystrixRequestCache.getInstanceпостроенHystrixRequestCacheобъект.
// 又是CAS,putIfAbsent。。。
 private static HystrixRequestCache getInstance(RequestCacheKey rcKey, HystrixConcurrencyStrategy concurrencyStrategy) {
        HystrixRequestCache c = caches.get(rcKey);
        if (c == null) {
            HystrixRequestCache newRequestCache = new HystrixRequestCache(rcKey, concurrencyStrategy);
            HystrixRequestCache existing = caches.putIfAbsent(rcKey, newRequestCache);
            if (existing == null) {
                // we won so use the new one
                c = newRequestCache;
            } else {
                // we lost so use the existing
                c = existing;
            }
        }
        return c;
    }
  • приходите посмотретьHystrixRequestCacheКак хранится значение, см.HystrixRequestCache.putIfAbsent.
HystrixCachedObservable<T> putIfAbsent(String cacheKey, HystrixCachedObservable<T> f) {
		// 使用HystrixRequestCache.prefix + concurrencyStrategy + HystrixCommand.getCacheKey包装成缓存key
        ValueCacheKey key = getRequestCacheKey(cacheKey);
        if (key != null) {
            // 寻找缓存,关键代码
            ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> cacheInstance = requestVariableForCache.get(concurrencyStrategy);
            if (cacheInstance == null) {
                throw new IllegalStateException("Request caching is not available. Maybe you need to initialize the HystrixRequestContext?");
            }
            HystrixCachedObservable<T> alreadySet = (HystrixCachedObservable<T>) cacheInstance.putIfAbsent(key, f);
            if (alreadySet != null) {
                // someone beat us so we didn't cache this
                return alreadySet;
            }
        }
        // we either set it in the cache or do not have a cache key
        return null;
    }
  • requestVariableInstance.get(key)заHystrixRequestVariableHolderметод в .
 // 找到了关联。。。这里有HystrixRequestVariable
 private static ConcurrentHashMap<RVCacheKey, HystrixRequestVariable<?>> requestVariableInstance = new ConcurrentHashMap<RVCacheKey, HystrixRequestVariable<?>>();
 // 
 public T get(HystrixConcurrencyStrategy concurrencyStrategy) {
        
        RVCacheKey key = new RVCacheKey(this, concurrencyStrategy);
        HystrixRequestVariable<?> rvInstance = requestVariableInstance.get(key);
        if (rvInstance == null) {
            requestVariableInstance.putIfAbsent(key, concurrencyStrategy.getRequestVariable(lifeCycleMethods));
            /*
             * 内存泄漏检测,
             */
            if (requestVariableInstance.size() > 100) {
                logger.warn("Over 100 instances of HystrixRequestVariable are being stored. This is likely the sign of a memory leak caused by using unique instances of HystrixConcurrencyStrategy instead of a single instance.");
            }
        }
        // HystrixRequestVariable.get取出ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>的map,再从ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>中根据重写的getCacheKey构造出ValueCacheKey,拿出缓存值。
        return (T) requestVariableInstance.get(key).get();
    }

Получить соответствующую связь каждого объекта в процессе кэширования

  • ключ команды
  • A HystrixRequestVariableHolder>>
  • A ConcurrentHashMap requestVariableInstance = new ConcurrentHashMap>()

Сводка кэша запросов

Наконец, суммируйте механизм кэширования запросов, один запрос соответствует одномуHystrixRequestContext,HystrixRequestVariableХраните кэшированную ценность вgetCacheKeyПостроить перепискуRVCacheKey,пройти черезHystrixRequestCacheизHystrixRequestVariableHolderполучитьHystrixRequestVariableзначение .

Суммировать

Прочитав исходный код, я обнаружил, что у автора следующие чувства:

1. Различные ConcurrentHashMap 2. Наконец, RxJava впервые используется в областях, отличных от Android. 3. Ленивая загрузка + CAS сопровождает весь процесс, и эта реализация без блокировки также будет рассмотрена в будущем

использованная литература