Протокол сплетен кластера Redis
Всем привет, это Ли Сяобин. Сегодня я расскажу о протоколе Gossip и кластерной работе Reids Cluster. Карта разума статьи показана ниже.
Введение в кластерный режим и сплетни
В области хранения данных, когда объем данных или трафика запросов достигает определенного уровня, неизбежно внедрение распределенных. Например, Redis, хотя его автономная производительность очень хороша, должен ввести кластер по следующим причинам.
- Одна машина не может гарантировать высокую доступность, и для обеспечения высокой доступности необходимо ввести несколько экземпляров.
- Одна машина может обеспечить QPS примерно до 8 Вт, а для еще более высокого QPS требуется введение нескольких экземпляров.
- Объем данных, которые может поддерживать одна машина, ограничен, и для обработки большего количества данных необходимо ввести несколько экземпляров;
- Сетевой трафик, обрабатываемый одной машиной, превысил верхний предел сетевой карты сервера, и для распределения трафика необходимо ввести несколько экземпляров.
При наличии кластера ему часто необходимо поддерживать определенные метаданные, такие как IP-адрес экземпляра, информацию о слоте кэшированного сегмента и т. д., поэтому для поддержания согласованности метаданных необходим распределенный механизм. Такие механизмы обычно имеют два режима: децентрализованный и централизованный.
Децентрализованный механизм хранит метаданные на некоторых или всех узлах и поддерживает изменения и согласованность метаданных посредством постоянной связи между различными узлами. Redis Cluster, Consul и т. д. относятся к этому режиму.
Централизованный тип предназначен для централизованного хранения метаданных кластера на внешних узлах или промежуточном программном обеспечении, таком как zookeeper. Этот режим используется в более старых версиях kafka и storm.
Оба режима имеют свои преимущества и недостатки, как показано в следующей таблице:
модель | преимущество | недостаток |
---|---|---|
централизованный | Обновление данных своевременное и своевременность хорошая.Обновление и чтение метаданных очень своевременные.Как только метаданные изменены, они немедленно обновляются на централизованном внешнем узле, и другие узлы могут сразу же воспринимать их, когда они читаются; | Большое давление обновления данных, давление обновления сосредоточено на внешних узлах, воздействуя на всю систему как на единую точку. |
Децентрализованный | Давление обновления данных разбросано, и обновление метаданных относительно разбросано.Вместо того, чтобы сосредоточиться на определенном узле, запросы на обновление относительно разбросаны, и есть разные узлы для обработки с определенной задержкой, что снижает давление параллелизма. | Задержка обновления данных может вызвать некоторое отставание в восприятии кластера. |
Схема децентрализованных метаданных имеет несколько альтернативных алгоритмов синхронизации метаданных, таких как Paxos, Raft и Gossip. Paxos и Raft требуют, чтобы все узлы или большинство узлов (более половины) работали нормально, чтобы весь кластер работал стабильно, в то время как Gossip не требует для работы более половины узлов.
Протокол Gossip, как следует из названия, подобен сплетням, использующим случайный и заразительный способ распространения информации по сети и обеспечения согласованности всех узлов в системе в течение определенного периода времени. Для вас освоение этого протокола не только даст вам хорошее понимание этого наиболее часто используемого алгоритма для достижения согласованности в конечном счете, но также позволит вам легко добиться согласованности данных в конечном счете в последующей работе.
Протокол сплетен, также известный как эпидемический протокол (эпидемический протокол), представляет собой протокол обмена информацией между узлами или процессами на основе режима передачи эпидемий.Он широко используется в P2P-сетях и распределенных системах.Его методология также очень проста:
В кластере в ограниченной сети, если каждый узел случайным образом обменивается определенной информацией с другими узлами, через достаточно долгое время познание каждого узла в кластере в конечном итоге сведется к одной и той же информации.
«Конкретная информация» здесь обычно относится к статусу кластера, статусу каждого узла и другим метаданным. Протокол Gossip полностью совместим с принципом BASE и может использоваться в любой области, требующей окончательной согласованности, например, в распределенных хранилищах и реестрах. Кроме того, он позволяет легко реализовать эластичные кластеры, позволяет узлам подключаться к сети и отключаться в любое время, а также обеспечивает быстрое обнаружение сбоев и динамическую балансировку нагрузки.
Кроме того, самым большим преимуществом протокола Gossip является то, что даже при увеличении количества узлов кластера нагрузка на каждый узел увеличивается ненамного и практически постоянна. Это позволяет масштабировать до тысяч узлов, управляемых кластером Redis Cluster или Consul.
Механизм обмена сплетнями Redis Cluster
Redis Cluster представил функциональность кластеризации в версии 3.0. Чтобы каждый экземпляр в кластере знал информацию о состоянии всех других экземпляров, кластер Redis предусматривает, что каждый экземпляр взаимодействует и передает информацию в соответствии с протоколом Gossip.
На приведенном выше рисунке показана схематическая диаграмма кластера Redis с архитектурой «главный-подчиненный», в которой сплошная линия представляет отношения репликации «главный-подчиненный» между узлами, а пунктирная линия представляет собой обмен сообщениями между каждым узлом.
Каждый узел в кластере Redis имеетПоддерживайте текущее состояние всего кластера с вашей точки зрения, в основном включает:
- Текущее состояние кластера
- Информация о слотах, за которые отвечает каждый узел в кластере, и его статус миграции.
- Статус master-slave каждого узла в кластере
- Статус выживания и предполагаемый статус сбоя каждого узла в кластере
То есть вышеуказанная информация является предметом сплетен и сплетен, распространяемых среди Узлов в кластере, и она является более полной, включая свои собственные и чужие.
Узлы Redis Cluster будут отправлять друг другу различные сообщения, наиболее важными из которых являются следующие:
- ВСТРЕЧА: через команду «кластер встретить ip-порт» узел существующего кластера отправит приглашение новому узлу присоединиться к существующему кластеру, а затем новый узел начнет взаимодействовать с другими узлами;
- PING: Узел отправляет пинг-сообщение другим узлам в кластере с настроенным интервалом времени.Сообщение содержит свое собственное состояние, а также поддерживаемые им метаданные кластера и метаданные некоторых других узлов;
- PONG: узел используется для ответа на сообщения PING и MEET, структура аналогична сообщению PING, а также содержит собственный статус и другую информацию, а также может использоваться для передачи и обновления информации;
- FAIL: после того, как узел PING не сможет достичь узла, он рассылает сообщение о том, что узел зависает, всем узлам в кластере. После того как другие узлы получат сообщение, они будут помечены как отключенные.
Файл cluster.h в исходном коде Redis определяет все типы сообщений, и это код redis 4.0.
// 注意,PING 、 PONG 和 MEET 实际上是同一种消息。
// PONG 是对 PING 的回复,它的实际格式也为 PING 消息,
// 而 MEET 则是一种特殊的 PING 消息,用于强制消息的接收者将消息的发送者添加到集群中(如果节点尚未在节点列表中的话)
#define CLUSTERMSG_TYPE_PING 0 /* Ping 消息 */
#define CLUSTERMSG_TYPE_PONG 1 /* Pong 用于回复Ping */
#define CLUSTERMSG_TYPE_MEET 2 /* Meet 请求将某个节点添加到集群中 */
#define CLUSTERMSG_TYPE_FAIL 3 /* Fail 将某个节点标记为 FAIL */
#define CLUSTERMSG_TYPE_PUBLISH 4 /* 通过发布与订阅功能广播消息 */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* 请求进行故障转移操作,要求消息的接收者通过投票来支持消息的发送者 */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 /* 消息的接收者同意向消息的发送者投票 */
#define CLUSTERMSG_TYPE_UPDATE 7 /* slots 已经发生变化,消息发送者要求消息接收者进行相应的更新 */
#define CLUSTERMSG_TYPE_MFSTART 8 /* 为了进行手动故障转移,暂停各个客户端 */
#define CLUSTERMSG_TYPE_COUNT 9 /* 消息总数 */
С помощью этих сообщений каждый экземпляр в кластере может получать информацию о состоянии всех других экземпляров. Таким образом, даже в случае присоединения нового узла, сбоя узла или изменения слота и т. д. экземпляры все равно могут синхронизировать состояние кластера на каждом экземпляре посредством передачи сообщений PING и PONG. Ниже мы по очереди рассмотрим несколько распространенных сценариев.
Сообщения PING/PONG по времени
Узлы в Redis Cluster будут периодически отправлять сообщения PING другим узлам для обмена информацией о состоянии каждого узла и проверки состояния каждого узла, включая онлайн-статус, предполагаемый автономный статус PFAIL и автономный статус FAIL.
Рабочий принцип синхронизации PING/PONG кластера Redis можно свести к двум пунктам:
- Во-первых, каждый экземпляр будет случайным образом выбирать несколько экземпляров из кластера с определенной частотой и отправлять сообщения PING выбранным экземплярам, чтобы определить, находятся ли эти экземпляры в сети, и обмениваться информацией о состоянии друг с другом. Сообщение PING инкапсулирует информацию о состоянии экземпляра, отправляющего сообщение, информацию о состоянии некоторых других экземпляров и таблицу сопоставления слотов.
- Во-вторых, после того, как экземпляр получит сообщение PING, он отправит сообщение PONG экземпляру, отправившему сообщение PING. Сообщение PONG содержит то же содержимое, что и сообщение PING.
На следующем рисунке показана передача сообщений PING и PONG между двумя экземплярами, где экземпляр 1 является передающим узлом, а экземпляр 2 — принимающим узлом.
новый узел онлайн
Когда Redis Cluster присоединяется к новому узлу, клиенту необходимо выполнить команду CLUSTER MEET, как показано на следующем рисунке.
Когда узел 1 выполняет команду CLUSTER MEET, он сначала создаст данные clusterNode для нового узла и добавит их в словарь узлов clusterState, поддерживаемый им самим. Что касается взаимосвязи между clusterState и clusterNode, у нас будет подробная схематическая диаграмма и исходный код для объяснения в последнем разделе.
Затем Node One отправит сообщение MEET новому узлу на основе IP-адреса и номера порта в команде CLUSTER MEET. После того, как новый узел получит сообщение MEET, отправленное первым узлом, новый узел также создаст структуру clusterNode для первого узла и добавит эту структуру в словарь узлов clusterState, поддерживаемый им самим.
Затем новый узел возвращает сообщение PONG первому узлу. После того, как узел 1 получает сообщение PONG, возвращенное узлом B, он узнает, что новый узел успешно получил сообщение MEET, отправленное им самим.
Наконец, первый узел также отправляет сообщение PING новому узлу. После того, как новый узел получит сообщение PING, он может узнать, что узел A успешно получил сообщение PONG, возвращенное им самим, тем самым завершив операцию квитирования доступа к новому узлу.
После успешного завершения операции MEET узел отправит информацию о новом узле другим узлам в кластере с помощью упомянутого ранее механизма синхронизации PING, чтобы другие узлы также обменялись рукопожатием с новым узлом. время новый узел будет известен всем узлам в кластере.
Подозреваемые и реальные автономные узлы
Узлы в кластере Redis будут периодически проверять, был ли узел приемника, который отправил Ping Message, вернул сообщение PONG в течение указанного времени (Cluster-Node-Timeout), если нет, он будет помечен как предполагаемое автономное состояние, что это состояние PFaile, как показано ниже.
Затем узел 1 передаст информацию о том, что узел 2 находится в предположительно автономном состоянии, другим узлам, таким как узел 3, посредством сообщения PING. После того, как узел 3 получит сообщение PING узла 1 и узнает, что узел 2 перешел в состояние PFAIL, он найдет структуру clusterNode, соответствующую узлу 2, в словаре узлов clusterState, поддерживаемом им самим, и добавит автономный отчет главного узла 1. в структуру clusterNode в списке fail_reports.
Со временем, если узел десять (например) также считает, что узел два подозревается в отключенном состоянии из-за тайм-аута PONG, и обнаруживает, что fail_reports кластерного узла второго узла, который он поддерживает, содержится в fail_reportsБолее половины основных узлов не устарели, пометить второй узел как журнал отчета о состоянии PFAIL., то десятый узел пометит второй узел как офлайн-состояние FAIL, а десятый узелнемедленноПередайте сообщение FAIL о том, что главный узел 2 перешел в автономный режим, другим узлам в кластере, и все узлы, получившие сообщение FAIL, немедленно пометят состояние узла 2 как отключенное. Как показано ниже.
Следует отметить, что отчет о подозрительных офлайн-записях чувствителен ко времени, если он превышает время cluster-node-timeout *2, этот отчет будет проигнорирован, а узел 2 вернется в нормальное состояние.
Реализация исходного кода связи Redis Cluster
Таким образом, мы поняли принцип и процесс работы Redis Cluster в отношении времени PING/PONG, нового узла в сети, узла, подозреваемого в автономном режиме, и реального автономного режима и т. д. Давайте реально рассмотрим реализацию исходного кода и конкретные операции Redis в этих ссылки. .
Задействованные структуры данных
Прежде всего, давайте объясним задействованные структуры данных, то есть ClusterNode и другие структуры, упомянутые выше.
Каждый узел поддерживает структуру clusterState, который представляет общее состояние текущего кластера, которое определяется следующим образом.
typedef struct clusterState {
clusterNode *myself; /* 当前节点的clusterNode信息 */
....
dict *nodes; /* name到clusterNode的字典 */
....
clusterNode *slots[CLUSTER_SLOTS]; /* slot 和节点的对应关系*/
....
} clusterState;
В нем есть еще три ключевых поля, конкретная схема выглядит следующим образом:
- Поле себя представляет собой структуру clusterNode, используемую для записи собственного состояния;
- Словарь узлов, который записывает сопоставление имени со структурой clusterNode для записи состояния других узлов;
- Массив слотов, запишите структуру clusterNode узла, соответствующего слоту.
структура узла кластерасохраняет текущее состояние узла,НапримерВремя создания узла, имя узла, текущая эпоха конфигурации узла, IP-адрес и номер порта узла и т. д.. Кроме того, атрибутом link структуры clusterNode является структура clusterLink, которая содержит соответствующую информацию, необходимую для подключения к узлу, такую как дескрипторы сокетов, входные и выходные буферы. В clusterNode также есть список fail_reports для записи подозрительных автономных отчетов. Конкретные определения следующие.
typedef struct clusterNode {
mstime_t ctime; /* 创建节点的时间 */
char name[CLUSTER_NAMELEN]; /* 节点的名字 */
int flags; /* 节点标识,标记节点角色或者状态,比如主节点从节点或者在线和下线 */
uint64_t configEpoch; /* 当前节点已知的集群统一epoch */
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
int numslots; /* Number of slots handled by this node */
int numslaves; /* Number of slave nodes, if this is a master */
struct clusterNode **slaves; /* pointers to slave nodes */
struct clusterNode *slaveof; /* pointer to the master node. Note that it
may be NULL even if the node is a slave
if we don't have the master node in our
tables. */
mstime_t ping_sent; /* 当前节点最后一次向该节点发送 PING 消息的时间 */
mstime_t pong_received; /* 当前节点最后一次收到该节点 PONG 消息的时间 */
mstime_t fail_time; /* FAIL 标志位被设置的时间 */
mstime_t voted_time; /* Last time we voted for a slave of this master */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
mstime_t orphaned_time; /* Starting time of orphaned master condition */
long long repl_offset; /* 当前节点的repl便宜 */
char ip[NET_IP_STR_LEN]; /* 节点的IP 地址 */
int port; /* 端口 */
int cport; /* 通信端口,一般是端口+1000 */
clusterLink *link; /* 和该节点的 tcp 连接 */
list *fail_reports; /* 下线记录列表 */
} clusterNode;
clusterNodeFailReport — это структура, которая записывает автономный отчет узла, узел — это информация об узле, создающем отчет, а время — время создания отчета.
typedef struct clusterNodeFailReport {
struct clusterNode *node; /* 报告当前节点已经下线的节点 */
mstime_t time; /* 报告时间 */
} clusterNodeFailReport;
структура сообщения
После понимания структуры данных, поддерживаемой узлом Reids, давайте посмотрим на структуру сообщений, с которыми взаимодействуют узлы. Самой внешней структурой коммуникационного сообщения является clusterMsg, которая включает в себя большое количество информации о записи сообщения, включая флаг RCmb, общую длину сообщения, версию протокола сообщения и тип сообщения, а также информацию о записи узла. отправка сообщения, такого как имя узла, информация о слоте, за который отвечает узел, IP-адрес и порт узла и т. д.; наконец, он содержит clusterMsgData для передачи определенных типов сообщений.
typedef struct {
char sig[4]; /* 标志位,"RCmb" (Redis Cluster message bus). */
uint32_t totlen; /* 消息总长度 */
uint16_t ver; /* 消息协议版本 */
uint16_t port; /* 端口 */
uint16_t type; /* 消息类型 */
uint16_t count; /* */
uint64_t currentEpoch; /* 表示本节点当前记录的整个集群的统一的epoch,用来决策选举投票等,与configEpoch不同的是:configEpoch表示的是master节点的唯一标志,currentEpoch是集群的唯一标志。 */
uint64_t configEpoch; /* 每个master节点都有一个唯一的configEpoch做标志,如果和其他master节点冲突,会强制自增使本节点在集群中唯一 */
uint64_t offset; /* 主从复制偏移相关信息,主节点和从节点含义不同 */
char sender[CLUSTER_NAMELEN]; /* 发送节点的名称 */
unsigned char myslots[CLUSTER_SLOTS/8]; /* 本节点负责的slots信息,16384/8个char数组,一共为16384bit */
char slaveof[CLUSTER_NAMELEN]; /* master信息,假如本节点是slave节点的话,协议带有master信息 */
char myip[NET_IP_STR_LEN]; /* IP */
char notused1[34]; /* 保留字段 */
uint16_t cport; /* 集群的通信端口 */
uint16_t flags; /* 本节点当前的状态,比如 CLUSTER_NODE_HANDSHAKE、CLUSTER_NODE_MEET */
unsigned char state; /* Cluster state from the POV of the sender */
unsigned char mflags[3]; /* 本条消息的类型,目前只有两类:CLUSTERMSG_FLAG0_PAUSED、CLUSTERMSG_FLAG0_FORCEACK */
union clusterMsgData data;
} clusterMsg;
clusterMsgData — это структура объединения, которая может быть телом сообщения, например PING, MEET, PONG или FAIL. Среди них, когда сообщение имеет тип PING, MEET и PONG, назначается поле ping, но когда сообщение имеет тип FAIL, назначается поле сбоя.
// 注意这是 union 关键字
union clusterMsgData {
/* PING, MEET 或者 PONG 消息时,ping 字段被赋值 */
struct {
/* Array of N clusterMsgDataGossip structures */
clusterMsgDataGossip gossip[1];
} ping;
/* FAIL 消息时,fail 被赋值 */
struct {
clusterMsgDataFail about;
} fail;
// .... 省略 publish 和 update 消息的字段
};
clusterMsgDataGossip — это структура сообщений PING, PONG и MEET. Она будет включать другую информацию об узле, поддерживаемую узлом-отправителем сообщения, то есть информацию, содержащуюся в поле узлов в clusterState выше. Конкретный код выглядит следующим образом, и вы также найти поля двух похожи.
typedef struct {
/* 节点的名字,默认是随机的,MEET消息发送并得到回复后,集群会为该节点设置正式的名称*/
char nodename[CLUSTER_NAMELEN];
uint32_t ping_sent; /* 发送节点最后一次给接收节点发送 PING 消息的时间戳,收到对应 PONG 回复后会被赋值为0 */
uint32_t pong_received; /* 发送节点最后一次收到接收节点发送 PONG 消息的时间戳 */
char ip[NET_IP_STR_LEN]; /* IP address last time it was seen */
uint16_t port; /* IP*/
uint16_t cport; /* 端口*/
uint16_t flags; /* 标识*/
uint32_t notused1; /* 对齐字符*/
} clusterMsgDataGossip;
typedef struct {
char nodename[CLUSTER_NAMELEN]; /* 下线节点的名字 */
} clusterMsgDataFail;
После прочтения структуры данных, поддерживаемой узлом, и структуры отправленного сообщения, давайте взглянем на исходный код конкретного поведения Redis.
Отправляйте сообщения PING случайным образом и периодически
Функция clusterCron Redis будет вызываться регулярно, и каждый раз, когда она выполняется 10 раз, она будет готовиться к отправке сообщения PING на случайный узел.
Сначала он случайным образом выберет 5 узлов, затем выберет узел, который не связывался с ним дольше всего, и вызовет функцию clusterSendPing для отправки сообщения типа CLUSTERMSG_TYPE_PING.
// cluster.c 文件
// clusterCron() 每执行 10 次(至少间隔一秒钟),就向一个随机节点发送 gossip 信息
if (!(iteration % 10)) {
int j;
/* 随机 5 个节点,选出其中一个 */
for (j = 0; j < 5; j++) {
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
/* 不要 PING 连接断开的节点,也不要 PING 最近已经 PING 过的节点 */
if (this->link == NULL || this->ping_sent != 0) continue;
if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
continue;
/* 对比 pong_received 字段,选出更长时间未收到其 PONG 消息的节点(表示好久没有接受到该节点的PONG消息了) */
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}
/* 向最久没有收到 PONG 回复的节点发送 PING 命令 */
if (min_pong_node) {
serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}
О конкретном поведении функции clusterSendPing мы узнаем позже, потому что эта функция часто используется в других ссылках.
Узел присоединяется к кластеру
Когда узел выполняет команду CLUSTER MEET, он будет поддерживать структуру clusterNode для самого нового узла.Ссылка структуры, то есть поле TCP-соединения, имеет значение null, что указывает на то, что новый узел еще не установил соединение.
Функция clusterCron также обработает эти новые узлы, которые не установили соединение, вызовет createClusterLink для создания соединения, а затем вызовет функцию clusterSendPing для отправки сообщения MEET.
/* cluster.c clusterCron 函数部分,为未创建连接的节点创建连接 */
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;
/* 和该节点建立连接 */
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->cport, NET_FIRST_BIND_ADDR);
/* .... fd 为-1时的异常处理 */
/* 建立 link */
link = createClusterLink(node);
link->fd = fd;
node->link = link;
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);
/* 向新连接的节点发送 PING 命令,防止节点被识进入下线 */
/* 如果节点被标记为 MEET ,那么发送 MEET 命令,否则发送 PING 命令 */
old_ping_sent = node->ping_sent;
clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
/* .... */
/* 如果当前节点(发送者)没能收到 MEET 信息的回复,那么它将不再向目标节点发送命令。*/
/* 如果接收到回复的话,那么节点将不再处于 HANDSHAKE 状态,并继续向目标节点发送普通 PING 命令*/
node->flags &= ~CLUSTER_NODE_MEET;
}
Предотвращение ложного тайм-аута узла и истечения срока действия состояния
Предотвращение ложных тайм-аутов узла и пометка подозрительных меток в автономном режиме также находятся в функции clusterCron, как показано ниже. Он проверит текущий список всех узлов.Если он обнаружит, что время последней связи PONG между узлом и самим собой превышает половину заданного порога, чтобы предотвратить ложный тайм-аут узла, он будет активно освобождать соединение связи с ним, а затем будет активно отправлять ему сообщение PING.
/* cluster.c clusterCron 函数部分,遍历节点来检查 fail 的节点*/
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
mstime_t delay;
/* 如果等到 PONG 到达的时间超过了 node timeout 一半的连接 */
/* 因为尽管节点依然正常,但连接可能已经出问题了 */
if (node->link && /* is connected */
now - node->link->ctime >
server.cluster_node_timeout && /* 还未重连 */
node->ping_sent && /* 已经发过ping消息 */
node->pong_received < node->ping_sent && /* 还在等待pong消息 */
/* 等待pong消息超过了 timeout/2 */
now - node->ping_sent > server.cluster_node_timeout/2)
{
/* 释放连接,下次 clusterCron() 会自动重连 */
freeClusterLink(node->link);
}
/* 如果目前没有在 PING 节点*/
/* 并且已经有 node timeout 一半的时间没有从节点那里收到 PONG 回复 */
/* 那么向节点发送一个 PING ,确保节点的信息不会太旧,有可能一直没有随机中 */
if (node->link &&
node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout/2)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
/* .... 处理failover和标记遗失下线 */
}
Обработка аварийного переключения и пометка подозреваемых в автономном режиме
Если узел по-прежнему не получает сообщение PONG от целевого узла после того, как обработка ложного тайм-аута узла предотвращена, и время превышает время ожидания cluster_node_timeout, то узел помечается как подозрительное автономное состояние.
/* 如果这是一个主节点,并且有一个从服务器请求进行手动故障转移,那么向从服务器发送 PING*/
if (server.cluster->mf_end &&
nodeIsMaster(myself) &&
server.cluster->mf_slave == node &&
node->link)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
/* 后续代码只在节点发送了 PING 命令的情况下执行*/
if (node->ping_sent == 0) continue;
/* 计算等待 PONG 回复的时长 */
delay = now - node->ping_sent;
/* 等待 PONG 回复的时长超过了限制值,将目标节点标记为 PFAIL (疑似下线)*/
if (delay > server.cluster_node_timeout) {
/* 超时了,标记为疑似下线 */
if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
node->name);
// 打开疑似下线标记
node->flags |= REDIS_NODE_PFAIL;
update_state = 1;
}
}
на самом деле отправить сообщение сплетни
Ниже приведен исходный код метода clusterSendPing(), который многократно вызывался во фронте, в коде есть подробные комментарии, можете сами прочитать. Основная операция заключается в преобразовании состояния clusterState, поддерживаемого самим узлом, в соответствующую структуру сообщения.
/* 向指定节点发送一条 MEET 、 PING 或者 PONG 消息 */
void clusterSendPing(clusterLink *link, int type) {
unsigned char *buf;
clusterMsg *hdr;
int gossipcount = 0; /* Number of gossip sections added so far. */
int wanted; /* Number of gossip sections we want to append if possible. */
int totlen; /* Total packet length. */
// freshnodes 是用于发送 gossip 信息的计数器
// 每次发送一条信息时,程序将 freshnodes 的值减一
// 当 freshnodes 的数值小于等于 0 时,程序停止发送 gossip 信息
// freshnodes 的数量是节点目前的 nodes 表中的节点数量减去 2
// 这里的 2 指两个节点,一个是 myself 节点(也即是发送信息的这个节点)
// 另一个是接受 gossip 信息的节点
int freshnodes = dictSize(server.cluster->nodes)-2;
/* 计算要携带多少节点的信息,最少3个,最多 1/10 集群总节点数量*/
wanted = floor(dictSize(server.cluster->nodes)/10);
if (wanted < 3) wanted = 3;
if (wanted > freshnodes) wanted = freshnodes;
/* .... 省略 totlen 的计算等*/
/* 如果发送的信息是 PING ,那么更新最后一次发送 PING 命令的时间戳 */
if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();
/* 将当前节点的信息(比如名字、地址、端口号、负责处理的槽)记录到消息里面 */
clusterBuildMessageHdr(hdr,type);
/* Populate the gossip fields */
int maxiterations = wanted*3;
/* 每个节点有 freshnodes 次发送 gossip 信息的机会
每次向目标节点发送 2 个被选中节点的 gossip 信息(gossipcount 计数) */
while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
/* 从 nodes 字典中随机选出一个节点(被选中节点) */
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
/* 以下节点不能作为被选中节点:
* Myself:节点本身。
* PFAIL状态的节点
* 处于 HANDSHAKE 状态的节点。
* 带有 NOADDR 标识的节点
* 因为不处理任何 Slot 而被断开连接的节点
*/
if (this == myself) continue;
if (this->flags & CLUSTER_NODE_PFAIL) continue;
if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
(this->link == NULL && this->numslots == 0))
{
freshnodes--; /* Tecnically not correct, but saves CPU. */
continue;
}
// 检查被选中节点是否已经在 hdr->data.ping.gossip 数组里面
// 如果是的话说明这个节点之前已经被选中了
// 不要再选中它(否则就会出现重复)
if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
/* 这个被选中节点有效,计数器减一 */
clusterSetGossipEntry(hdr,gossipcount,this);
freshnodes--;
gossipcount++;
}
/* .... 如果有 PFAIL 节点,最后添加 */
/* 计算信息长度 */
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
/* 将被选中节点的数量(gossip 信息中包含了多少个节点的信息)记录在 count 属性里面*/
hdr->count = htons(gossipcount);
/* 将信息的长度记录到信息里面 */
hdr->totlen = htonl(totlen);
/* 发送网络请求 */
clusterSendMessage(link,buf,totlen);
zfree(buf);
}
void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
clusterMsgDataGossip *gossip;
/* 指向 gossip 信息结构 */
gossip = &(hdr->data.ping.gossip[i]);
/* 将被选中节点的名字记录到 gossip 信息 */
memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
/* 将被选中节点的 PING 命令发送时间戳记录到 gossip 信息 */
gossip->ping_sent = htonl(n->ping_sent/1000);
/* 将被选中节点的 PONG 命令回复的时间戳记录到 gossip 信息 */
gossip->pong_received = htonl(n->pong_received/1000);
/* 将被选中节点的 IP 记录到 gossip 信息 */
memcpy(gossip->ip,n->ip,sizeof(n->ip));
/* 将被选中节点的端口号记录到 gossip 信息 */
gossip->port = htons(n->port);
gossip->cport = htons(n->cport);
/* 将被选中节点的标识值记录到 gossip 信息 */
gossip->flags = htons(n->flags);
gossip->notused1 = 0;
}
Далее следует функция clusterBuildMessageHdr, которая в основном отвечает за заполнение базовой информации в структуре сообщения и информации о состоянии текущего узла.
/* 构建消息的 header */
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
int totlen = 0;
uint64_t offset;
clusterNode *master;
/* 如果当前节点是salve,则master为其主节点,如果当前节点是master节点,则master就是当前节点 */
master = (nodeIsSlave(myself) && myself->slaveof) ?
myself->slaveof : myself;
memset(hdr,0,sizeof(*hdr));
/* 初始化协议版本、标识、及类型, */
hdr->ver = htons(CLUSTER_PROTO_VER);
hdr->sig[0] = 'R';
hdr->sig[1] = 'C';
hdr->sig[2] = 'm';
hdr->sig[3] = 'b';
hdr->type = htons(type);
/* 消息头设置当前节点id */
memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);
/* 消息头设置当前节点ip */
memset(hdr->myip,0,NET_IP_STR_LEN);
if (server.cluster_announce_ip) {
strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
hdr->myip[NET_IP_STR_LEN-1] = '\0';
}
/* 基础端口及集群内节点通信端口 */
int announced_port = server.cluster_announce_port ?
server.cluster_announce_port : server.port;
int announced_cport = server.cluster_announce_bus_port ?
server.cluster_announce_bus_port :
(server.port + CLUSTER_PORT_INCR);
/* 设置当前节点的槽信息 */
memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
memset(hdr->slaveof,0,CLUSTER_NAMELEN);
if (myself->slaveof != NULL)
memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
hdr->port = htons(announced_port);
hdr->cport = htons(announced_cport);
hdr->flags = htons(myself->flags);
hdr->state = server.cluster->state;
/* 设置 currentEpoch and configEpochs. */
hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
hdr->configEpoch = htonu64(master->configEpoch);
/* 设置复制偏移量 */
if (nodeIsSlave(myself))
offset = replicationGetSlaveOffset();
else
offset = server.master_repl_offset;
hdr->offset = htonu64(offset);
/* Set the message flags. */
if (nodeIsMaster(myself) && server.cluster->mf_end)
hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
/* 计算并设置消息的总长度 */
if (type == CLUSTERMSG_TYPE_FAIL) {
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataFail);
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataUpdate);
}
hdr->totlen = htonl(totlen);
}
постскриптум
Изначально я просто хотел написать о протоколе Gossip Redis Cluster, но я не ожидал, что чем больше будет написана статья, тем больше будет контента, а окончательный анализ исходного кода будет немного разочаровывающим, так что можете глянуть на нем, и я надеюсь, что вы продолжите обращать внимание на мои последующие вопросы.