聊聊 Redis 哨兵選舉與故障轉移的實現
上一篇文章我們將哨兵主觀下線的核心流程都分析完成,這一篇我們將接著之前的思路,將哨兵獲取客觀下線結果并結合raft協議完成哨兵leader選舉完成故障轉移的流程分析完成,希望對你有幫助。
詳解哨兵選舉與故障轉移流程
1. 獲取客觀下線結果判斷
當前哨兵主觀認定master下線之后,為了明確知曉master節點是否真的下線,哨兵節點還會通過cc即異步命令指針所維護的socket連接發起is-master-down-by-addr的sentinel指令進行詢問,其他哨兵所回復的結果都會通過回調函數sentinelReceiveIsMasterDownReply函數處理。
這段請求最終會被其他哨兵sentinel命令所對應的函數sentinelCommand執行,他們各自會在內部查看自己對于master判斷是否是主觀下線,如果是則返回1。
最后我們的哨兵收到這個結果1,則通過位運算加master節點狀態flags類加上客觀下線的判斷標識64,這里redis為了提升運算效率,采用的二進制|=運算,這一點我們在閱讀大量的redis中源碼都會看到二進制運算這一點優化:
對此我們也給出哨兵處理每一個master實例的函數入口,可以看到在調用sentinelCheckSubjectivelyDown完成主觀下線的檢查之后,又會調用sentinelAskMasterStateToOtherSentinels并傳入SENTINEL_NO_FLAGS即僅僅檢查其他哨兵對于當前master的主觀判斷結果:
//這個入參包含恰哨兵實例和當前主節點的從節點信息
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
//......
//3. 主觀判斷是否下線
sentinelCheckSubjectivelyDown(ri);
//......
/* Only masters */
if (ri->flags & SRI_MASTER) {
//......
//傳入master信息ri以及標識SENTINEL_NO_FLAGS意味僅了解其他哨兵對于master節點狀態的判斷
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
}
}
步入sentinelAskMasterStateToOtherSentinels即可看到哨兵詢問其他哨兵對于master判斷的邏輯,可以看到它遍歷出每一個哨兵實例,通過異步連接cc指針所指向的連接發起SENTINEL is-master-down-by-addr指令獲取其他哨兵節點對于master下線的看法,并注冊sentinelReceiveIsMasterDownReply函數處理返回結果:
#define SENTINEL_ASK_FORCED (1<<0)
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
dictIterator *di;
dictEntry *de;
di = dictGetIterator(master->sentinels);
//遍歷哨兵實例
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
//......
/* Ask */
ll2string(port,sizeof(port),master->addr->port);
//發送is-master-down-by-addr命令獲取其他哨兵客觀下線的結果,并通過sentinelReceiveIsMasterDownReply作為回調處理接收結果
retval = redisAsyncCommand(ri->cc,
sentinelReceiveIsMasterDownReply, NULL,
"SENTINEL is-master-down-by-addr %s %s %llu %s",
master->addr->ip, port,
sentinel.current_epoch,
//若大于SENTINEL_FAILOVER_STATE_NONE則說明執行故障切換,傳入server.runid
(master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
server.runid : "*");
if (retval == REDIS_OK) ri->pending_commands++;
}
dictReleaseIterator(di);
}
其他哨兵收到sentinel指令后就會調用sentinelCommand處理這條指令,其內部會判斷自己所維護的master的flags二進制位是否包含SRI_S_DOWN,如果是則說明被請求的哨兵節點同樣認為master已下線,則直接回復master的leaderid以及shared.cone即1(代表確認當前master確實下線):
void sentinelCommand(redisClient *c) {
//......
else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {//處理客觀下線請求
//......
//如果master主觀判定下線即flags包含SRI_S_DOWN這個主觀下線標識,則isdown設置為1
if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
(ri->flags & SRI_MASTER))
isdown = 1;
//上文isdown 設置為1,返回 shared.cone告知對應leaderid的master被我方認定為下線
//響應3部分內容,下線狀態、leader id以及當前leader的紀元
addReplyMultiBulkLen(c,3);
addReply(c, isdown ? shared.cone : shared.czero);
addReplyBulkCString(c, leader ? leader : "*");
addReplyLongLong(c, (long long)leader_epoch);
if (leader) sdsfree(leader);
} //......
return;
//......
}
最終我們的sentinel的回調函數sentinelReceiveIsMasterDownReply處理對端的結果,發現返回值為1,說明該節點對于我們的來說客觀認為master下線了。
所以我們的哨兵就需要記錄這個消息,因為我們維護master->sentinels的字典記錄其他哨兵信息,所以定位到其他哨兵客觀下線的回復后,我們就會從這個字典中找到這個哨兵的結構體將其flags累加一個SRI_MASTER_DOWN的常數值64,意味這個哨兵客觀認定這個master下線了:
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
//......
if ( //......)
{
//更新上次響應時間
ri->last_master_down_reply_time = mstime();
if (r->element[0]->integer == 1) {//如果返回(cone默認設置為1)1則說明其他哨兵認為master下線,累加將當前維護的哨兵字段的flags累加SRI_MASTER_DOWN
ri->flags |= SRI_MASTER_DOWN;
} else {
//......
}
//......
}
}
2. 啟動故障轉移
上一步收集其他哨兵的判斷并更新到各自的flags位后,當前哨兵的定時任務再次遍歷master調用sentinelHandleRedisInstance處理當前master,其內部會遍歷當前哨兵維護的哨兵數組獲取這些哨兵對于master下線的看法,如果累加到的哨兵對于下線的看法大于或者等于我們配置quorum之后,則會判定會客觀下線:
我們還是從sentinelHandleRedisInstance方法查看方法入口,可以看到哨兵定時執行該方法時會調用sentinelCheckObjectivelyDown檢查客觀下線狀態:
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
//......
if (ri->flags & SRI_MASTER) {
//......
//檢查其當前是否客觀下線
sentinelCheckObjectivelyDown(ri);
//......
}
}
步入其內部即可看到筆者所說的,遍歷哨兵查看下線結果并更新master下線狀態的邏輯:
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
//......
//如果是主觀下線,步入該邏輯
if (master->flags & SRI_S_DOWN) {
//自己的票數設置進去,quorum為1
quorum = 1; /* the current sentinel. */
//遍歷其他哨兵,如果為客觀下線則累加quorum
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
if (ri->flags & SRI_MASTER_DOWN) quorum++;
}
//如果投票數大于配置的quorum,則odown 為1,即說明客觀認定下線了
if (quorum >= master->quorum) odown = 1;
}
//如果明確客觀下線,則廣播+odown事件
if (odown) {
if ((master->flags & SRI_O_DOWN) == 0) {
sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
quorum, master->quorum);
//累加標識,并更新master下線時間
master->flags |= SRI_O_DOWN;
master->o_down_since_time = mstime();
}
} else {
//......
}
}
3. 發起新紀元leader選舉
基于上述結果redis會判斷是否發起故障轉移,若需要則通知其他哨兵進行leader選舉,收到通知的哨兵會檢查當前紀元是否小于發起選舉的哨兵紀元,若符合要求且在此期間沒有別的哨兵發起選舉,則向其投票。
后續我們的哨兵收到并收集這些響應之后,更新自己所維護的哨兵數組中的leader_epoch,通過遍歷這個哨兵數組中的leader_epoch是否和自己所生成的leader_epoch一致,如果統計結果超過半數,則說明自己當選leader,由此開始進行故障轉移:
(1) 選舉源碼入口
我們還是以sentinelHandleRedisInstance作為程序入口,可以看到其內部調用sentinelStartFailoverIfNeeded判斷是否需要進行故障轉移,然后調用sentinelAskMasterStateToOtherSentinels并傳入SENTINEL_ASK_FORCED發起leader選舉請求:
//這個入參包含恰哨兵實例和當前主節點的從節點信息
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
//......
if (ri->flags & SRI_MASTER) {
//......
// 判斷是否要進行故障切換,若需要則調用sentinelAskMasterStateToOtherSentinels傳入SENTINEL_ASK_FORCED進行leader選舉
if (sentinelStartFailoverIfNeeded(ri))
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
// 執行故障切換
sentinelFailoverStateMachine(ri);
//......
}
}
(2) 確認故障轉移
我們步入sentinelStartFailoverIfNeeded即可看到其對于是否進行故障轉移的判斷,邏輯比較簡單:
- 明確是否客觀認定下線。
- 明確是否處于故障轉移。
- 近期是否有進行故障轉移。
如果傷處條件都排除則:
- failover_state 即故障轉移狀態設置為等待故障轉移,后續的函數狀態機會根據這個標識進行故障轉移處理。
- flags標識累加處于故障轉移中。
- 更新master紀元為哨兵紀元+1,用于后續哨兵leader選舉后更新紀元使用。
對此我們給出sentinelStartFailoverIfNeeded的判斷,可以看到它會按照上文所說的流程進行判斷,明確排除三種情況后調用sentinelStartFailover設置故障轉移狀態:
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
//是否客觀下線,若不是則返回0
if (!(master->flags & SRI_O_DOWN)) return 0;
//是否處于故障轉移中,如果是則直接返回0
if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;
//距離上次故障轉移時間是否小于2倍的超時時間,如果是則返回0
if (mstime() - master->failover_start_time <
master->failover_timeout*2)
{
if (master->failover_delay_logged != master->failover_start_time) {
time_t clock = (master->failover_start_time +
master->failover_timeout*2) / 1000;
char ctimebuf[26];
ctime_r(&clock,ctimebuf);
ctimebuf[24] = '\0'; /* Remove newline. */
master->failover_delay_logged = master->failover_start_time;
redisLog(REDIS_WARNING,
"Next failover delay: I will not start a failover before %s",
ctimebuf);
}
return 0;
}
//啟動故障轉移 并返回1
sentinelStartFailover(master);
return 1;
}
步入sentinelStartFailover即可看到我們上文所說故障轉移狀態更新:
void sentinelStartFailover(sentinelRedisInstance *master) {
redisAssert(master->flags & SRI_MASTER);
//故障轉移等待啟動
master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
//設置為正在故障轉移
master->flags |= SRI_FAILOVER_IN_PROGRESS;
//更新紀元
master->failover_epoch = ++sentinel.current_epoch;
//......
}
結果上述步驟明確知曉redis需要進行故障轉移之后,哨兵會再次調用sentinelAskMasterStateToOtherSentinels方法傳入當前哨兵的server.runid向其他哨兵發起投票請求,并通過sentinelReceiveIsMasterDownReply處理響應結果:
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
//遍歷其他哨兵
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
//......
//因為failover_state 在上一步已經改為傳入自己的SENTINEL_FAILOVER_STATE_WAIT_START即等待故障轉移,故大于SENTINEL_FAILOVER_STATE_NONE,于是傳入哨兵的server.runid發起投票選舉
retval = redisAsyncCommand(ri->cc,
sentinelReceiveIsMasterDownReply, NULL,
"SENTINEL is-master-down-by-addr %s %s %llu %s",
master->addr->ip, port,
sentinel.current_epoch,
//若大于SENTINEL_FAILOVER_STATE_NONE則說明執行故障切換,傳入server.runid
(master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
server.runid : "*");
if (retval == REDIS_OK) ri->pending_commands++;
}
dictReleaseIterator(di);
}
(4) 對端哨兵處理發起選舉的投票結果
上述步驟發起投票的哨兵節點發起投票后,收到投票請求的哨兵實例就會進行如下檢查:
- master紀元小于發起投票請求的哨兵紀元req_epoch。
- 當前哨兵紀元小于req_epoch。
如果符合要求則說明發起投票請求的哨兵可以作為leader,當前實例將leader 設置為該節點,然后回復結果給發送結果的實例:
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
//發起選舉的哨兵紀元大于當前紀元,則修改當前紀元
if (req_epoch > sentinel.current_epoch) {
sentinel.current_epoch = req_epoch;
sentinelFlushConfig();
sentinelEvent(REDIS_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
}
//如果master紀元小于發起選舉的紀元且當前哨兵紀元小于等于發起選舉的紀元
if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
{
sdsfree(master->leader);
//設置當前的master為candidate的runid
master->leader = sdsnew(req_runid);
//更新紀元
master->leader_epoch = sentinel.current_epoch;
sentinelFlushConfig();
//投票給發起選舉的人
sentinelEvent(REDIS_WARNING,"+vote-for-leader",master,"%s %llu",
master->leader, (unsigned long long) master->leader_epoch);
/* If we did not voted for ourselves, set the master failover start
* time to now, in order to force a delay before we can start a
* failover for the same master. */
if (strcasecmp(master->leader,server.runid))
master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
}
*leader_epoch = master->leader_epoch;
return master->leader ? sdsnew(master->leader) : NULL;
}
(5) 處理投票結果
收到響應后sentinelReceiveIsMasterDownReply回調函數就會解析出其他哨兵的leader_epoch 信息,作為后續選舉leader的依據,如果半數以上的leader_epoch 為當前哨兵所設置的run_id,則說明當前哨兵作為leader進行故障轉移:
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = c->data;
redisReply *r;
REDIS_NOTUSED(privdata);
//......
if (strcmp(r->element[1]->str,"*")) {//不為*則采集投票結果
//......
//基于返回結果更新當前哨兵維護的哨兵數組中leader的leader_epoch 信息(記錄的是作為leader的哨兵的run_id),作為后續選舉leader使用
ri->leader = sdsnew(r->element[1]->str);
ri->leader_epoch = r->element[2]->integer;
}
}
}
最后基于狀態機模式,根據當前master狀態為SENTINEL_FAILOVER_STATE_WAIT_START于是調用sentinelFailoverWaitStart選舉leader
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
redisAssert(ri->flags & SRI_MASTER);
if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
switch(ri->failover_state) {
//如果狀態為SENTINEL_FAILOVER_STATE_WAIT_START,則調用sentinelFailoverWaitStart選舉出leader
case SENTINEL_FAILOVER_STATE_WAIT_START:
sentinelFailoverWaitStart(ri);
break;
//......
}
}
步入sentinelFailoverWaitStart即可看到該方法調用sentinelGetLeader,如果發現是自己則發送廣播告知自己為leader進行故障轉移:
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
//......
//獲取leader
leader = sentinelGetLeader(ri, ri->failover_epoch);
isleader = leader && strcasecmp(leader,server.runid) == 0;
sdsfree(leader);
//......
//告知當選的leader是自己
sentinelEvent(REDIS_WARNING,"+elected-leader",ri,"%@");
ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
ri->failover_state_change_time = mstime();
sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
}
對此我們也給出選舉哨兵leader的核心方法sentinelGetLeader,核心步驟為:
- 如果投票結果給出的leader值不為空(這個leader記錄的是其他哨兵投票的實例的run_id)且紀元和當前選舉紀元一致,則給對應的leader票數+1。
- 將這個投票結果存入counter這個字典中。
- 遍歷counter如果這個值大于配置的quorum或哨兵的半數以上,則將其設置為winner,即最后的leader,由此讓這個leader哨兵進行故障轉移:
對應的我們也給出這段代碼的實現:
char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
//......
//設置voters 為哨兵數+1
voters = dictSize(master->sentinels)+1; /* All the other sentinels and me. */
/* Count other sentinels votes */
//根據紀元遍歷其他哨兵的選票結果
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
//如果其他哨兵投票的leader值不為空,且紀元和當前投票紀元一致,則給這個leader的對應的run_id對應的投票數做個自增
if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
sentinelLeaderIncr(counters,ri->leader);
}
//......
//找到得票最多的
di = dictGetIterator(counters);
while((de = dictNext(di)) != NULL) {
uint64_t votes = dictGetUnsignedIntegerVal(de);
if (votes > max_votes) {
max_votes = votes;
winner = dictGetKey(de);
}
}
dictReleaseIterator(di);
//......
//如果票數大于一半+1或大于配置的quorum則設置為leader
voters_quorum = voters/2+1;
if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
winner = NULL;
winner = winner ? sdsnew(winner) : NULL;
//......
return winner;
}
小結
自此我們來小結一下哨兵選舉與故障轉移的大體過程:
- 當前哨兵主觀認定下線之后,通過異步連接詢問其它哨兵是否客觀認定master下線。
- 超過半數的哨兵認為下線則當前哨兵就認為master下線于是開啟發起投票選舉。
- 更新自己的紀元并攜帶runid到其它哨兵節點上拉票。
- 基于回調函數獲取其它哨兵選票結果進行遍歷匯總,用以一個字典以哨兵runid為key,投票值為value進行維護。
- 匯總后通知全局哨兵leader。
- leader進行故障轉移。