Sentinel — это легкий и высокодоступный компонент управления трафиком с открытым исходным кодом для архитектуры распределенных сервисов, исходный код которого открыт командой промежуточного программного обеспечения Alibaba. понижение разрядности выключателя и защита системы от нагрузки, стабильность.
Вы можете спросить: в чем сходство и различие между Sentinel и Netflix Hystrix, широко используемой библиотекой понижения версии автоматических выключателей? На официальном сайте Sentinel есть сравнительная статья, вот выдержка из сводной таблицы, конкретное сравнение можно посмотреть по этой ссылке.
Сравнить содержимое | Sentinel | Hystrix |
---|---|---|
Карантинная политика | Изоляция семафора | Изоляция пула потоков/изоляция семафора |
стратегия понижения рейтинга автоматических выключателей | На основе времени отклика или частоты отказов | на основе частоты отказов |
Реализация индикатора в реальном времени | раздвижное окно | Скользящее окно (на основе RxJava) |
Конфигурация правила | Поддерживает несколько источников данных | Поддерживает несколько источников данных |
Расширяемость | несколько точек расширения | форма плагина |
Поддержка на основе аннотаций | служба поддержки | служба поддержки |
Ограничение | На основе количества запросов в секунду, поддержка текущего ограничения на основе отношения вызовов | не поддерживается |
формирование трафика | Поддержка медленного старта, режим постоянной скорости | не поддерживается |
Защита от нагрузки на систему | служба поддержки | не поддерживается |
консоль | Из коробки можно настраивать правила, просматривать мониторинг второго уровня, обнаружение машин и т. д. | несовершенный |
Адаптация общих фреймворков | Сервлет, Spring Cloud, Dubbo, gRPC и т. д. | Сервлет, Spring Cloud Netflix |
Как видно из сравнительной таблицы, Sentinel функциональнее Hystrix.В этой статье давайте разберемся с исходным кодом Sentinel и раскроем тайну Sentinel.
Структура проекта
Разветвите исходный код Sentinel в свой собственный репозиторий github, затем клонируйте исходный код в локальный, а затем начните путешествие по чтению исходного кода.
Сначала давайте взглянем на всю структуру проекта Sentinel:
-
Базовый модуль Sentinel-core, ограничение тока, понижение версии, защита системы и т. д. реализованы здесь.
-
модуль консоли sentinel-dashboard, который может реализовать визуальное управление подключенными клиентами Sentinel
-
Модуль передачи Sentinel-Transport предоставляет базовые серверные и клиентские API-интерфейсы мониторинга, а также некоторые реализации на основе различных библиотек.
-
модуль расширения sentinel-extension, который в основном реализует частичное расширение DataSource
-
Модуль адаптера Sentinel-адаптера в основном реализует адаптацию некоторых распространенных фреймворков.
-
Пример модуля Sentinel-demo, пожалуйста, обратитесь к тому, как использовать Sentinel для ограничения тока, перехода на более раннюю версию и т. д.
-
Модуль тестирования Sentinel-Benchmark, который предоставляет тесты точности основного кода.
Запустите образец
По сути, каждый фреймворк будет иметь примеры модулей, некоторые из которых называются примерами, некоторые — демонстрационными, и Sentinel не является исключением.
Затем давайте найдем пример из демонстрации Sentinel и запустим его, чтобы увидеть общую ситуацию.Как упоминалось выше, основная основная функция Sentinel заключается в том, чтобы выполнять ограничение тока, понижение версии и защиту системы, затем мы начнем с «ограничения тока», чтобы увидеть Sentinel. Осознайте принцип.
Вы можете видеть, что в модуле sentinel-demo есть много разных примеров.Мы находим пакет потока под базовым модулем.Под этим пакетом находится соответствующий пример ограничения тока, но есть много типов ограничения тока, поэтому мы просто ищем В соответствии с классом ограничения тока qps другие методы ограничения тока в основном одинаковы.
public class FlowQpsDemo {
private static final String KEY = "abc";
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
private static AtomicInteger total = new AtomicInteger();
private static volatile boolean stop = false;
private static final int threadCount = 32;
private static int seconds = 30;
public static void main(String[] args) throws Exception {
initFlowQpsRule();
tick();
// first make the system run on a very low condition
simulateTraffic();
System.out.println("===== begin to do flow control");
System.out.println("only 20 requests per second can pass");
}
private static void initFlowQpsRule() {
List<FlowRule> rules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule();
rule1.setResource(KEY);
// set limit qps to 20
rule1.setCount(20);
// 设置限流类型:根据qps
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule1.setLimitApp("default");
rules.add(rule1);
// 加载限流的规则
FlowRuleManager.loadRules(rules);
}
private static void simulateTraffic() {
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(new RunTask());
t.setName("simulate-traffic-Task");
t.start();
}
}
private static void tick() {
Thread timer = new Thread(new TimerTask());
timer.setName("sentinel-timer-task");
timer.start();
}
static class TimerTask implements Runnable {
@Override
public void run() {
long start = System.currentTimeMillis();
System.out.println("begin to statistic!!!");
long oldTotal = 0;
long oldPass = 0;
long oldBlock = 0;
while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
long globalTotal = total.get();
long oneSecondTotal = globalTotal - oldTotal;
oldTotal = globalTotal;
long globalPass = pass.get();
long oneSecondPass = globalPass - oldPass;
oldPass = globalPass;
long globalBlock = block.get();
long oneSecondBlock = globalBlock - oldBlock;
oldBlock = globalBlock;
System.out.println(seconds + " send qps is: " + oneSecondTotal);
System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
+ ", pass:" + oneSecondPass
+ ", block:" + oneSecondBlock);
if (seconds-- <= 0) {
stop = true;
}
}
long cost = System.currentTimeMillis() - start;
System.out.println("time cost: " + cost + " ms");
System.out.println("total:" + total.get() + ", pass:" + pass.get()
+ ", block:" + block.get());
System.exit(0);
}
}
static class RunTask implements Runnable {
@Override
public void run() {
while (!stop) {
Entry entry = null;
try {
entry = SphU.entry(KEY);
// token acquired, means pass
pass.addAndGet(1);
} catch (BlockException e1) {
block.incrementAndGet();
} catch (Exception e2) {
// biz exception
} finally {
total.incrementAndGet();
if (entry != null) {
entry.exit();
}
}
Random random2 = new Random();
try {
TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));
} catch (InterruptedException e) {
// ignore
}
}
}
}
}
скопировать код
После выполнения приведенного выше кода выводятся следующие результаты:
Видно, что в приведенных выше результатах количество проходов не совпадает с тем, что мы ожидали.Мы ожидали, что количество запросов в секунду, разрешенных для прохождения, равно 20, но в настоящее время есть много запросов на проходы, которые превышают 20. .
Причина в том, что тестируемый нами код использует многопоточность, обратите внимание наthreadCount
Значение , всего 32 потока для имитации,При выполнении защиты ресурсов в методе run RunTask, то есть вSphU.entry
Внутренний не заблокирован, поэтому при высоком уровне параллелизма количество проходов будет больше 20.
Это можно описать следующей моделью: есть поток TimeTicker, который ведет статистику каждую 1 секунду. Есть N потоков RunTask, имитирующих запросы, а доступный бизнес-код защищен ресурсным ключом, по правилам разрешено проходить только 20 запросов в секунду.
Поскольку такие счетчики, как количество проходов, блокировок и итогов, используются глобально, и когда несколько потоков RunTask выполняют SphU.entry для подачи заявки на вход, защита от внутренней блокировки отсутствует, поэтому количество проходов превышает установленный порог.
Для того, чтобы доказать правильность и достоверность нижнего предела тока в одном потоке, наша модель должна стать такой:
Затем я положилthreadCount
Значение изменено на 1. Существует только один поток для выполнения этого метода. Посмотрите на конкретные результаты ограничения тока. После выполнения приведенного выше кода напечатанные результаты выглядят следующим образом:
Видно, что количество проходов в основном сохраняется на уровне 20, но проходное значение первой статистики все еще превышает 20. Что является причиной этого?
На самом деле, если вы внимательно посмотрите на код в демо, вы можете обнаружить, что один поток используется для запроса моделирования, а другой поток используется для статистических результатов.Статистический поток подсчитывает результаты каждую 1 секунду, и есть ошибка времени между двумя потоками. Из временной метки, напечатанной потоком TimeTicker, видно, что, хотя статистика выполняется каждую секунду, все еще существует ошибка между текущим временем печати и последним временем, а не ровно 1000 мс.
Чтобы действительно проверить ограничение в 20 запросов в секунду и обеспечить точность данных, вам необходимо выполнить тестовый тест. Это не является предметом этой статьи. Заинтересованные студенты могут узнать о jmh. Тестовый тест в Sentinel также проводится. от jmh.
Углубленные принципы
На простом примере программы мы узнали, что sentinel может ограничивать поток запросов.Помимо текущего ограничения, он также имеет такие функции, как даунгрейд и защита системы. Теперь давайте рассеем облака и углубимся в исходный код, чтобы увидеть принцип реализации Sentinel.
Сначала начните с записи:SphU.entry()
. Этот метод будет применяться для входа.Если приложение прошло успешно, это означает, что ток не ограничен, иначе будет выброшено BlockException, и поверхность была ограничена.
отSphU.entry()
Метод будет выполняться до входаSph.entry()
, классом реализации Sph по умолчанию являетсяCtSph
, в CtSph в конечном итоге выполнится доentry(ResourceWrapperresourceWrapper,intcount,Object...args)throwsBlockException
Сюда.
Давайте посмотрим на конкретную реализацию этого метода:
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// Init the entry only. No rule checking will occur.
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
}
// Global switch is close, no rule checking will do.
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
// 获取该资源对应的SlotChain
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
/*
* Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no
* rule checking will be done.
*/
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 执行Slot的entry方法
chain.entry(context, resourceWrapper, null, count, args);
} catch (BlockException e1) {
e.exit(count, args);
// 抛出BlockExecption
throw e1;
} catch (Throwable e1) {
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
скопировать код
Этот метод можно разделить на следующие части:
-
1. Определить параметры и глобальные элементы конфигурации.Если они не соответствуют требованиям, объект CtEntry будет возвращен напрямую, и последующее определение ограничения тока не будет выполнено, в противном случае будет запущен следующий процесс обнаружения.
-
2. Получите соответствующий SlotChain в соответствии с упакованным ресурсным объектом.
-
3. Выполните метод входа в SlotChain.
-
3.1. Если метод входа SlotChain выдает BlockException, исключение будет продолжать выбрасываться вверх.
-
3.2. Если метод входа SlotChain выполняется нормально, объект входа будет возвращен в конце
-
4. Если метод верхнего уровня ловит BlockException, это означает, что запрос ограничен, в противном случае запрос может быть выполнен нормально
Наиболее важными из них являются шаги 2 и 3. Давайте разберем эти два шага.
Создать слотчейн
Сначала посмотрите на реализацию метода lookProcessChain:
private ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
// 具体构造chain的方法
chain = Env.slotsChainbuilder.build();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
скопировать код
Этот метод использует HashMap для кэширования, а ключом является объект ресурса. Заперто здесь и сделаноdoublecheck
. Конкретный метод построения цепи заключается в следующем:Env.slotsChainbuilder.build()
созданный этим кодом. Затем перейдите в этот метод и посмотрите.
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
return chain;
}
скопировать код
Цепь означает цепочку.Из метода сборки видно, что ProcessorSlotChain представляет собой связанный список с добавленным в него множеством слотов. Конкретную реализацию необходимо увидеть в DefaultProcessorSlotChain.
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
super.fireExit(context, resourceWrapper, count, args);
}
};
AbstractLinkedProcessorSlot<?> end = first;
@Override
public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
protocolProcessor.setNext(first.getNext());
first.setNext(protocolProcessor);
if (end == first) {
end = protocolProcessor;
}
}
@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
end.setNext(protocolProcessor);
end = protocolProcessor;
}
}
скопировать код
В DefaultProcessorSlotChain есть две переменные типа AbstractLinkedProcessorSlot: первая и конечная, которые являются головным и конечным узлами связанного списка.
При создании объекта DefaultProcessorSlotChain сначала создайте первый узел, а затем назначьте первый узел хвостовому узлу, что может быть представлено следующим рисунком:
После добавления первого узла в связанный список структура всего связанного списка становится следующей:
После добавления всех узлов в связанный список структура всего связанного списка становится такой, как показано на следующем рисунке:
Это добавляет все объекты Slot в связанный список, и каждый Slot наследуется от AbstractLinkedProcessorSlot. AbstractLinkedProcessorSlot представляет собой структуру цепочки ответственности.Каждый объект имеет следующий атрибут, который указывает на другой объект AbstractLinkedProcessorSlot. На самом деле модель цепочки ответственности существует во многих фреймворках, таких как Netty, которая реализуется через пайплайн.
Теперь, когда вы знаете, как создается SlotChain, давайте посмотрим, как выполнить метод входа Slot.
Выполните метод входа SlotChain
Экземпляром ProcessorSlotChain, полученным методом lookProcessChain, является DefaultProcessorSlotChain, затем при выполнении метода chain.entry будет выполняться метод входа DefaultProcessorSlotChain, а метод входа DefaultProcessorSlotChain выглядит следующим образом:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args)
throws Throwable {
first.transformEntry(context, resourceWrapper, t, count, args);
}
скопировать код
То есть запись DefaultProcessorSlotChain на самом деле является методом transformEntry исполняемого первого атрибута.
Метод transformEntry выполнит метод входа текущего узла, а первый узел в DefaultProcessorSlotChain перезапишет метод входа следующим образом:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, args);
}
скопировать код
Метод входа первого узла на самом деле является методом fireEntry исполняемого super, поэтому продолжайте обращать внимание на метод fireEntry следующим образом:
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, args);
}
}
скопировать код
Как видно отсюда, запись выполнения передается из метода fireEntry.Здесь будет выполняться метод transformEntry следующего узла текущего узла.Как было проанализировано выше, метод transformEntry вызовет запись текущего узла. , то есть метод fireEntry на самом деле является методом входа, который запускает следующий узел. Конкретный процесс показан на следующем рисунке:
Как видно из рисунка, с начального вызова метода entry() цепочки, он изменился на вызов метода entry() слота в SlotChain. Из приведенного выше анализа мы можем узнать, что первым узлом Slot в SlotChain является NodeSelectorSlot.
Выполните метод входа слота
Теперь можно обратить внимание на метод входа первого узла NodeSelectorSlot в SlotChain, конкретный код выглядит следующим образом:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args)
throws Throwable {
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = Env.nodeBuilder.buildTreeNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
}
// Build invocation tree
((DefaultNode)context.getLastNode()).addChild(node);
}
}
context.setCurNode(node);
// 由此触发下一个节点的entry方法
fireEntry(context, resourceWrapper, node, count, args);
}
скопировать код
Как видно из кода, узел NodeSelectorSlot выполнил некоторую собственную обработку бизнес-логики. Для получения подробной информации вы можете углубиться в исходный код, чтобы продолжить отслеживание. Вот общее введение в функциональные обязанности каждого слота:
-
NodeSelectorSlot
Отвечает за сбор путей к ресурсам и сохранение путей вызова этих ресурсов в древовидной структуре для текущего ограничения и понижения в соответствии с путями вызова; -
ClusterBuilderSlot
Он используется для хранения статистической информации о ресурсе и информации о вызывающем абоненте, такой как RT, QPS, количество потоков и т. д. ресурса, которые будут использоваться в качестве основы для многомерного ограничения тока и понижения; -
StatistcSlot
Он используется для записи и подсчета информации о времени выполнения на разных широтах; -
FlowSlot
Он используется для ограничения тока в соответствии с предустановленными правилами ограничения тока и состоянием статистики предыдущего слота; -
AuthorizationSlot
Затем по черному и белому списку сделать управление черным и белым списком; -
DegradeSlot
Затем с помощью статистической информации и заданных правил автоматический выключатель деградирует; -
SystemSlot
Затем общий входящий поток контролируется состоянием системы, например load1 и т. д.;
После выполнения обработки бизнес-логики вызывается метод fireEntry(), тем самым инициируя метод входа следующего узла. На данный момент мы знаем, что цепочка ответственности дозорного передается следующим образом: после того, как каждый узел Slot выполнит свое собственное дело, он вызовет fireEntry, чтобы вызвать метод входа следующего узла.
Таким образом, приведенную выше картину можно завершить следующим образом:
В этот момент через SlotChain был вызван метод entry() каждого узла.Каждый узел будет выполнять свою собственную логическую обработку в соответствии с созданными правилами.Когда статистический результат достигает установленного порога, он инициирует ограничение тока и понижение рейтинга. другие события, в частности создание BlockException.
Суммировать
Sentinel в основном основан на 7 различных слотах для формирования связанного списка.Каждый слот выполняет свои собственные обязанности.После выполнения своих собственных задач он передает запрос следующему слоту, пока не столкнется с правилом в слоте.Завершает работу, вызывая исключение BlockException.
Первые три слота отвечают за статистику, а последние слоты отвечают за конкретное управление на основе результатов статистики в сочетании с настроенными правилами, будь то блокировка запроса или освобождение.
Также есть много вариантов типа контроля: по qps, количеству потоков, холодный старт и т.д.
Затем на основе этого основного метода выводятся многие другие функции:
-
1. Консоль информационной панели может визуально управлять каждым подключенным дозорным клиентом (путем отправки пульсирующего сообщения) и обмениваться данными между информационной панелью и клиентом по протоколу http.
-
2. Сохранение правил.Реализуя интерфейс DataSource, настроенные правила могут сохраняться различными способами.Правила по умолчанию находятся в памяти
-
3. Адаптируйтесь к основным платформам, включая сервлет, dubbo, rRpc и т. д.
Панель управленияКонсоль
Sentinel-dashboard — это отдельное приложение, запускаемое через spring-boot и в основном предоставляющее облегченную консоль, которая обеспечивает обнаружение машин, мониторинг ресурсов одной машины в реальном времени, агрегацию ресурсов кластера и функции управления правилами.
Нам нужна только простая конфигурация приложения, чтобы использовать эти функции.
1 Запустите консоль
1.1 Скачайте код и скомпилируйте консоль
-
Скачать консольный проект
-
Упакуйте код в толстую банку с:
mvn cleanpackage
1.2 Старт
Запустите скомпилированную консоль следующей командой:
$ java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -jar target/sentinel-dashboard.jar
скопировать код
В приведенной выше команде мы указали параметр JVM,-Dserver.port=8080
Используется для указания порта запуска Spring Boot как8080
.
2 Консоль клиентского доступа
После запуска консоли клиент должен выполнить следующие шаги, чтобы получить доступ к консоли.
2.1 Знакомство с клиентским jar-пакетом
пройти черезpom.xml
Импортируйте пакет jar:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
<version>x.y.z</version>
</dependency>
скопировать код
2.2 Настройка параметров запуска
Добавить параметры JVM при запуске-Dcsp.sentinel.dashboard.server=consoleIp:port
Укажите адрес консоли и порт. Если вы запускаете несколько приложений, вам нужно пройти-Dcsp.sentinel.api.port=xxxx
Указывает порт для клиентского API мониторинга (по умолчанию 8719).
Помимо изменения параметров JVM, того же эффекта можно добиться и с помощью конфигурационных файлов. Дополнительные сведения см. в разделе Элементы конфигурации запуска.
2.3 Инициация инициализации клиента
Убедитесь, что у клиентов есть трафик, Страж будетКогда клиент звонит в первый разИнициализируйте и начните отправлять пакеты пульса на консоль.
Sentinel-dashboard — это независимое веб-приложение, которое принимает подключения от клиентов, а затем связывается с клиентами, используя для связи протокол http. Связь между ними показана на следующем рисунке:
dashboard
После того, как дашборд запустится, он будет ждать подключения клиента.MachineRegistryController
существует одинreceiveHeartBeat
метод, клиент отправляет сообщение пульса, которое должно запросить этот метод через http.
После получения сообщения пульса от клиента панель мониторинга инкапсулирует IP-адрес, порт и другую информацию, переданную клиентом, вMachineInfo
объект, а затем передать этот объект черезMachineDiscovery
интерфейсaddMachine
Метод добавляется в ConcurrentHashMap и сохраняется.
Здесь будет проблема, потому что информация о клиенте хранится в памяти панели мониторинга, поэтому при перезапуске приложения панели мониторинга информация о клиенте, которая была отправлена ранее, будет потеряна.
client
Когда клиент запускается, он выбирает один через CommandCenterInitFunc и выбирает только один CommandCenter для запуска.
Перед запуском все классы реализации CommandHandler будут просканированы и получены с помощью spi, а затем все CommandHandler будут зарегистрированы в HashMap для последующего использования.
PS: Подумайте, почему CommandHandler не нужно делать персистентность, а напрямую хранится в памяти.
После регистрации CommandHandler немедленно запускается CommandCenter.В настоящее время CommandCenter имеет два класса реализации:
-
SimpleHttpCommandCenter запускает сервер через ServerSocket и принимает подключения через сокеты.
-
NettyHttpCommandCenter запускает сервер через Netty и принимает соединения канала
После запуска CommandCenter ожидает, пока панель мониторинга отправит сообщение.Когда сообщение будет получено, он обработает сообщение с помощью определенного CommandHandler, а затем вернет результат обработки на панель мониторинга.
Здесь следует отметить, что сообщение, отправляемое панелью управления клиенту, отправляется через асинхронный httpClient в классе HttpHelper.
Но странно то, что, поскольку оно отправляется асинхронно, а CountDownLatch используется для ожидания возврата сообщения, а затем для получения результата, не теряет ли это значение асинхронности? Конкретный код выглядит следующим образом:
private String httpGetContent(String url) {
final HttpGet httpGet = new HttpGet(url);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<String> reference = new AtomicReference<>();
httpclient.execute(httpGet, new FutureCallback<HttpResponse>() {
@Override
public void completed(final HttpResponse response) {
try {
reference.set(getBody(response));
} catch (Exception e) {
logger.info("httpGetContent " + url + " error:", e);
} finally {
latch.countDown();
}
}
@Override
public void failed(final Exception ex) {
latch.countDown();
logger.info("httpGetContent " + url + " failed:", ex);
}
@Override
public void cancelled() {
latch.countDown();
}
});
try {
latch.await(5, TimeUnit.SECONDS);
} catch (Exception e) {
logger.info("wait http client error:", e);
}
return reference.get();
}
скопировать код
Адаптация основных фреймворков
Sentinel также адаптировал некоторые основные фреймворки, так что при использовании основных фреймворков вы также можете пользоваться защитой Sentinel. В настоящее время поддерживаются следующие адаптеры:
-
Web Servlet
-
Dubbo
-
Spring Boot / Spring Cloud
-
gRPC
-
Apache RocketMQ
На самом деле, адаптация заключается в том, чтобы передать точки расширения этих основных фреймворков, а затем добавить к точкам расширения дозорный текущий код ограничения и понижения версии. Взгляните на код адаптации сервлета, конкретный код:
public class CommonFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) {
}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
HttpServletRequest sRequest = (HttpServletRequest)request;
Entry entry = null;
try {
// 根据请求生成的资源
String target = FilterUtil.filterTarget(sRequest);
target = WebCallbackManager.getUrlCleaner().clean(target);
// “申请”该资源
ContextUtil.enter(target);
entry = SphU.entry(target, EntryType.IN);
// 如果能成功“申请”到资源,则说明未被限流
// 则将请求放行
chain.doFilter(request, response);
} catch (BlockException e) {
// 否则如果捕获了BlockException异常,说明请求被限流了
// 则将请求重定向到一个默认的页面
HttpServletResponse sResponse = (HttpServletResponse)response;
WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse);
} catch (IOException e2) {
// 省略部分代码
} finally {
if (entry != null) {
entry.exit();
}
ContextUtil.exit();
}
}
@Override
public void destroy() {
}
}
скопировать код
Расширьте фильтр сервлетов, чтобы реализовать фильтр, а затем ограничьте поток запроса в методе doFilter.Если запрос ограничен потоком, запрос будет перенаправлен на страницу по умолчанию, в противном случае запрос будет передан следующему фильтру. .
Правила постоянны и динамичны
Философия Sentinel заключается в том, что разработчикам нужно обращать внимание только на определение ресурсов.Когда определение ресурсов выполнено успешно, можно динамически добавлять различные правила деградации управления потоком.
Sentinel предоставляет два способа изменения правил:
-
Изменить напрямую через API (
loadRules
) -
пройти через
DataSource
Адаптироваться к различным модификациям источника данных
Изменение через API более интуитивно понятно, и вы можете изменять различные правила с помощью следующих трех API:
FlowRuleManager.loadRules(List<FlowRule> rules); // 修改流控规则
DegradeRuleManager.loadRules(List<DegradeRule> rules); // 修改降级规则
SystemRuleManager.loadRules(List<SystemRule> rules); // 修改系统规则
скопировать код
Расширение источника данных
вышесказанноеloadRules()
Метод принимает только объекты правил в памяти, но правила в памяти будут потеряны после перезапуска приложения.Чаще правила могут храниться в файле, базе данных или центре конфигурации.
DataSource
Интерфейс предоставляет нам возможность взаимодействовать с любым источником конфигурации. По сравнению с изменением правил непосредственно через API реализуйтеDataSource
Интерфейсы являются более надежным подходом.
официальныйРекомендуется устанавливать правила через консоль и отправлять правила в единый центр правил.Пользователям нужно только реализовать интерфейс DataSource для отслеживания изменений правил в центре правил, чтобы получать измененные правила в режиме реального времени..
DataSource
Общие реализации расширений:
-
режим вытягивания: клиент регулярно опрашивает и извлекает правила из центра управления правилами. Этим центром правил может быть SQL, файлы или даже система контроля версий. Способ сделать это прост, но недостатком является то, что изменения не могут быть получены вовремя;
-
толкающий режим: Центр правил равномерно отправляет изменения, а клиент постоянно отслеживает изменения, регистрируя слушателей, например, используя центры конфигурации, такие как Nacos и Zookeeper. Этот метод имеет лучшие гарантии согласованности и реального времени.
До сих пор была проанализирована основная ситуация Sentinel.Для более подробного содержания вы можете продолжить чтение исходного кода для изучения.
Меня зовут Хоуи. Если статья была вам полезна, вы можете поставить лайк и подписаться, а также можете обратить внимание на мой публичный аккаунт: