聊聊 Redis 集群數據遷移
詳解redis cluster數據遷移過程
節點基本結構定義
redis集群提供16384個slot,我們可以按需分配給節點上,后續進行鍵值對存儲時,我們就可以按照算法將鍵值對存到對應slot上的redis服務器上:
集群節點本質就是通過slots這個數組記錄當前節點的所管理的情況,這里我們可以看到slots是一個char 數組,長度為REDIS_CLUSTER_SLOTS(16384)除8,這樣做的原因是因為:
- char占1個字節,每個字節8位。
- 每個char可以記錄8個slot的情況,如果是自己的slot則對應char的某一個位置記錄為1:
我們以node-1為例,因為它負責0-5460的節點,所以它的slots0-5460都為1,對應的圖解如下所示,可以看到筆者這里省略了后半部分,僅僅表示了0-15位置為1:
對此我們也給出這段redis中節點的定義,即位于cluster.h中的clusterNode這個結構體中,可以看slots這段定義:
typedef struct clusterNode {
//......
//記錄集群負責的槽,總的為16384
unsigned char slots[REDIS_CLUSTER_SLOTS/8];
//......
}
設置slot后續節點走向
以本文示例為例,我們希望后續節點2的數據全部存到節點1中,那么我們首先需要鍵入如下兩條配置:
# 在節點1上執行,將節點2數據導入到節點1上
CLUSTER SETSLOT 3 IMPORTING node2
# 在節點2上執行,將自己的數據遷移到節點1
CLUSTER SETSLOT 3 MIGRATING node1
這兩條指最終都會被各自的服務端解析,并調用clusterCommand執行,我們以節點1導入為例,假設我們執行clusterCommand解析到setslot 關鍵字和importing關鍵字,即知曉要導入其他節點的數據。對應的節點1就會通過importing_slots_from數組標記自己將導入這個slot的數據,而節點2也會通過migrating_slots_to數組標記自己要將數據導出給其他節點的slot:
對此我們給出clusterCommand的執行流程,可以看到該函數解析出migrating或者importing關鍵字時就會將對的migrating_slots_to或者importing_slots_from數組對應slot位置的索引位置設置為當前上述命令傳入的node id:
void clusterCommand(redisClient *c) {
//......
if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {//處理遷出的邏輯
//看看自己是否有遷出的slot,沒有則報錯
if (server.cluster->slots[slot] != myself) {
addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
return;
}
//查看自己是否知曉這個node id,如果沒有則報錯
if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
addReplyErrorFormat(c,"I don't know about node %s",
(char*)c->argv[4]->ptr);
return;
}
//標記遷出到slot為傳入的node
server.cluster->migrating_slots_to[slot] = n;
} else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {//處理遷入的邏輯
//查看遷入的slot是否已經配置,如果有則報錯
if (server.cluster->slots[slot] == myself) {
addReplyErrorFormat(c,
"I'm already the owner of hash slot %u",slot);
return;
}
//查看自己是否知曉要遷入數據的node的信息,如果不知道則報錯
if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
addReplyErrorFormat(c,"I don't know about node %s",
(char*)c->argv[3]->ptr);
return;
}
//標記遷入slot位置為傳入的nodeid
server.cluster->importing_slots_from[slot] = n;
} //......
}
后續的我們假設還是將set key value請求發送到節點2,因為上述命令的原因,節點會返回move/ask告知客戶端這個鍵值對現在要存到節點1上。對應節點1收到這個key請求時,通過key計算得slot正是自己,它就會將這個鍵值對存儲到自己的數據庫中:
這里我們以節點1的角度查看這個問題,當客戶端收到move指令后,繼續向節點1發送指令,節點1通過收到指令調用processCommand,其內部調用getNodeByQuery獲取當前key對應的slot,發現是自己則直接存儲數據到當前節點的內存數據庫中:
int processCommand(redisClient *c) {
//......
//如果開啟了集群模式,且發送者不是master且參數帶key則進入邏輯
if (server.cluster_enabled &&
!(c->flags & REDIS_MASTER) &&
!(c->flags & REDIS_LUA_CLIENT &&
server.lua_caller->flags & REDIS_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
{
int hashslot;
if (server.cluster->state != REDIS_CLUSTER_OK) {
//......
} else {
int error_code;
//查找鍵值對對應的slot和這個slot負責的節點
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
//如果為空且或者非自己,則轉交出去給別人處理
if (n == NULL || n != server.cluster->myself) {
flagTransaction(c);
clusterRedirectClient(c,n,hashslot,error_code);
return REDIS_OK;
}
}
}
//......
//將鍵值對存儲到當前數據庫中
}
我們以節點的視角再次直接步入getNodeByQuery查看這段邏輯,可以看到其內部會基于key計算slot然后將得到對應的node,如果發現這個node是自己且屬于importing_slots_from,即說明是客戶端通過move或者ask請求找到自己的,則進行進一步是否是多條指令執行且存在key找不到存儲位置的情況,若存在則返回空,反之都是直接返回當前節點信息,即node2的新數據直接遷移過來:
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
//......
//遍歷命令
for (i = 0; i < ms->count; i++) {
//.....
//獲取指令、參數個數、參數
mcmd = ms->commands[i].cmd;
margc = ms->commands[i].argc;
margv = ms->commands[i].argv;
//解析出key以及個數
keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
for (j = 0; j < numkeys; j++) {
//拿到key
robj *thiskey = margv[keyindex[j]];
//計算slot
int thisslot = keyHashSlot((char*)thiskey->ptr,
sdslen(thiskey->ptr));
//.....
//如果就是當前節點正在做遷出或者遷入,則migrating_slot/importing_slot設置為1
if (n == myself &&
server.cluster->migrating_slots_to[slot] != NULL)
{
migrating_slot = 1;
} else if (server.cluster->importing_slots_from[slot] != NULL) {
importing_slot = 1;
}
} else {
//.....
//.....
}
//.....
}
//如果設置了導入標識為1且標識為asking則步入這段邏輯,
if (importing_slot &&
(c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
{ //當前指令有多個key且存在未命中的則返回空,反之返回自己
if (multiple_keys && missing_keys) {
if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
return NULL;
} else {
return myself;
}
}
//.....
//返回節點信息以本示例來說就是返回當前節點信息
return n;
}
完成節點遷移
上述操作僅僅針對新節點的遷移,對于舊的節點我們就需要通過節點2鍵入CLUSTER GETKEYSINSLOT slot count要遷移的舊的key的slot,然后通過MIGRATE host port key dbid timeout [COPY | REPLACE]將數據遷移到節點1上。 這里我們補充一下MIGRATE 中copy和replace的區別,前者是遇到重復直接報錯,后者是遷移時直接覆蓋。 最終這條指令回基于要遷移的key而生成一條RESTORE-ASKING key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency]指令發送給導入的節點,以本文例子來說就是節點1:
這里我們給出MIGRATE 指令對應的處理函數migrateCommand,邏輯和我上文說的差不多,基于指令解析出replace或者copy等信息,然后用argv[3]即我們的key得出這個鍵值對的信息生成RESTORE指令將鍵值對轉存給節點1:
/* 命令 MIGRATE host port key dbid timeout [COPY | REPLACE] */
void migrateCommand(redisClient *c) {
//......
//解析拷貝和替代選項,前者重復會報錯
for (j = 6; j < c->argc; j++) {
if (!strcasecmp(c->argv[j]->ptr,"copy")) {
copy = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
replace = 1;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
//......
//查看要遷移的key是否存在嗎,如果不存則直接報錯返回
if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
addReplySds(c,sdsnew("+NOKEY\r\n"));
return;
}
/* Connect */
//建立socket連接
cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
//......
//cmd初始化一個buf緩沖區
rioInitWithBuffer(&cmd,sdsempty());
/* Send the SELECT command if the current DB is not already selected. */
//如果尚未選擇當前DB,則發送SELECT命令。
int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
if (select) {
redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
}
/* Create RESTORE payload and generate the protocol to call the command. */
//獲取key的過期時效
expireat = getExpire(c->db,c->argv[3]);
if (expireat != -1) {
ttl = expireat-mstime();
if (ttl < 1) ttl = 1;
}
//集群用RESTORE-ASKING發送key給目標
if (server.cluster_enabled)
redisAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
else
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
//填充key和value ttl等
redisAssertWithInfo(c,NULL,sdsEncodedObject(c->argv[3]));
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,
sdslen(c->argv[3]->ptr)));
redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
//......
//遷移指令字符串寫入緩沖區
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));
//......
//如果是replace發出 REPLACE
if (replace)
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
//......
}
最后調整
最后我們只需在節點1和2都執行CLUSTER SETSLOT <SLOT> NODE <NODE ID> 完成slot指派,這指令最終就會走到clusterCommand中,節點1和節點2格子的處理邏輯為:
- 節點2看看遷移的key是否不存則且migrating_slots_to數據不為空,若符合要求說明遷移完成但狀態未修改,直接將migrating_slots_to置空完成指派最后調整。
- 節點1查看節點id是否是自己且importing_slots_from是否有數據,若有則說明節點導入完成,直接將importing_slots_from置空。
void clusterCommand(redisClient *c) {
//......
else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {//處理setslot指令
//......
else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
/* CLUSTER SETSLOT <SLOT> NODE <NODE ID> 標記最終遷移的節點 */
clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
//......
//如果發現對應的key為0,且migrating_slots_to不為空,則說明遷出完成但狀態還未修改,節點2會將migrating_slots_to設置為空
if (countKeysInSlot(slot) == 0 &&
server.cluster->migrating_slots_to[slot])
server.cluster->migrating_slots_to[slot] = NULL;
//如果是節點1則會看指令的nodeid是否是自己且importing_slots_from是否有數據,若有則說明導入成功直接將importing_slots_from設置為空
if (n == myself &&
server.cluster->importing_slots_from[slot])
{
//......
server.cluster->importing_slots_from[slot] = NULL;
}
}
//......
}