Redis源碼學習之Redis事務
Redis作為一個內存型數據庫,同樣支持傳統數據庫的事務特性。這篇文章會從源代碼角度來分析Redis中事務的實現原理。
What
Redis事務提供了一種將多個命令請求打包,然后一次性、按照順序地執行多個命令的機制,并且在事務執行的期間,服務器不會中斷事務而去執行其他不在事務中的命令請求,它會把事務中所有的命令都執行完畢才會去執行其他的命令。
How
Redis中提供了multi、discard、exec、watch、unwatch這幾個命令來實現事務的功能。
Redis的事務始于multi命令,之后跟著要在事務中執行的命令,終于exec命令或者discard命令。加入事務中的所有命令會原子的執行,中間不會穿插執行其他沒有加入事務的命令。
multi、exec和discard
multi命令告訴Redis客戶端要開始一個事物,然后Redis會返回一個OK,接下來所有的命令Redis都不會立即執行,只會返回QUEUED結果,直到遇到了exec命令才會去執行之前的所有的命令,或者遇到了discard命令,會拋棄執行之前加入事務的命令。
- 127.0.0.1:6379> get name
- (nil)
- 127.0.0.1:6379> get gender
- (nil)
- 127.0.0.1:6379> multi
- OK
- 127.0.0.1:6379> set name Slogen
- QUEUED
- 127.0.0.1:6379> set gender male
- QUEUED
- 127.0.0.1:6379> exec
- 1) OK
- 2) OK
- 127.0.0.1:6379> mget name gender
- 1) "Slogen"
- 2) "male"
watch
watch命令是Redis提供的一個樂觀鎖,可以在exec執行之前,監視任意數量的數據庫key,并在exec命令執行的時候,檢測被監視的key是否至少有一個已經被修改,如果是的話,服務器將拒絕執行事務,并向客戶端返回代表事務執行失敗的空回復。
首先在client1執行下列命令:
- 127.0.0.1:6379> get name
- (nil)
- 127.0.0.1:6379> watch name
- OK
- 127.0.0.1:6379> multi
- OK
- 127.0.0.1:6379> set name slogen
- QUEUED
- 127.0.0.1:6379> set gender male
- QUEUED
- 127.0.0.1:6379> get name
- QUEUED
這個時候client還沒有執行exec命令,接下來在client2下執行下面命令修改name:
- 127.0.0.1:6379> set name rio
- OK
- 127.0.0.1:6379> get name
- "rio"
接下來在client1下執行exec命令:
- 127.0.0.1:6379> exec
- (nil)
- 127.0.0.1:6379> get name
- "rio"
從執行結果可以看到,在client1中執行exec命令的時候,Redis會檢測到name字段已經被其他客戶端修改了,所以拒絕執行事務中所有的命令,直接返回nil表示執行失敗。這個時候獲取到的name的值還是在client2中設置的rio。
Why
multi
Redis的事務始于multi命令,那么就從multi命令的源代碼開始分析。
當Redis接收到客戶端發送過來的命令之后會執行multiCommand()這個方法,這個方法在multi.c文件中。
- void multiCommand(client *c) {
- // 1. 如果檢測到flags里面已經包含了CLIENT_MULTI
- // 表示對應client已經處于事務的上下文中,返回錯誤
- if (c->flags & CLIENT_MULTI) {
- addReplyError(c,"MULTI calls can not be nested");
- return;
- }
- // 2. 開啟flags的CLIENT_MULTI標識
- c->flags |= CLIENT_MULTI;
- // 3. 返回ok,告訴客戶端已經成功開啟事務
- addReply(c,shared.ok);
- }
從源代碼中可以看到,multiCommand()主要完成下面三件事:
- 檢測發送multi命令的client是否已經處于事務中,如果是則直接返回錯誤。從這里可以看到,Redis不支持事務嵌套執行。
- 給對應client的flags標志位中增加MULTI_CLIENT標志,表示已經進入事務中。
- 返回OK告訴客戶端已經成功開啟事務。
從前面的文章中可以知道,Redis接收到所有的Client發送過來的命令后都會執行到processCommand()這個方法中,在processCommand()中有下面這部分代碼:
在processCommand()執行實際的命令之前會先判斷對應的client是否已經處于事務的上下文中,如果是的話,且需要執行的命令不是exec、discard、multi和watch這四個命令中的任何一個,則調用queueMultiCommand()方法把需要執行的命令加入隊列中,否則的話調用call()直接執行命令。
queueMultiCommand()
Redis調用queueMultiCommand()方法把加入事務的命令加入Redis隊列中,實現如下:
queueMultiCommand()方法主要是把要加入事務的命令封裝在multiCmd結構的變量,然后放置到client->mstate.commands數組中去,multiCmd的定義如下:
- typedef struct multiCmd {
- robj **argv; // 命令的參數數組
- int argc; // 命令的參數個數
- struct redisCommand *cmd; // 要執行的命令
- } multiCmd;
而mstate字段定義為:
- typedef struct client {
- // 其他省略代碼
- multiState mstate; /* MULTI/EXEC state */
- } client;
multiState的結構為:
- typedef struct multiState {
- multiCmd *commands; /* Array of MULTI commands */
- int count; /* Total number of MULTI commands */
- int minreplicas; /* MINREPLICAS for synchronous replication */
- time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
- } multiState;
- commands:multiCmd類型的數組,存放著事務中所有的要執行的命令
- count:當前事務中所有已經存放的命令的個數
另外兩個字段當前版本中(3.2.28)沒用上。
假設當前事務隊列中已經存在set name slogen和lpush num 20這兩個命令的時候,client中的mstate的數據如下:
這個時候再往事務中添加get name這個命令的時候結構圖如下:
錯誤命令:CLIENT_DIRTY_EXEC
那么有個問題,比如我往事務中添加的命令是個不存在的命令,或者命令使用方式,比如命令參數不對,這個時候這個命令會被加入事務嗎?
前面說了,Redis接收到的所有的命令都是執行到processCommand()這個方法,在實際執行對應的命令前,processCommand()方法都會對將要執行的命令進行一系列的檢查,代碼如下:
從上面代碼可以看到,processCommand()在對要執行的命令進行的一系列檢查的時候如果有任何一項檢測失敗都會調用flagTransaction()函數然后返回對應的信息給客戶端,flagTransaction()實現如下:
- void flagTransaction(client *c) {
- if (c->flags & CLIENT_MULTI)
- // 如果flags包含CLIENT_MULTI標志位,表示已經處于事務上下文中
- // 則給對應的client的flags開啟CLIENT_DIRTY_EXEC標志位
- c->flags |= CLIENT_DIRTY_EXEC;
- }
flagTransaction()方法會檢測對應的client是否處于事務的上下文中,如果是的話就給對應的client的flags字段開啟CLIENT_DIRTY_EXEC標志位。
也就是說,如果命令在加入事務的時候由于各種原因,比如命令不存在,或者對應的命令參數不正確,則對應的命令不會被添加到mstate.commands數組中,且同時給對應的client的flags字段開啟CLIENT_DIRTY_EXEC標志位。
watch命令
當client處于事務的上下文中時,watch命令屬于可以被立即執行的幾個命令之一,watch命令對應的代碼為watchCommand()函數,實現如下:
- void watchCommand(client *c) {
- int j;
- if (c->flags & CLIENT_MULTI) {
- // 如果執行watch命令的client處于事務的上下文中則直接返回
- addReplyError(c,"WATCH inside MULTI is not allowed");
- return;
- }
- for (j = 1; j < c->argc; j++)
- // 對傳入的每個要watch的可以調用watchForKey()
- watchForKey(c,c->argv[j]);
- addReply(c,shared.ok);
- }
watchCommand()方法會首先判斷執行watch的命令是否已經處于事務的上下文中,如果是的話則直接報錯返回,說明在Redis事務中不能調用watch命令。
接下來對于watch命令傳入的所有的key,依次調用watchForKey()方法,定義如下:
watchForKey()方法會做下面幾件事:
- 判斷對應的key是否已經存在于client->watched_keys列表中,如果已經存在則直接返回。client->watched_keys保存著對應的client對象所有的要監視的key。
- 如果不存在,則去client->db->watched_keys中查找所有的已經監視了這個key的client對象。client->db->watched_keys以dict的結構保存了所有的監視這個key的client列表。
- 如果第二步中的列表存在,則把執行watch命令的client添加到這個列表的尾部,如果不存在,表示還沒有任何一個client監視這個key,則新建一個列表,添加到client->db->watched_keys中,然后把執行watch命令的client添加到新生成的列表的尾部。
- 把傳入的key封裝成一個watchedKey結構的變量,添加到client->watched_key列表的最后面。
假設當前client->db->watched_keys的監測情況如下圖所示:
而client->watched_keys的監測情況如下:
這個時候client_A執行watch key1 key2 key3這個命令,執行完命令之后client->db->watched_keys結果為
而client->watched_keys結果為

對于key1,目前還沒有client對key1進行監視,所以這個時候client_A會新建一個列表,把自己添加到這個列表中然后把映射關系添加到client->db->watched_keys中去,之后會把key1添加到client->watched_keys列表的最后。
對于key2,由于已經存在于watched_keys列表中,所以會直接返回不做任何處理。
對于key3,由于client->db->watched_keys中已經有client_B和client_C在監視它,所以會直接把client_A添加到監視列表的末尾之后再把key3添加到client_A的監視列表中去。
修改數據:CLIENT_DIRTY_CAS
watch命令的作用就是用在事務中檢測被監視的key是否被其他的client修改了,如果已經被修改,則阻止事務的執行,那么這個功能是怎么實現的呢?
這里以set命令為例進行分析。
假設client_A執行了watch name這個命令然后執行multi命令開啟了事務但是還沒有執行exec命令,這個時候client_B執行了set name slogen這個命令,整個過程如下:
時間 | client_A | client_B |
---|---|---|
T1 | watch name | |
T2 | multi | |
T3 | get name | |
T4 | set name slogen | |
T5 | exec |
在T4的時候client_B執行了set命令修改了name,Redis收到set命令之后會執行setCommand方法,實現如下:
- void setCommand(client *c) {
- // 其他省略代碼
- setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
- }
在setCommand()最后會調用setGenericCommand()方法,改方法實現如下:
- void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
- // 其他省略代碼
- setKey(c->db,key,val);
- // 其他省略代碼
- }
在setGenericCommand()方法中會調用setKey()這個方法,接著看下setKey()這個方法:
- void setKey(redisDb *db, robj *key, robj *val) {
- if (lookupKeyWrite(db,key) == NULL) {
- dbAdd(db,key,val);
- } else {
- dbOverwrite(db,key,val);
- }
- incrRefCount(val);
- removeExpire(db,key);
- // 通知修改了key
- signalModifiedKey(db,key);
- }
在setKey()方法最后會調用signaleModifiedKey()通知redis數據庫中有數據被修改,signaleModifiedKey()方法實現如下:
- void signalModifiedKey(redisDb *db, robj *key) {
- touchWatchedKey(db,key);
- }
可以看到signalModifiedKey()也僅僅是調用touchWatchedKey()方法,代碼如下:
- void touchWatchedKey(redisDb *db, robj *key) {
- list *clients;
- listIter li;
- listNode *ln;
- if (dictSize(db->watched_keys) == 0) return;
- // 1. 從redisDb->watched_keys中找到對應的client列表
- clients = dictFetchValue(db->watched_keys, key);
- if (!clients) return;
- /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
- /* Check if we are already watching for this key */
- listRewind(clients,&li);
- while((ln = listNext(&li))) {
- // 2.依次遍歷client列表,給每個client的flags字段
- // 開啟CLIENT_DIRTY_CAS標識位
- client *c = listNodeValue(ln);
- c->flags |= CLIENT_DIRTY_CAS;
- }
- }
touchWatchedKey()方法會做下面兩件事:
- 從redisDb->watched_keys中找到監視這個key的client列表。前面在分析watch命令的時候說過,如果有client執行了watch keys命令,那么redis會以鍵值對的形式把(key,client)的對應關系保存在redisDb->watched_key這個字段里面。
- 對于第一步中找到的每個client對象,都會給這個client的flags 字段開啟CLIENT_DIRTY_CAS標志位。
在Redis里面所有會修改數據庫內容的命令最后都會調用signalModifiedKey()這個方法,而在signalModifiedKey()會給所有的監視這個key的client增加CLIENT_DIRTY_CAS標志位。
exec命令
exec命令用來執行事務,對應的代碼為execCommand()這個方法,實現如下:
- void execCommand(client *c) {
- int j;
- robj **orig_argv;
- int orig_argc;
- struct redisCommand *orig_cmd;
- int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
- // 1. 判斷對應的client是否屬于事務中
- if (!(c->flags & CLIENT_MULTI)) {
- addReplyError(c,"EXEC without MULTI");
- return;
- }
- /**
- * 2. 檢查是否需要執行事務,在下面兩種情況下不會執行事務
- * 1) 有被watch的key被其他的客戶端修改了,對應于CLIENT_DIRTY_CAS標志位被開啟
- * ,這個時候會返回一個nil,表示沒有執行事務
- * 2) 有命令在加入事務隊列的時候發生錯誤,對應于CLIENT_DIRTY_EXEC標志位被開啟
- * ,這個時候會返回一個execaborterr錯誤
- */
- if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
- addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
- shared.nullmultibulk);
- // 取消所有的事務
- discardTransaction(c);
- goto handle_monitor;
- }
- /* Exec all the queued commands */
- // 3. unwatch所有被這個client watch的key
- unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
- orig_argv = c->argv;
- orig_argc = c->argc;
- orig_cmd = c->cmd;
- addReplyMultiBulkLen(c,c->mstate.count);
- // 4. 依次執行事務隊列中所有的命令
- for (j = 0; j < c->mstate.count; j++) {
- c->argc = c->mstate.commands[j].argc;
- c->argv = c->mstate.commands[j].argv;
- c->cmd = c->mstate.commands[j].cmd;
- /* Propagate a MULTI request once we encounter the first write op.
- * This way we'll deliver the MULTI/..../EXEC block as a whole and
- * both the AOF and the replication link will have the same consistency
- * and atomicity guarantees. */
- if (!must_propagate && !(c->cmd->flags & CMD_READONLY)) {
- execCommandPropagateMulti(c);
- must_propagate = 1;
- }
- call(c,CMD_CALL_FULL);
- /* Commands may alter argc/argv, restore mstate. */
- c->mstate.commands[j].argc = c->argc;
- c->mstate.commands[j].argv = c->argv;
- c->mstate.commands[j].cmd = c->cmd;
- }
- c->argv = orig_argv;
- c->argc = orig_argc;
- c->cmd = orig_cmd;
- // 5. 重置這個client對應的事務相關的所有的數據
- discardTransaction(c);
- /* Make sure the EXEC command will be propagated as well if MULTI
- * was already propagated. */
- if (must_propagate) server.dirty++;
- handle_monitor:
- if (listLength(server.monitors) && !server.loading)
- replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
- }
execCommand()方法會做下面幾件事:
- 判斷對應的client是否已經處于事務中,如果不是,則直接返回錯誤。
- 判斷時候需要執行事務中的命令。在下面兩種情況下不會執行事務而是返回錯誤。
- 有被監視的key被其他的客戶端修改了,對應于CLIENT_DIRTY_CAS標志位被開啟,這個時候會返回一個nil,表示沒有執行事務。
- 有命令在加入事務隊列的時候發生錯誤,對應于CLIENT_DIRTY_EXEC標志位被開啟,這個時候會返回一個execaborterr錯誤。
- unwatch所有被這個client監視的key。
- 依次執行事務隊列中所有的命令。
- 重置這個client對應的事務相關的所有的數據。
discard
使用discard命令可以取消一個事務,對應的方法為discardCommand(),實現如下:
- void discardCommand(client *c) {
- // 1. 檢查對應的client是否處于事務中
- if (!(c->flags & CLIENT_MULTI)) {
- addReplyError(c,"DISCARD without MULTI");
- return;
- }
- // 2. 取消事務
- discardTransaction(c);
- addReply(c,shared.ok);
- }
discardCommand()方法首先判斷對應的client是否處于事務中,如果不是則直接返回錯誤,否則的話會調用discardTransaction()方法取消事務,該方法實現如下:
- void discardTransaction(client *c) {
- // 1. 釋放所有跟MULTI/EXEC狀態相關的資源
- freeClientMultiState(c);
- // 2. 初始化相應的狀態
- initClientMultiState(c);
- // 3. 取消對應client的3個標志位
- c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
- // 4.unwatch所有已經被watch的key
- unwatchAllKeys(c);
- }
Other
Atomic:原子性
原子性是指一個事務(transaction)中的所有操作,要么全部完成,要么全部不完成,不會結束在中間某個環節。
對于Redis的事務來說,事務隊列中的命令要么全部執行完成,要么一個都不執行,因此Redis的事務是具有原子性的。
注意Redis不提供事務回滾機制。
Consistency:一致性
事務的一致性是指事務的執行結果必須是使事務從一個一致性狀態變到另一個一致性狀態,無論事務是否執行成功。
- 命令加入事務隊列失敗(參數個數不對?命令不存在?),整個事務不會執行。所以事務的一致性不會被影響。
- 使用了watch命令監視的key只事務期間被其他客戶端修改,整個事務不會執行。也不會影響事務的一致性。
- 命令執行錯誤。如果事務執行過程中有一個活多個命令錯誤執行失敗,服務器也不會中斷事務的執行,會繼續執行事務中剩下的命令,并且已經執行的命令不會受任何影響。出錯的命令將不會執行,也就不會對數據庫做出修改,因此這種情況下事物的一致性也不會受到影響。
- 服務器宕機。服務器宕機的情況下的一致性可以根據服務器使用的持久化方式來分析。
- 無持久化模式下,事務是一致的。這種情況下重啟之后的數據庫沒有任何數據,因此總是一致的。
- RDB模式下,事務也是一致的。服務器宕機重啟之后可以根據RDB文件來恢復數據,從而將數據庫還原到一個一致的狀態。如果找不到可以使用的RDB文件,那么重啟之后數據庫是空白的,那也是一致的。
- AOF模式下,事務也是一致的。服務器宕機重啟之后可以根據AOF文件來恢復數據,從而將數據庫還原到一個一直的狀態。如果找不到可以使用的AOF文件,那么重啟之后數據庫是空白的,那么也是一致的。
Isolation:隔離性
Redis 是單進程程序,并且它保證在執行事務時,不會對事務進行中斷,事務可以運行直到執行完所有事務隊列中的命令為止。因此,Redis 的事務是總是帶有隔離性的。
Durability:持久性
Redis事務并沒有提供任何的持久性功能,所以事務的持久性是由Redis本身所使用的持久化方式來決定的。
- 在單純的內存模式下,事務肯定是不持久的。
- 在RDB模式下,服務器可能在事務執行之后RDB文件更新之前的這段時間失敗,所以RDB模式下的Redis事務也是不持久的。
- 在AOF的always模式下,事務的每條命令在執行成功之后,都會立即調用fsync或fdatasync將事務數據寫入到AOF文件。但是,這種保存是由后臺線程進行的,主線程不會阻塞直到保存成功,所以從命令執行成功到數據保存到硬盤之間,還是有一段非常小的間隔,所以這種模式下的事務也是不持久的。
- 其他AOF模式也和always模式類似,所以它們都是不持久的。
結論:Redis的事務滿足原子性、一致性和隔離性,但是不滿足持久性。
Reference
- Redis源碼(3.2.28)
- 《Redis設計與實現》