Redis 如何高效實現定時任務
Redis通過單線程結合非阻塞事件輪詢機制實現高效的網絡IO和時間事件處理,這篇文章我們將從源碼的角度深入分析一下redis時間事件的設計與實現。
詳解redis中的時間事件
時間事件的定義
時間事件可以是單次到期執行銷毀,也可以是定時任務,對此redis對于時間事件統一封裝為aeTimeEvent對象,通過id來唯一標識一個事件,結合when_sec和when_ms記錄任務到期執行的秒和分,而執行時間事件的函數也是交由timeProc指針所指向的函數執行。 我們以一個redis定時執行的任務為例,如下所示,該結果通過when_sec和when_ms記錄秒之前的時間和毫秒的時間,一旦這個時間到了就會執行timeProc這個函數指針所指向的方法serverCron,該函數會定期執行各種任務,這一點筆者會在后文展開:
對應的我們給出時間事件的代碼描述,即位于ae.h這個頭文件中的aeTimeEvent 結構體,這就是對時間事件的封裝結構體,可以看到它除了筆者上述提到的核心字段以外,還有一個next指針用于連接下一個注冊的時間事件:
//時間事件
typedef struct aeTimeEvent {
//時間事件的id全局遞增
long long id; /* time event identifier. */
long when_sec; /* seconds */
//時間到達的時間
long when_ms; /* milliseconds */
//對應時間時間的處理器
aeTimeProc *timeProc;
//......
//連接下一個時間時間
struct aeTimeEvent *next;
} aeTimeEvent;
上文提到redis的時間事件是以鏈表的形式關聯起來,這里我們也給出時間時間統一管理對象,即時間輪詢器aeEventLoop ,它通過timeEventHead記錄第一個時間時間而后續的時間時間統一用時間時間的next指針進行管理:
對應我們也給出這段時間代碼的定義,即位于ae.h中aeEventLoop 的定義:
typedef struct aeEventLoop {
//......
//管理時間事件的列表
aeTimeEvent *timeEventHead;
//......
} aeEventLoop;
注冊時間事件
redis在服務器初始化階段,會注冊一個定時的時間事件,大約每1毫秒觸發一次,該事件主要做的是:
- 更新redis全局時鐘,該時鐘用于全局變量獲取時間用的。
- 隨機抽取redis內存數據庫中的樣本刪除過期的鍵值對。
- 如果檢查到aof重寫完成,則進行刷盤操作。
- 如果發現當前aof大小過大,則fork子進程進行aof重寫操作。
- ......。
對應我們給出時間事件注冊的源碼段,即redis初始化時調用的方法initServer中的aeCreateTimeEvent,可以看到它將定時任務封裝為時間事件timeEvent,并設置時間間隔為1毫秒一次:
void initServer(void) {
//......
/* Create the serverCron() time event, that's our main way to process
* background operations. */
//創建時間事件注冊到eventLoop->timeEventHead中
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
redisPanic("Can't create the serverCron time event.");
exit(1);
}
//......
}
輪詢處理時間事件
redis每次處理完所有用戶的請求之后,都會調用一次時間時間處理函數processTimeEvents,輪詢并處理就緒的時間事件,由此保證盡可能準時執行時間事件,如果事件時間非定時任務則執行完成直接刪除,反之設置下一次執行時間。這些步驟全部完成之后,返回本次處理的時間事件數:
我們給出處理時間循環的入口aeMain,可以看到該函數就是redis核心函數所在,它會循環調用aeProcessEvents處理各種事件:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
//處理各種事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
不如aeProcessEvents可以看到該函數執行完所有用戶請求之后調用processTimeEvents方法獲取并執行就緒的時間事件:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
//......
//處理就緒的客戶端事件
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
//上述核心網絡IO事件完成后處理時間事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
最后我們就可以看到處理時間事件的核心代碼段,其內部會從timeEventHead開始輪詢就緒的時間事件,比對當前時間是否大于或者等于到期時間,如果是則執行當前時間事件,再判斷這個事件是否是定時事件,如果是則更新下次執行時間,反之刪除,最后累加本次處理的時間時間數:
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
//......
if (now < eventLoop->lastTime) {
//從時間事件頭開始
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
//循環處理到期的時間事件
while(te) {
long now_sec, now_ms;
long long id;
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
//如果現在的事件大于到達時間
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
//調用時間時間函數處理該事件
retval = te->timeProc(eventLoop, id, te->clientData);
//更新處理數
processed++;
//.....
if (retval != AE_NOMORE) {//如果事件類型不是AE_NOMORE則說明是定時事件更新周期,反之刪除
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
aeDeleteTimeEvent(eventLoop, id);
}
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
return processed;
}
redis對于時間事件實現上的優化
因為時間事件有些要求定期執行,所以redis為了保證時間執行的實時性,做了如下兩個優化:
- 對于比較耗時的時間事件,例如AOF重寫,通過fork子進程異步完成:
- 對于返回給客戶端套接字的內容,如果長度超過預設的值,會主動讓出線程執行權,避免時間時間饑餓。
對應的我們給出第一點時間時間對于aof重寫的核心代碼段,可以看到serverCron內部判斷如果當前沒有rdb和aof子進程,且需要進行aof重寫則調用rewriteAppendOnlyFileBackground函數fork子進程進行aof重寫:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
//......
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
//aof_rewrite_scheduled設置為1,且沒有其他持久化子進程則進行aof重寫,通過異步避免耗時
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
//......
}
//fork子進程進行aof重寫
int rewriteAppendOnlyFileBackground(void) {
//......
if ((childpid = fork()) == 0) {//fork子進程進行aof重寫
char tmpfile[256];
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-aof-rewrite");
//生成一個tmp文件
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {//重寫aof
size_t private_dirty = zmalloc_get_private_dirty();
//......
exitFromChild(0);
} else {
exitFromChild(1);
}
} else {
//......
}
return REDIS_OK; /* unreached */
}
而回復給客戶端結果的處理器sendReplyToClient內部也有一段,判斷如果寫入數totwritten 大于REDIS_MAX_WRITE_PER_EVENT (宏定義為64M),則直接中止寫入,break退出等到下一次循環處理,避免因為這個處理導致其他時間事件饑餓而導致事件執行延期:
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
//......
while(c->bufpos > 0 || listLength(c->reply)) {
//......
//對于文件事件數據寫入超長會讓出執行權讓時間事件能夠盡可能的執行
server.stat_net_output_bytes += totwritten;
if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory)) break;
}
//......
}