來聊聊去中心化 Redis 集群節點如何完成通信
一、寫在文章開頭
今天我們來聊點有意思的,關于redis中集群間通信的設計與實現,本文將從源碼的角度分析redis集群節點如何利用Gossip協議完成節點間的通信與傳播,希望對你有幫助。
二、詳解Redis集群節點通信的設計與實現
1. 詳解Gossip協議
在此之前我們先簡單介紹一下Gossip協議,該協議是分布式集群的一種通信協議,我們都知道管理集群的方式有中心化和去中心化兩種方式,中心化的方式是通過第一個第三方的管理中心,例如zookeeper等來維護一份集群節點的信息、狀態:
而redis采用的是去中心化的方式實現集群節點通信,即通過Gossip協議進行節點通信,讓各個節點之間兩兩通信,廣播與自己保持交流的節點,由此將節點串聯起來構成一張關系網:
我們以一個簡單的場景為例介紹一下Gossip協議,默認情況下我們的當前有3個節點的集群,各個節點彼此按照通信要求發送自己的信息和與自己保持交流的節點,由此將有限的資源共享出去構成一個集群。
此時,我們需要橫向擴展一個節點4,我們只需配置/redis-cli --cluster add-node 新節點IP:新節點端口 任意存活節點IP:任意存活節點端口,這個存活節點后續和其他節點通信時,就會將當前新添加的節點4發送出去,由此其他節點收到這個消息并存儲下來,經過各個節點的不斷反復通信,這個集群中的各個節點就會擁有集群中所有節點的信息。
2. 集群消息協定
任何通信都是需要按照協議規范進行,redis集群也一樣,為了保證節點間通信的規范,redis要求集群節點通信的消息的類型可以是以下幾種:
- ping消息,用來向其他節點發送節點信息。
- 回復ping的pong消息。
- 如果當前節點中存在新添加的節點,則通過meet格式的消息發送給其他節點。
- 如果節點出現故障,則發送fail消息告知集群其他節點。
對此我們給出消息的宏定義代碼,位于cluster.h中:
//集群中的ping
#define CLUSTERMSG_TYPE_PING 0 /* Ping */
//集群中的pong
#define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */
//想加入集群的節點
#define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */
//某個節點有故障
#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */
3. 集群節點消息體
后續集群都會通過clusterMsg來表示一條消息,它記錄消息長度以及發送節點名稱、負責的slots以及節點端口號等信息:
typedef struct {
char sig[4];
//消息總長度
uint32_t totlen;
//......
//消息類型
uint16_t type;
//......
//發送節點的名稱
char sender[REDIS_CLUSTER_NAMELEN];
//發送節點負責的slots
unsigned char myslots[REDIS_CLUSTER_SLOTS/8];
//......
char notused1[32];
//節點端口
uint16_t port;
//......
//記錄消息的消息體
union clusterMsgData data;
} clusterMsg;
這里我們對這個消息體clusterMsgData進行展開說明一下,可以看到他用一段共用體維護各種類型消息的結構,這其中我們只需要了解的是ping消息,從注釋可以看到ping消息這個結構體可以發送ping、meet、pong等類型消息,ping消息類型其內部用clusterMsgDataGossip數組維護,這一點這個消息可以包含多個節點信息存于數組中:
union clusterMsgData {
//可以發送ping meet pong的消息,該結構體內部有clusterMsgDataGossip數組,這意味這個結構體可以存放多個節點的消息
struct {
/* Array of N clusterMsgDataGossip structures */
clusterMsgDataGossip gossip[1];
} ping;
//......
};
步入clusterMsgDataGossip即可看到這個結構體存儲的是需要發送給它人的節點名稱、ping和收到ping的時間以及端口號等信息:
typedef struct {
char nodename[REDIS_CLUSTER_NAMELEN];//節點名稱
uint32_t ping_sent; //發送ping的事件
uint32_t pong_received;//收到pong的事件
char ip[REDIS_IP_STR_LEN]; //廣播的節點ip
uint16_t port; //節點與客戶端進行通信的端口
//......
} clusterMsgDataGossip;
我們來簡單小結一下,假設我們的某個節點向其他節點發送ping消息告知自己維護的節點信息和狀態,那么對應的消息格式大體如下圖所示:
4. 詳解集群節點ping流程
集群節點的指向流程也是交由redis的時間事件serverCron執行,它會每個100ms執行一次集群的定任務clusterCron方法,其內部會檢查這個定時任務是否執行了10次,一旦執行10次(也就是100ms*10即每1秒)后就會隨機從當前節點維護的其他節點信息字典表中抽取5個節點,找到最早回復pong給當前節點發送一條ping消息:
對此我們給出定時執行的serverCron函數,可以看到其內部每100ms執行一次集群定時任務clusterCron:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
//......
//100ms執行一次集群的函數
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}
//......
}
我們步入clusterCron即可看到,該定時任務會隨機抽取5個節點然后找到最早給該節點發送pong的節點發送ping消息包:
void clusterCron(void) {
//......
// 每10次即每過去1s執行一次這段邏輯
if (!(iteration % 10)) {
int j;
//隨機選出5個節點
for (j = 0; j < 5; j++) {
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
/* Don't ping nodes disconnected or with a ping currently active. */
//斷連、或者自己、或者正在握手的節點不處理
if (this->link == NULL || this->ping_sent != 0) continue;
if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))
continue;
//選擇最早收到pong的節點
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}
//向最早收到pong的調用clusterSendPing發送消息
if (min_pong_node) {
redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}
//......
}
步入clusterSendPing即可看到我們所說的核心邏輯,即按照公式計算出要發送給最早回復pong的節點對應節點數,然后封裝成消息發送出去:
void clusterSendPing(clusterLink *link, int type) {
//......
//我們希望添加的最大節點數,集群總是減去自己和正在握手的
int freshnodes = dictSize(server.cluster->nodes)-2;
//......
//計算wanted
wanted = floor(dictSize(server.cluster->nodes)/10);
if (wanted < 3) wanted = 3;
if (wanted > freshnodes) wanted = freshnodes;
//......
/* Populate the header. */
//設置ping消息頭,構建端口號、slot等信息
if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();
clusterBuildMessageHdr(hdr,type);
/* Populate the gossip fields */
int maxiterations = wanted*3;
//基于maxiterations進行循環隨機抽取自己維護的節點信息并組裝
while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
clusterMsgDataGossip *gossip;
int j;
//如果是自己則跳過
if (this == myself) continue;
//故障節點不發送
if (maxiterations > wanted*2 &&
!(this->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL)))
continue;
//....
freshnodes--;
//組裝當前節點的名稱、ip、端口等信息存到hdr所指向的消息結構體
//指向gossip某個索引位置設置名稱、ip、端口等
gossip = &(hdr->data.ping.gossip[gossipcount]);
memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
gossip->ping_sent = htonl(this->ping_sent);
gossip->pong_received = htonl(this->pong_received);
memcpy(gossip->ip,this->ip,sizeof(this->ip));
gossip->port = htons(this->port);
gossip->flags = htons(this->flags);
gossip->notused1 = 0;
gossip->notused2 = 0;
gossipcount++;
}
//......
//創建一個發送事件提交給redis發送出去
clusterSendMessage(link,buf,totlen);
zfree(buf);
}
5. 等待pong消息回復并解析
每個集群的節點都會定時檢查和對端鏈接的連接是否斷開,如果斷開的嘗試異步非阻塞向其發送建立連接請求,并注冊一個處理器clusterReadHandler處理對端的ping等消息,所以我們上文的ping消息實際上就是通過這個函數進行解析讀取:
對此我們給出這段源碼的入口即可集群的定時任務clusterCron方法,可以看到其內部會便利當前節點通信的節點,查看連接是否為空,若為空則發起連接并注冊clusterReadHandler處理消息:
void clusterCron(void) {
//......
di = dictGetSafeIterator(server.cluster->nodes);
//遍歷與當前節點保持通信的節點
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
//如果連接為空則非阻塞發起連接,然后注冊clusterReadHandler處理對端節點的消息
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->port+REDIS_CLUSTER_PORT_INCR, REDIS_BIND_ADDR);
//......
//創建鏈接對應存儲數據的空間
link = createClusterLink(node);
link->fd = fd;
node->link = link;
//為這個鏈接注冊clusterReadHandler處理發送的消息
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);
//......
}
}
}
步入clusterReadHandler即可看到redis服務端解析消息存儲到buf并通過clusterProcessPacket解析的邏輯:
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
//......
while(1) { /* Read as long as there is data to read. */
//......
//hdr指向link->rcvbuf
hdr = (clusterMsg*) link->rcvbuf;
//讀取消息到buf即link->rcvbuf中
nread = read(fd,buf,readlen);
//......
if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
//調用clusterProcessPacket解析這個連接的消息,即 link->rcvbuf
if (clusterProcessPacket(link)) {
sdsfree(link->rcvbuf);
link->rcvbuf = sdsempty();
} else {
return; /* Link no longer valid. */
}
}
}
}
而clusterProcessPacket即是該方法的核心所在,它會將對端節點發送的消息進行解析與處理,這里我們就以收到pong消息為例說明一下流程,假設回復pong的是master節點,它會更新收到這條網絡連接pong響應時間,然后解析報文內容,如果發現有個節點不在我們的節點列表中,將其存入node字典表中:
int clusterProcessPacket(clusterLink *link) {
//......
/* Perform sanity checks */
//消息完整性校驗
//......
/* Check if the sender is a known node. */
//檢查發送節點是否是已知節點
sender = clusterLookupNode(hdr->sender);
//......
//......
/* PING, PONG, MEET:消息處理邏輯 */
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET)
{
//......
//如果收到pong則更新pong_received為當前時間
if (link->node && type == CLUSTERMSG_TYPE_PONG) {
link->node->pong_received = mstime();
link->node->ping_sent = 0;
//......
}
//......
//如果當前節點是已知節點,則調用clusterProcessGossipSection查看當前pong消息中的內容是否包含未知的、新加入的節點
if (sender) clusterProcessGossipSection(hdr,link);
} else if (type == CLUSTERMSG_TYPE_FAIL) {
//......
}
//......
return 1;
}
步入clusterProcessGossipSection即可看到該函數會遍歷消息中的節點,一旦發現該節點是新添加節點則調用clusterStartHandshake其存入nodes字典表中:
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
uint16_t count = ntohs(hdr->count);
//解析當前節點gossip消息內容
clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
//遍歷node
while(count--) {
//......
//打印當前節點信息
redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
g->nodename,
g->ip,
ntohs(g->port),
ci);
node = clusterLookupNode(g->nodename);
if (node) {//已知節點處理,如果不可通信才握手重連
//......
} else {//未知節點則發起握手,若握手建立通信成功則將其存入nodes字典中
//......
if (sender &&
!(flags & REDIS_NODE_NOADDR) &&
!clusterBlacklistExists(g->nodename))
{
clusterStartHandshake(g->ip,ntohs(g->port));
}
}
//走到下一個節點
g++;
}
}
我們給出clusterStartHandshake中將其存入server的cluster的nodes字典表的邏輯:
int clusterStartHandshake(char *ip, int port) {
//......
//如果處于握手中,則說明之前已經發現并進行通信了,直接返回
if (clusterHandshakeInProgress(norm_ip,port)) {
errno = EAGAIN;
return 0;
}
//基于消息創建node結構其,并調用clusterAddNode將其存入server.cluster->nodes字典表中
n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
memcpy(n->ip,norm_ip,sizeof(n->ip));
n->port = port;
clusterAddNode(n);
return 1;
}
三、小結
來簡單小結一下Redis集群節點如何通過Gossip協議構建集群網絡的:
- 新節點通過meet和集群中某個節點a建立連接。
- 當前節點執行clusterCron定時任務時,隨機抽取5個節點并找到最早回復pong的實例,假設是節點a,發送ping消息。
- 注冊clusterReadHandler處理器其他節點發送的消息。
- 收到節點a的pong消息回復,判斷查看該節點是否是已知節點,如果是則調用clusterProcessGossipSection解析報文內容,如果存在新節點則進行握手通信,如果連接建立成功則將該節點存入當前實例的nodes節點中。