Анализ принципов проектирования серверов Apollo

Java

Эта статья взята из«Введение в весенние облако микросервисов практические боевые и продвинутые»Эта книга.

1 Настройте дизайн push-уведомлений в реальном времени после публикации

Одной из наиболее важных функций центра конфигурации является push-уведомление в режиме реального времени.Благодаря этой функции мы можем положиться на центр конфигурации, чтобы делать множество вещей. В центре конфигурации Smconf, разработанном мной, Smconf полагается на механизм Zookeeper Watch для выполнения push-уведомлений в реальном времени.

来源于Apollo 文档

На схеме выше кратко описан общий процесс настройки публикации:

  • Редактирование и публикация пользовательских конфигураций на портале
  • Портал вызовет интерфейс, предоставленный службой администрирования, для публикации операции.
  • После того, как служба администрирования получает запрос, она отправляет сообщение ReleaseMessage каждой службе конфигурации, чтобы уведомить об изменении конфигурации службы конфигурации.
  • После получения Config Service ReleaseMessage уведомление, соответствующее клиенту, в зависимости от длины реализованного соединения Http

2 Реализация ReleaseMessage

Сообщение ReleaseMessage представляет собой простую очередь сообщений, реализованную Mysql. Причина отказа от использования промежуточного программного обеспечения сообщений состоит в том, чтобы сделать Apollo максимально простым при развертывании и максимально уменьшить внешние зависимости.

release-message-design.png

На приведенном выше рисунке кратко описан общий процесс отправки ReleaseMessage:

  • Служба администрирования вставит запись сообщения в таблицу ReleaseMessage после публикации конфигурации.
  • Служба конфигурации запустит поток для регулярного сканирования таблицы ReleaseMessage на наличие новых записей сообщений.
  • Когда служба конфигурации находит новую запись сообщения, она уведомляет об этом все прослушиватели сообщений.
  • После того, как прослушиватель сообщений получит освобожденную информацию, соответствующий клиент уведомит

3 Реализация клиента уведомлений Config Service

Уведомление реализовано на основе длинного HTTP-соединения, которое в основном делится на следующие этапы:

  • Клиент инициирует HTTP-запрос к интерфейсу уведомлений/v2 службы конфигурации.
  • Интерфейс v2 приостанавливает запрос через Spring DeferredResult и не возвращает сразу
  • Если в течение 60 секунд не будет опубликована ни одна конфигурация, которая интересует клиента, он вернет клиенту код состояния Http 304.
  • Если будет обнаружено, что конфигурация изменена, будет вызван метод setResult DeferredResult, и будет передана информация о пространстве имен изменения конфигурации, и запрос будет немедленно возвращен.
  • После того как клиент получит пространство имен изменения конфигурации из возвращенного результата, он немедленно запросит у службы конфигурации получение последней конфигурации пространства имен.

4 Дизайн push-уведомлений в реальном времени для анализа исходного кода

Apollo отправляет много кода, поэтому я не буду подробно анализировать его в этой книге, я немного упростил push-код и объяснил его вам, чтобы его было легче понять. Конечно, моя часть будет относительно простой, и многие детали не будут рассматриваться, просто чтобы все поняли основной принцип толчка Аполлона.

Для логики отправки ReleaseMessage мы пишем простой интерфейс, сохраняем его в очереди и вызываем этот интерфейс во время тестирования для имитации обновления конфигурации и отправки сообщения ReleaseMessage.

@RestController
public class NotificationControllerV2 implements ReleaseMessageListener {
	
    // 模拟配置更新,往里插入数据表示有更新
    public static Queue<String> queue = new LinkedBlockingDeque<>();

    @GetMapping("/addMsg")
    public String addMsg() {
	    queue.add("xxx");
	    return "success";
    }

}

Как мы упоминали ранее, после отправки сообщения служба конфигурации запустит поток для регулярного сканирования таблицы ReleaseMessage, чтобы увидеть, есть ли новая запись сообщения, а затем уведомит об этом клиент.Здесь мы также запускаем поток для сканирования:

@Component
public class ReleaseMessageScanner implements InitializingBean {

	@Autowired
	private NotificationControllerV2 configController;
	
	@Override
	public void afterPropertiesSet() throws Exception {
		// 定时任务从数据库扫描有没有新的配置发布
		new Thread(() -> {
			for (;;) {
				String result = NotificationControllerV2.queue.poll();
				if (result != null) {
					ReleaseMessage message = new ReleaseMessage();
					message.setMessage(result);
					configController.handleMessage(message);
				}
			}
		}).start();;
	}

}

Цикл чтения очереди в NotificationControllerV2, создание объекта ReleaseMessage, если есть сообщение, а затем вызов метода handleMessage() в NotificationControllerV2 для обработки сообщения.

ReleaseMessage — это поле, имитирующее содержимое сообщения:

public class ReleaseMessage {
	private String message;
	
	public void setMessage(String message) {
		this.message = message;
	}
	public String getMessage() {
		return message;
	}
}

Далее, какую работу мы проделали, увидев HandleMessage

NotificationControllerV2 реализует интерфейс ReleaseMessageListener, а метод handleMessage() определен в ReleaseMessageListener.

public interface ReleaseMessageListener {
	void handleMessage(ReleaseMessage message);
}

handleMessage — это прослушиватель сообщений уведомления при изменении конфигурации.После того, как прослушиватель сообщений получит информацию, опубликованную конфигурацией, он уведомит соответствующего клиента:

@RestController
public class NotificationControllerV2 implements ReleaseMessageListener {
	
	private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps
			.synchronizedSetMultimap(HashMultimap.create());
	
	@Override
	public void handleMessage(ReleaseMessage message) {
		System.err.println("handleMessage:"+ message);
		List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get("xxxx"));
		for (DeferredResultWrapper deferredResultWrapper : results) {
			List<ApolloConfigNotification> list = new ArrayList<>();
			list.add(new ApolloConfigNotification("application", 1));
			deferredResultWrapper.setResult(list);
		}
	}

}

Отправка Apollo в реальном времени основана на Spring DeferredResult.В методе handleMessage() вы можете видеть, что DeferredResult получается через deferredResults, deferredResults — это Multimap в первой строке, Key — это фактически содержимое сообщения, а Value — это DeferredResultWrapper, класс бизнес-оболочки DeferredResult. Давайте посмотрим на код DeferredResultWrapper:

public class DeferredResultWrapper {
	private static final long TIMEOUT = 60 * 1000;// 60 seconds
	
	private static final ResponseEntity<List<ApolloConfigNotification>> NOT_MODIFIED_RESPONSE_LIST = 
			new ResponseEntity<>(HttpStatus.NOT_MODIFIED);

	private DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result;

	public DeferredResultWrapper() {
		result = new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
	}

	public void onTimeout(Runnable timeoutCallback) {
		result.onTimeout(timeoutCallback);
	}

	public void onCompletion(Runnable completionCallback) {
		result.onCompletion(completionCallback);
	}

	public void setResult(ApolloConfigNotification notification) {
		setResult(Lists.newArrayList(notification));
	}

	public void setResult(List<ApolloConfigNotification> notifications) {
		result.setResult(new ResponseEntity<>(notifications, HttpStatus.OK));
	}

	public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> getResult() {
		return result;
	}
}

Установить возвращаемый результат клиенту с помощью метода setResult().Вышеизложенный принцип уведомления клиента через прослушиватель сообщений при изменении конфигурации.Когда клиент обращается к нему?

@RestController
public class NotificationControllerV2 implements ReleaseMessageListener {
	
	// 模拟配置更新,往里插入数据表示有更新
	public static Queue<String> queue = new LinkedBlockingDeque<>();

	private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps
			.synchronizedSetMultimap(HashMultimap.create());
	
	@GetMapping("/getConfig")
	public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> getConfig() {
		DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper();
		List<ApolloConfigNotification> newNotifications = getApolloConfigNotifications();
		if (!CollectionUtils.isEmpty(newNotifications)) {
			deferredResultWrapper.setResult(newNotifications);
		} else {
			deferredResultWrapper.onTimeout(() -> {
				System.err.println("onTimeout");
			});

			deferredResultWrapper.onCompletion(() -> {
				System.err.println("onCompletion");
			});
			deferredResults.put("xxxx", deferredResultWrapper);
		}
		return deferredResultWrapper.getResult();
	}

	private List<ApolloConfigNotification> getApolloConfigNotifications() {
		List<ApolloConfigNotification> list = new ArrayList<>();
		String result = queue.poll();
		if (result != null) {
			list.add(new ApolloConfigNotification("application", 1));
		}
		return list;
	}
}

NotificationControllerV2 предоставляет интерфейс /getConfig. Клиент будет вызывать этот интерфейс при запуске. В это время будет выполнен метод getApolloConfigNotifications() для получения информации об изменении конфигурации. Если есть, это доказывает, что конфигурация была изменена. deferredResultWrapper .setResult(newNotifications); Результат возвращается клиенту. После получения результата клиент повторно извлекает информацию о конфигурации, чтобы перезаписать локальную конфигурацию.

Если метод getApolloConfigNotifications() не возвращает информацию об изменении конфигурации, что доказывает, что конфигурация не была изменена, объект DeferredResultWrapper добавляется к deferredResults, а прослушиватель сообщений уведомляется об изменении последующей конфигурации.

При этом запрос будет приостановлен и вернется не сразу.Приостановка реализована следующим кодом в DeferredResultWrapper:

private static final long TIMEOUT = 60 * 1000;// 60 seconds
	
private static final ResponseEntity<List<ApolloConfigNotification>> NOT_MODIFIED_RESPONSE_LIST = 
			new ResponseEntity<>(HttpStatus.NOT_MODIFIED);

private DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result;

public DeferredResultWrapper() {
	result = new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
}

При создании объекта DeferredResult указываются время ожидания и код ответа, возвращаемый по истечении времени ожидания.Если в течение 60 секунд нет уведомления от прослушивателя сообщений, время ожидания запроса истечет, а код ответа, полученный клиентом после истечения времени ожидания, тайм-аут 304.

На этом весь процесс Config Service закончен, теперь давайте посмотрим, как реализован клиент, просто напишем тестовый класс для имитации регистрации клиента:

public class ClientTest {
	public static void main(String[] args) {
		reg();
	}

	private static void reg() {
		System.err.println("注册");
		String result = request("http://localhost:8081/getConfig");
		if (result != null) {
			// 配置有更新,重新拉取配置
			// ......
		}
		// 重新注册
		reg();
	}
	
	private static String request(String url) {
		HttpURLConnection connection = null;
		BufferedReader reader = null;
		try {
			URL getUrl = new URL(url);
			connection = (HttpURLConnection) getUrl.openConnection();
			connection.setReadTimeout(90000);
			connection.setConnectTimeout(3000);
			connection.setRequestMethod("GET");
			connection.setRequestProperty("Accept-Charset", "utf-8");
			connection.setRequestProperty("Content-Type", "application/json");
			connection.setRequestProperty("Charset", "UTF-8");
			System.out.println(connection.getResponseCode());
			if (200 == connection.getResponseCode()) {
				reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
				StringBuilder result = new StringBuilder();
				String line = null;
				while ((line = reader.readLine()) != null) {
					result.append(line);
				}
				System.out.println("结果 " + result);
				return result.toString();
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				connection.disconnect();
			}
		}
		return null;
	}
}

Сначала запустите службу, в которой расположен интерфейс /getConfig, а затем запустите клиент.Клиент инициирует запрос на регистрацию.Если есть какая-либо модификация, результат будет получен напрямую, и будет выполнена операция обновления конфигурации. Если изменений нет, запрос будет приостановлен.Тайм-аут чтения, установленный клиентом здесь, составляет 90 секунд, что больше, чем 60-секундный тайм-аут на сервере.

После получения каждого результата, независимо от того, изменен он или нет, его необходимо перерегистрировать, таким образом можно добиться эффекта настройки push в реальном времени.

Мы можем вызвать интерфейс /addMsg, написанный ранее, для имитации изменения конфигурации, и клиент может получить возвращаемый результат сразу после вызова.

Эта статья принимается в ** "Spring Cloud Services Portal Combat и Advanced Micro" ** книга.

Книга «Весенние облако микросистемы: технология полного стека и анализ корпуса», опубликованные в прошлом году, получили поддержку и обратную связь от всех. На основании ваших отзывов, исправления и улучшения были сделаны.

На основе относительно стабильной версии Spring Cloud Finchley.SR2 и версии Spring Boot 2.0.6.RELEASE.

При этом перечисленные коды заархивированы стандартно, а предыдущие все вместе, что неудобно читателям для обращения и запуска.

В то же время был добавлен новый контент, такой как Apollo, Spring Cloud Gateway, опыт производственной практики и т. д.