Эта статья взята из«Введение в весенние облако микросервисов практические боевые и продвинутые»Эта книга.
1 Настройте дизайн push-уведомлений в реальном времени после публикации
Одной из наиболее важных функций центра конфигурации является push-уведомление в режиме реального времени.Благодаря этой функции мы можем положиться на центр конфигурации, чтобы делать множество вещей. В центре конфигурации Smconf, разработанном мной, Smconf полагается на механизм Zookeeper Watch для выполнения push-уведомлений в реальном времени.
На схеме выше кратко описан общий процесс настройки публикации:
- Редактирование и публикация пользовательских конфигураций на портале
- Портал вызовет интерфейс, предоставленный службой администрирования, для публикации операции.
- После того, как служба администрирования получает запрос, она отправляет сообщение ReleaseMessage каждой службе конфигурации, чтобы уведомить об изменении конфигурации службы конфигурации.
- После получения Config Service ReleaseMessage уведомление, соответствующее клиенту, в зависимости от длины реализованного соединения Http
2 Реализация ReleaseMessage
Сообщение ReleaseMessage представляет собой простую очередь сообщений, реализованную Mysql. Причина отказа от использования промежуточного программного обеспечения сообщений состоит в том, чтобы сделать Apollo максимально простым при развертывании и максимально уменьшить внешние зависимости.
На приведенном выше рисунке кратко описан общий процесс отправки ReleaseMessage:
- Служба администрирования вставит запись сообщения в таблицу ReleaseMessage после публикации конфигурации.
- Служба конфигурации запустит поток для регулярного сканирования таблицы ReleaseMessage на наличие новых записей сообщений.
- Когда служба конфигурации находит новую запись сообщения, она уведомляет об этом все прослушиватели сообщений.
- После того, как прослушиватель сообщений получит освобожденную информацию, соответствующий клиент уведомит
3 Реализация клиента уведомлений Config Service
Уведомление реализовано на основе длинного HTTP-соединения, которое в основном делится на следующие этапы:
- Клиент инициирует HTTP-запрос к интерфейсу уведомлений/v2 службы конфигурации.
- Интерфейс v2 приостанавливает запрос через Spring DeferredResult и не возвращает сразу
- Если в течение 60 секунд не будет опубликована ни одна конфигурация, которая интересует клиента, он вернет клиенту код состояния Http 304.
- Если будет обнаружено, что конфигурация изменена, будет вызван метод setResult DeferredResult, и будет передана информация о пространстве имен изменения конфигурации, и запрос будет немедленно возвращен.
- После того как клиент получит пространство имен изменения конфигурации из возвращенного результата, он немедленно запросит у службы конфигурации получение последней конфигурации пространства имен.
4 Дизайн push-уведомлений в реальном времени для анализа исходного кода
Apollo отправляет много кода, поэтому я не буду подробно анализировать его в этой книге, я немного упростил push-код и объяснил его вам, чтобы его было легче понять. Конечно, моя часть будет относительно простой, и многие детали не будут рассматриваться, просто чтобы все поняли основной принцип толчка Аполлона.
Для логики отправки ReleaseMessage мы пишем простой интерфейс, сохраняем его в очереди и вызываем этот интерфейс во время тестирования для имитации обновления конфигурации и отправки сообщения ReleaseMessage.
@RestController
public class NotificationControllerV2 implements ReleaseMessageListener {
// 模拟配置更新,往里插入数据表示有更新
public static Queue<String> queue = new LinkedBlockingDeque<>();
@GetMapping("/addMsg")
public String addMsg() {
queue.add("xxx");
return "success";
}
}
Как мы упоминали ранее, после отправки сообщения служба конфигурации запустит поток для регулярного сканирования таблицы ReleaseMessage, чтобы увидеть, есть ли новая запись сообщения, а затем уведомит об этом клиент.Здесь мы также запускаем поток для сканирования:
@Component
public class ReleaseMessageScanner implements InitializingBean {
@Autowired
private NotificationControllerV2 configController;
@Override
public void afterPropertiesSet() throws Exception {
// 定时任务从数据库扫描有没有新的配置发布
new Thread(() -> {
for (;;) {
String result = NotificationControllerV2.queue.poll();
if (result != null) {
ReleaseMessage message = new ReleaseMessage();
message.setMessage(result);
configController.handleMessage(message);
}
}
}).start();;
}
}
Цикл чтения очереди в NotificationControllerV2, создание объекта ReleaseMessage, если есть сообщение, а затем вызов метода handleMessage() в NotificationControllerV2 для обработки сообщения.
ReleaseMessage — это поле, имитирующее содержимое сообщения:
public class ReleaseMessage {
private String message;
public void setMessage(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
}
Далее, какую работу мы проделали, увидев HandleMessage
NotificationControllerV2 реализует интерфейс ReleaseMessageListener, а метод handleMessage() определен в ReleaseMessageListener.
public interface ReleaseMessageListener {
void handleMessage(ReleaseMessage message);
}
handleMessage — это прослушиватель сообщений уведомления при изменении конфигурации.После того, как прослушиватель сообщений получит информацию, опубликованную конфигурацией, он уведомит соответствующего клиента:
@RestController
public class NotificationControllerV2 implements ReleaseMessageListener {
private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps
.synchronizedSetMultimap(HashMultimap.create());
@Override
public void handleMessage(ReleaseMessage message) {
System.err.println("handleMessage:"+ message);
List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get("xxxx"));
for (DeferredResultWrapper deferredResultWrapper : results) {
List<ApolloConfigNotification> list = new ArrayList<>();
list.add(new ApolloConfigNotification("application", 1));
deferredResultWrapper.setResult(list);
}
}
}
Отправка Apollo в реальном времени основана на Spring DeferredResult.В методе handleMessage() вы можете видеть, что DeferredResult получается через deferredResults, deferredResults — это Multimap в первой строке, Key — это фактически содержимое сообщения, а Value — это DeferredResultWrapper, класс бизнес-оболочки DeferredResult. Давайте посмотрим на код DeferredResultWrapper:
public class DeferredResultWrapper {
private static final long TIMEOUT = 60 * 1000;// 60 seconds
private static final ResponseEntity<List<ApolloConfigNotification>> NOT_MODIFIED_RESPONSE_LIST =
new ResponseEntity<>(HttpStatus.NOT_MODIFIED);
private DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result;
public DeferredResultWrapper() {
result = new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
}
public void onTimeout(Runnable timeoutCallback) {
result.onTimeout(timeoutCallback);
}
public void onCompletion(Runnable completionCallback) {
result.onCompletion(completionCallback);
}
public void setResult(ApolloConfigNotification notification) {
setResult(Lists.newArrayList(notification));
}
public void setResult(List<ApolloConfigNotification> notifications) {
result.setResult(new ResponseEntity<>(notifications, HttpStatus.OK));
}
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> getResult() {
return result;
}
}
Установить возвращаемый результат клиенту с помощью метода setResult().Вышеизложенный принцип уведомления клиента через прослушиватель сообщений при изменении конфигурации.Когда клиент обращается к нему?
@RestController
public class NotificationControllerV2 implements ReleaseMessageListener {
// 模拟配置更新,往里插入数据表示有更新
public static Queue<String> queue = new LinkedBlockingDeque<>();
private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps
.synchronizedSetMultimap(HashMultimap.create());
@GetMapping("/getConfig")
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> getConfig() {
DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper();
List<ApolloConfigNotification> newNotifications = getApolloConfigNotifications();
if (!CollectionUtils.isEmpty(newNotifications)) {
deferredResultWrapper.setResult(newNotifications);
} else {
deferredResultWrapper.onTimeout(() -> {
System.err.println("onTimeout");
});
deferredResultWrapper.onCompletion(() -> {
System.err.println("onCompletion");
});
deferredResults.put("xxxx", deferredResultWrapper);
}
return deferredResultWrapper.getResult();
}
private List<ApolloConfigNotification> getApolloConfigNotifications() {
List<ApolloConfigNotification> list = new ArrayList<>();
String result = queue.poll();
if (result != null) {
list.add(new ApolloConfigNotification("application", 1));
}
return list;
}
}
NotificationControllerV2 предоставляет интерфейс /getConfig. Клиент будет вызывать этот интерфейс при запуске. В это время будет выполнен метод getApolloConfigNotifications() для получения информации об изменении конфигурации. Если есть, это доказывает, что конфигурация была изменена. deferredResultWrapper .setResult(newNotifications); Результат возвращается клиенту. После получения результата клиент повторно извлекает информацию о конфигурации, чтобы перезаписать локальную конфигурацию.
Если метод getApolloConfigNotifications() не возвращает информацию об изменении конфигурации, что доказывает, что конфигурация не была изменена, объект DeferredResultWrapper добавляется к deferredResults, а прослушиватель сообщений уведомляется об изменении последующей конфигурации.
При этом запрос будет приостановлен и вернется не сразу.Приостановка реализована следующим кодом в DeferredResultWrapper:
private static final long TIMEOUT = 60 * 1000;// 60 seconds
private static final ResponseEntity<List<ApolloConfigNotification>> NOT_MODIFIED_RESPONSE_LIST =
new ResponseEntity<>(HttpStatus.NOT_MODIFIED);
private DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result;
public DeferredResultWrapper() {
result = new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
}
При создании объекта DeferredResult указываются время ожидания и код ответа, возвращаемый по истечении времени ожидания.Если в течение 60 секунд нет уведомления от прослушивателя сообщений, время ожидания запроса истечет, а код ответа, полученный клиентом после истечения времени ожидания, тайм-аут 304.
На этом весь процесс Config Service закончен, теперь давайте посмотрим, как реализован клиент, просто напишем тестовый класс для имитации регистрации клиента:
public class ClientTest {
public static void main(String[] args) {
reg();
}
private static void reg() {
System.err.println("注册");
String result = request("http://localhost:8081/getConfig");
if (result != null) {
// 配置有更新,重新拉取配置
// ......
}
// 重新注册
reg();
}
private static String request(String url) {
HttpURLConnection connection = null;
BufferedReader reader = null;
try {
URL getUrl = new URL(url);
connection = (HttpURLConnection) getUrl.openConnection();
connection.setReadTimeout(90000);
connection.setConnectTimeout(3000);
connection.setRequestMethod("GET");
connection.setRequestProperty("Accept-Charset", "utf-8");
connection.setRequestProperty("Content-Type", "application/json");
connection.setRequestProperty("Charset", "UTF-8");
System.out.println(connection.getResponseCode());
if (200 == connection.getResponseCode()) {
reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
StringBuilder result = new StringBuilder();
String line = null;
while ((line = reader.readLine()) != null) {
result.append(line);
}
System.out.println("结果 " + result);
return result.toString();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (connection != null) {
connection.disconnect();
}
}
return null;
}
}
Сначала запустите службу, в которой расположен интерфейс /getConfig, а затем запустите клиент.Клиент инициирует запрос на регистрацию.Если есть какая-либо модификация, результат будет получен напрямую, и будет выполнена операция обновления конфигурации. Если изменений нет, запрос будет приостановлен.Тайм-аут чтения, установленный клиентом здесь, составляет 90 секунд, что больше, чем 60-секундный тайм-аут на сервере.
После получения каждого результата, независимо от того, изменен он или нет, его необходимо перерегистрировать, таким образом можно добиться эффекта настройки push в реальном времени.
Мы можем вызвать интерфейс /addMsg, написанный ранее, для имитации изменения конфигурации, и клиент может получить возвращаемый результат сразу после вызова.
Эта статья принимается в ** "Spring Cloud Services Portal Combat и Advanced Micro" ** книга.
Книга «Весенние облако микросистемы: технология полного стека и анализ корпуса», опубликованные в прошлом году, получили поддержку и обратную связь от всех. На основании ваших отзывов, исправления и улучшения были сделаны.
На основе относительно стабильной версии Spring Cloud Finchley.SR2 и версии Spring Boot 2.0.6.RELEASE.
При этом перечисленные коды заархивированы стандартно, а предыдущие все вместе, что неудобно читателям для обращения и запуска.
В то же время был добавлен новый контент, такой как Apollo, Spring Cloud Gateway, опыт производственной практики и т. д.