如何理解 Redis 是單線程的
一、在文章開頭
你剛剛說redis是單線程的,那你能不能告訴我它是如何基于單個線程完成指令處理與客戶端連接接?
基于這個問題,筆者會直接通過3.0.0源碼分析的角度來剖析一下redis單線程的設計與實現。
二、詳解redis的單線程模型
1. 單線程處理核心任務
當我們通過./redis-server啟動redis時,如果我們配置了后臺啟動,那么shell進程線程就會調用系統函數即fork方法創建一個子進程,再通過execve方法將子進程主體替換成redis可執行文件也就是我們的redis-server,而子進程執行時會保持從父進程集成過來的標準輸入和輸出,最后redis就會調用main方法開始執行自己的啟動邏輯了。
到這為止,我們不難看出,在啟動階段redis的啟動并不是多線程的,它會根據我們的配置來決定啟動邏輯,以我們上文所說的后臺啟動,它本質是通過父進程fork的方式完成創建與初始化的,這一點我們也可以直接從redis的main方法印證:
int main(int argc, char **argv) {
//命令參數解析與初始化
//......
//如果配置后臺啟動,則調用daemonize從父進程中fork出來執行
if (server.daemonize) daemonize();
//......
}
我們步入daemonize方法,可以看到其內部如果子進程fork成功,后續的標準輸入、輸出、錯誤都會重定向到/dev/null,此后的各項工作也都是交由我們的redis server的主線程進行負責處理:
void daemonize(void) {
int fd;
//fork返回0說明fork成功,創建新會話,然后父進程exit(0)直接退出
if (fork() != 0) exit(0); /* parent exits */
setsid(); /* create a new session */
//將標準輸入、輸出、錯誤重定向寫到/dev/null中,由此和終端分離
if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
dup2(fd, STDIN_FILENO);
dup2(fd, STDOUT_FILENO);
dup2(fd, STDERR_FILENO);
if (fd > STDERR_FILENO) close(fd);
}
}
此時,主線程的socket就會注冊到epoll中,通過非阻塞調用epoll函數獲取就緒的連接和指令完成與多個客戶端的交互:
而上述所說這種工作模式,也就是我們的aeMain函數,這里筆者也給出的對應的的代碼實現,如下所示,aeMain的本質邏輯就是調用無限循環,在循環中調用aeApiPoll即epoll非阻塞輪詢獲取就緒的事件并交給對應的讀寫事件處理器(rfileProc/wfileProc)進行處理:
//無限循環調用aeProcessEvents處理讀寫事件
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
//輪詢標識沒有停止則無限循環
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
//輪詢并處理事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
//......
//通過epoll完成非阻塞調用
numevents = aeApiPoll(eventLoop, tvp);
//遍歷拿到的事件將其交給讀寫處理器處理
for (j = 0; j < numevents; j++) {
//解析出該文件對應的類型
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
//如果事件fe是讀事件則交給rfileProc
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
//如果事件包含寫標志,則交給wfileProc處理器處理
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
//......
//返回處理事件數
return processed; /* return the number of processed file/time events */
}
2. 多線程執行IO事件
截至到上述的片段,redis大體上我們可以認為是單線程執行,但是在3.0.0之后源碼中,為了避免某些IO任務對主線程的執行效率的影響,redis還是創建了一些異步線程處理這些任務。
如下圖所示,我們以aof為例,redis主線程會通過定時任務的方法serverCron會按照用戶的配置檢查當前是否需要進行aof寫入,如果需要則通過bioCreateBackgroundJob提交一個任務到AOF異步刷盤的任務列表中,此時redis創建的io線程就會無限循環調用bioProcessBackgroundJobs從該列表中取出自己綁定的任務進行異步消費,通過這種簡單的多線程模式,保證了耗時的IO操作不會阻塞主線程:
這里我們先給出對應的事件宏定義,可以看到事件總數為REDIS_BIO_NUM_OPS 即2,然后0是文件關閉事件,1的AOF異步刷盤事件,通過這樣的順序完成了事件的類型碼和總量的定義:
/* Background job opcodes */
#define REDIS_BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
#define REDIS_BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
#define REDIS_BIO_NUM_OPS 2
對應的這些線程的初始化工作我們可以在main方法調用的initServer中可以看到這樣一段調用,其內部的調用bioInit本質就是完成上述IO任務的線程的創建:
void initServer(void) {
int j;
//......
//創建bio任務線程
bioInit();
}
bioInit它會初始化2個線程以及棧大小(最大不會超過4M),為每個線程各自分配一個隊列,分配隊列這一步就會按照循環遍歷得到的值進行分配,遍歷時用REDIS_BIO_NUM_OPS作為范圍控制,遍歷到0的處理文件關閉事件,1則是AOF刷盤事件。 完成事件類型隊列分配之后,redis會為每個線程分配消費任務的方法指針bioProcessBackgroundJobs,后續的線程的任務消費和處理都是調用這個方法執行的:
void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
int j;
//循環2次,剛剛好對應2個事件即0是文件關閉事件、1是aof刷盤事件
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
//互斥數組初始化
pthread_mutex_init(&bio_mutex[j],NULL);
//條件數組初始化
pthread_cond_init(&bio_condvar[j],NULL);
//bio任務數組初始化,每個數組元素都是一個任務列表
bio_jobs[j] = listCreate();
//表示每種任務列表待處理的任務數為0
bio_pending[j] = 0;
}
//設置線程最大的棧屬性大小,默認為1,若小于REDIS_THREAD_STACK_SIZE即4M則乘2
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
if (!stacksize) stacksize = 1;
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&attr, stacksize);
//創建線程并,為每一個線程分配一個任務列表
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
//循環兩次 j為0即代表文件關閉事件、1是aof刷盤事件,這個arg會作為事件類型綁定到線程pthread上
void *arg = (void*)(unsigned long) j;
//調用pthread_create完成線程屬性初始化和事件類型的綁定
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread;
}
}
這里我們也給出bioProcessBackgroundJobs邏輯可以看到,每個線程調用該方法時,會在無限循環中根據任務的type按需消費處理:
void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
//每個線程都會根據自己傳入的arg決定任務的type,0為文件關閉事件、1為aof刷盤事件
unsigned long type = (unsigned long) arg;
sigset_t sigset;
//......
//按照類型到bio_jobs取任務執行
while(1) {
listNode *ln;
//......
//取出自己需要處理的類型的隊列任務
ln = listFirst(bio_jobs[type]);
job = ln->value;
//上互斥鎖
pthread_mutex_unlock(&bio_mutex[type]);
//線程按照自己的類型進行消費
if (type == REDIS_BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == REDIS_BIO_AOF_FSYNC) {
aof_fsync((long)job->arg1);
} else {
redisPanic("Wrong job type in bioProcessBackgroundJobs().");
}
//完成后釋放任務對象
zfree(job);
//線程解鎖 任務移除
pthread_mutex_lock(&bio_mutex[type]);
//任務處理完成后的收尾工作
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
}
}
了解的任務消費的源碼之后,我們再來看看任務的投遞的邏輯,我們以aof文件刷盤的任務為例,從定時任務函數serverCron,其內部會判斷aof_child_pid的pid不為-1,若不為-1說明當前存在aof子進程,對此redis-server就會獲取當前aof子進程的pid,調用backgroundRewriteDoneHandler提交一個aof重寫完成的回調任務,等待aof重寫完成后該任務就會被消費,從而完成aof緩沖區刷盤:
這里我們直接從serverCron為入口查看上述邏輯,可以看到其內部會查看rdb_child_pid 或者aof_child_pid 的值,這兩個變量分別記錄rdb或者aof異步持久化進程的id值,若達到以下兩個條件則說明存在aof重寫任務,需要提交一個aof重寫后的刷盤任務:
- aof_child_pid 不是-1
- wait3獲取到的pid也為aof重寫的子進程id
符合上述條件則調用backgroundRewriteDoneHandler提交一個aof重寫完成后的異步刷盤任務:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j;
REDIS_NOTUSED(eventLoop);
REDIS_NOTUSED(id);
REDIS_NOTUSED(clientData);
//......
//檢查后臺的aof重寫進程是否結束,若結束的步入循環
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
int statloc;
pid_t pid;
//獲取當前子進程pid
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
//......
if (pid == server.rdb_child_pid) {
//......
} else if (pid == server.aof_child_pid) {//如果pid為aof的子進程值則調用backgroundRewriteDoneHandler
backgroundRewriteDoneHandler(exitcode,bysignal);
} else {
redisLog(REDIS_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
updateDictResizePolicy();
}
} else {
//......
}
//......
}
步入backgroundRewriteDoneHandler可以看到,如果AOF刷盤策略是AOF_FSYNC_EVERYSEC即異步刷盤則會調用aof_background_fsync進行文件刷盤,而該方法內部的邏輯就是調用我們上文的所說的提交后臺任務方法bioCreateBackgroundJob:
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
//......
if (server.aof_fd == -1) {
//......
} else {
/* AOF enabled, replace the old fd with the new one. */
oldfd = server.aof_fd;
server.aof_fd = newfd;
if (server.aof_fsync == AOF_FSYNC_ALWAYS)
aof_fsync(newfd);
else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)//如果是異步刷盤則將任務提交到對應的隊列中
//提交異步刷盤任務到REDIS_BIO_AOF_FSYNC隊列中
aof_background_fsync(newfd);
//......
}
server.aof_lastbgrewrite_status = REDIS_OK;
//......
} else if (!bysignal && exitcode != 0) {
//......
} else {
//......
}
//......
}
//調用bioCreateBackgroundJob提交任務到AOF刷盤隊列中
void aof_background_fsync(int fd) {
bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}
最終,我們就可以在bioCreateBackgroundJob看到aof異步刷盤的任務提交核心步驟:
- 獲取任務參數,以我們aof異步刷盤的邏輯第一個參數就是aof子進程的文件句柄。
- 線程上鎖。
- 任務入隊。
- 喚醒相應線程。
- 釋放互斥鎖。
對應源碼如下,讀者可參考上述說明和注釋了解邏輯:
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->time = time(NULL);
//獲取aof子進程的fd
job->arg1 = arg1;
//以本文為例都說null
job->arg2 = arg2;
job->arg3 = arg3;
//上鎖
pthread_mutex_lock(&bio_mutex[type]);
//追加任務到對應job的數組中
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
//通知相關線程消費
pthread_cond_signal(&bio_condvar[type]);
//釋放互斥鎖
pthread_mutex_unlock(&bio_mutex[type]);
}
三、小結
自此我們把redis中主線程和IO任務的線程都以圖解和源碼印證的方式分析完成了,以筆者的理解,設計者所說的redis是單線程的本質上的是強調對于核心的連接建立和指令處理是通過極致壓榨單個線程高效完成,而其余的一些非核心的IO耗時邏輯還是需要多個線程進行異步處理。