Эта статья поможет вам создать легкий сервер обмена мгновенными сообщениями с нуля.Общие идеи дизайна и архитектураЯ уже говорил об этом в моем последнем блоге, если вы не видели, пожалуйста, нажмитеРазработать сервер обмена мгновенными сообщениями с нуля.
Эта статья даст вам более подробную информацию. Я объясню, как построить полную и надежную систему обмена мгновенными сообщениями с трех аспектов.
- надежность
- безопасность
- дизайн хранения
надежность
Что такое надежность? Для системы IM надежное определение, по крайней мере,Не теряйте сообщения,сообщение не повторяется,вышел из строя, Удовлетворение этих трех пунктов, можно сказать, имеет хороший опыт общения в чате.
Не теряйте сообщения
Начнем с того, чтобы не терять сообщения.
Сначала просмотрите предыдущий дизайнСерверная архитектура:
Начнем с простого примера: когда Алиса отправляет сообщение Бобу, оно может пройти по такой ссылке:
- client-->connecter
- connector-->transfer
- transfer-->connector
- connector-->client
Проблемы могут быть в каждом звене во всем звене.Хотя протокол tcp надежен, он может гарантировать только надежность канального уровня, но не может гарантировать надежность прикладного уровня.
Например, на первом шагеconnector
полученные отclient
сообщение отправлено, но переадресованоtransfer
Если это не удастся, то сообщение Боб не сможет получить, а Алиса не поймет, что сообщение не удалось отправить.
Если состояние Боба не в сети, то цепочка сообщений будет следующей:
- client-->connector
- connector-->transfer
- transfer-->mq
Если на третьем шагеtransfer
полученные отconnector
, но автономное хранилище сообщений не удалось,
Тогда сообщение также не может быть доставлено.
Чтобы обеспечить надежность прикладного уровня, у нас должен быть механизм подтверждения, чтобы отправитель мог подтвердить, что другая сторона получила сообщение.
Для конкретной реализации мы имитируем протокол tcp, чтобы создать механизм подтверждения на прикладном уровне.
tcp-пакеты字节(byte)
единица, и мы беремmessage
Блок.Каждый раз, когда отправитель отправляет сообщение, он должен ждать ответа подтверждения от другой стороны.Сообщение подтверждения подтверждения должно содержать полученный идентификатор, чтобы отправитель мог его идентифицировать.
Во-вторых, отправитель должен поддерживать очередь, ожидающую подтверждения.Каждый раз, когда сообщение отправляется, сообщение и таймер ставятся в очередь.
Кроме того, есть поток, который все время опрашивает очередь, и если есть тайм-аут, а ack не получен, он достанет сообщение и отправит его повторно.
Есть два способа справиться с сообщением о том, что подтверждение не получено по истечении тайм-аута:
- Как и tcp, он продолжает отправлять до тех пор, пока не будет получен ack.
- Установите максимальное количество попыток.Если подтверждение не будет получено после этого количества раз, оно будет использовано.механизм отказаобработка, экономия ресурсов. Например, если это
connector
не получил в течение длительного времениclient
ack, то вы можете активно отключиться от клиента, а оставшиеся неотправленные сообщения сохраняются как автономные сообщения.После того, как клиент отключится, попробуйте повторно подключиться к серверу.
Нет повторения, нет беспорядка
Иногда из-за сетевых причин прием подтверждения может быть медленным, и отправитель будет отправлять его повторно, поэтому у получателя должен быть механизм дедупликации.
Способ дедупликации заключается в добавлении одного к каждому сообщению.уникальный идентификатор. Этот уникальный идентификатор не обязательно должен быть глобальным, он должен бытьуникальный в течение сеансаВот и все. Например, разговор между двумя людьми или группой. Если сеть отключена, после повторного подключения это будет новая сессия, и идентификатор снова начнется с 0.
Получатель должен поддерживать идентификатор последнего сообщения, полученного в текущем сеансе, который называетсяlastId
.
Каждый раз, когда поступает новое сообщение, идентификатор связывается сlastId
Сделайте сравнение, чтобы увидеть, является ли оно непрерывным, если нет, поставьтевременная очередь очередьразобрались позже.
Например:
-
текущей сессии
lastId
=1, то сервер получил сообщениеmsg(id=2)
, можно судить, что полученное сообщение является непрерывным, затем сообщение обрабатывается, иlastId
Модифицировано до 2. -
Но если сервер получает сообщение
msg(id=3)
, это означает, что сообщение приходит не по порядку, затем сообщение ставится в очередь и ожидаетlastId
После того, как оно станет равным 2, (то есть сервер получит сообщениеmsg(id=2)
и обработано), а затем извлеките сообщение для обработки.
Следовательно, чтобы судить о том, повторяется ли сообщение, нужно только судитьmsgId>lastId && !queue.contains(msgId)
Вот и все. Если получено дублирующее сообщение, можно сделать вывод, что подтверждение не было доставлено, и подтверждение отправляется снова.
Полный процесс обработки после получения сообщения получателем выглядит следующим образом:
Псевдокод выглядит следующим образом:
class ProcessMsgNode{
/**
* 接收到的消息
*/
private Message message;
/**
* 处理消息的方法
*/
private Consumer<Message> consumer;
}
public CompletableFuture<Void> offer(Long id,Message message,Consumer<Message> consumer) {
if (isRepeat(id)) {
//消息重复
sendAck(id);
return null;
}
if (!isConsist(id)) {
//消息不连续
notConsistMsgMap.put(id, new ProcessMsgNode(message, consumer));
return null;
}
//处理消息
return process(id, message, consumer);
}
private CompletableFuture<Void> process(Long id, Message message, Consumer<Message> consumer) {
return CompletableFuture
.runAsync(() -> consumer.accept(message))
.thenAccept(v -> sendAck(id))
.thenAccept(v -> lastId.set(id))
.thenComposeAsync(v -> {
Long nextId = nextId(id);
if (notConsistMsgMap.containsKey(nextId)) {
//队列中有下个消息
ProcessMsgNode node = notConsistMsgMap.get(nextId);
return process(nextId, node.getMessage(), consumer);
} else {
//队列中没有下个消息
CompletableFuture<Void> future = new CompletableFuture<>();
future.complete(null);
return future;
}
})
.exceptionally(e -> {
logger.error("[process received msg] has error", e);
return null;
});
}
безопасность
Будь то записи чата или офлайн-сообщения, резервные копии обязательно будут храниться на стороне сервера, поэтому безопасность сообщений и защита конфиденциальности клиентов также очень важны.
Поэтому все сообщения должны быть зашифрованы.
В модуле хранения есть две основные таблицы для ведения пользовательской информации и цепочек отношений, которыеim_user
пользовательская таблица иim_relation
Связанный список отношений.
-
im_user
Таблица используется для хранения общей информации о пользователе, такой как имя пользователя и пароль, и ее структура относительно проста. -
im_relation
Таблица используется для записи дружеских отношений и имеет следующую структуру:
CREATE TABLE `im_relation` (
`id` bigint(20) COMMENT '关系id',
`user_id1` varchar(100) COMMENT '用户1id',
`user_id2` varchar(100) COMMENT '用户2id',
`encrypt_key` char(33) COMMENT 'aes密钥',
`gmt_create` timestamp DEFAULT CURRENT_TIMESTAMP,
`gmt_update` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `USERID1_USERID2` (`user_id1`,`user_id2`)
);
-
user_id1
иuser_id2
Это идентификатор пользователя, который дружит друг с другом, во избежание повторения он хранится в соответствии сuser_id1
<user_id2
сохраняются последовательно, и добавляется совместный индекс. -
encrypt_key
является случайно сгенерированным ключом. Когда клиент входит в систему, он получает все данные пользователя.relation
, хранящиеся в памяти для последующего шифрования и дешифрования. - Когда клиент отправляет сообщение другу, он достает в памяти ключ связи, шифрует его и отправляет. Аналогичным образом, когда сообщение получено, выньте соответствующий ключ для расшифровки.
Полный процесс входа в клиент выглядит следующим образом:
- Клиент вызывает остальной интерфейс для входа в систему.
- Клиент вызывает оставшийся интерфейс, чтобы получить все данные пользователя.
relation
. - Клиент отправляет приветственное сообщение соединителю, чтобы уведомить о том, что он находится в сети.
- Соединитель извлекает автономные сообщения и отправляет их клиенту.
- Соединитель обновляет сеанс пользователя.
Так почему же коннектор сначала отправляет автономные сообщения, а затем обновляет сеанс? Давайте подумаем, что произошло бы, если бы порядок был обратным:
- Пользователь
Alice
сервер входа -
connector
сеанс обновления - Push-сообщения в автономном режиме
- В этот момент Боб отправляет сообщение Алисе
Если автономное сообщение все еще отправляется, Боб отправляет Алисе новое сообщение, а сервер получает сеанс Алисы и немедленно отправляет его. В это время новое сообщение может быть помещено в стопку офлайн-сообщений, и тогда сообщения, полученные Алисой, будут не по порядку.
и мыПорядок автономных сообщений должен быть гарантирован перед новыми сообщениями.
Затем, если вы сначала отправляете автономные сообщения, а затем обновляете сеанс. В процессе офлайн-пуша сообщения статус Алисы «не в сети», тогда новое сообщение Боба будет храниться только в базе данныхim_offline
,im_offline
После того, как данные в таблице будут прочитаны, она «выйдет в сеть» и начнет принимать новые сообщения. Это также позволяет избежать нарушения порядка.
дизайн хранения
Хранить офлайн-сообщения
Когда пользователь не в сети, автономные сообщения должны храниться на сервере, ожидая, пока пользователь подключится к сети, прежде чем отправлять их. После понимания предыдущего раздела хранение сообщений в автономном режиме очень просто. Добавить автономную таблицу сообщенийim_offline
, структура таблицы следующая:
CREATE TABLE `im_offline` (
`id` int(11) COMMENT '主键',
`msg_id` bigint(20) COMMENT '消息id',
`msg_type` int(2) COMMENT '消息类型',
`content` varbinary(5000) COMMENT '消息内容',
`to_user_id` varchar(100) COMMENT '收件人id',
`has_read` tinyint(1) COMMENT '是否阅读',
`gmt_create` timestamp COMMENT '创建时间',
PRIMARY KEY (`id`)
);
msg_type
Используется для различения типов сообщений (chat
,ack
),content
Зашифрованное содержимое сообщения хранится в виде массива байтов.
Когда пользователь выходит в сеть, согласно условиямto_user_id=用户id
Просто потяните запись.
Предотвратить повторную отправку автономных сообщений
Давайте подумаем о ситуации входа в систему с нескольких терминалов. Алиса имеет два устройства, которые одновременно вошли в систему. В этой параллельной ситуации нам нужен какой-то механизм, гарантирующий, что офлайн-сообщения читаются только один раз.
Используйте здесьCAS-механизмреализовать:
- Сначала удалите все
has_read=false
поле. - Проверять каждое сообщение
has_read
Является ли значение ложным, если да, измените его на истинное. Это атомарная операция.
update im_offline set has_read = true where id = ${msg_id} and has_read = false
- Если модификация прошла успешно, она будет отправлена, а если нет, то не будет отправлена.
Я считаю, что к этому моменту студенты уже могут самостоятельно построить полноценный и пригодный для использования сервер обмена мгновенными сообщениями. Для получения дополнительных вопросов, пожалуйста, оставьте сообщение в области комментариев~~
ссылка на гитхаб:
github.com/yuanrw/IM
Если вы считаете, что это полезно для вас, пожалуйста, нажмите звездочку~!