使用 Redis 實(shí)現(xiàn)分布式系統(tǒng)輕量級(jí)協(xié)調(diào)技術(shù)
在分布式系統(tǒng)中,各個(gè)進(jìn)程(本文使用進(jìn)程來(lái)描述分布式系統(tǒng)中的運(yùn)行主體,它們可以在同一個(gè)物理節(jié)點(diǎn)上也可以在不同的物理節(jié)點(diǎn)上)相互之間通常是需要協(xié)調(diào)進(jìn)行運(yùn)作的,有時(shí)是不同進(jìn)程所處理的數(shù)據(jù)有依賴關(guān)系,必須按照一定的次序進(jìn)行處理,有時(shí)是在一些特定的時(shí)間需要某個(gè)進(jìn)程處理某些事務(wù)等等,人們通常會(huì)使用分布式鎖、選舉算法等技術(shù)來(lái)協(xié)調(diào)各個(gè)進(jìn)程之間的行為。因?yàn)榉植际较到y(tǒng)本身的復(fù)雜特性,以及對(duì)于容錯(cuò)性的要求,這些技術(shù)通常是重量級(jí)的,比如 Paxos 算法,欺負(fù)選舉算法,ZooKeeper 等,側(cè)重于消息的通信而不是共享內(nèi)存,通常也是出了名的復(fù)雜和難以理解,當(dāng)在具體的實(shí)現(xiàn)和實(shí)施中遇到問(wèn)題時(shí)都是一個(gè)挑戰(zhàn)。
Redis 經(jīng)常被人們認(rèn)為是一種 NoSQL 軟件,但其本質(zhì)上是一種分布式的數(shù)據(jù)結(jié)構(gòu)服務(wù)器軟件,提供了一個(gè)分布式的基于內(nèi)存的數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)服務(wù)。在實(shí)現(xiàn)上,僅使用一個(gè)線程來(lái)處理具體的內(nèi)存數(shù)據(jù)結(jié)構(gòu),保證它的數(shù)據(jù)操作命令的原子特性;它同時(shí)還支持基于 Lua 的腳本,每個(gè) Redis 實(shí)例使用同一個(gè) Lua 解釋器來(lái)解釋運(yùn)行 Lua 腳本,從而 Lua 腳本也具備了原子特性,這種原子操作的特性使得基于共享內(nèi)存模式的分布式系統(tǒng)的協(xié)調(diào)方式成了可能,而且具備了很大的吸引力,和復(fù)雜的基于消息的機(jī)制不同,基于共享內(nèi)存的模式對(duì)于很多技術(shù)人員來(lái)說(shuō)明顯容易理解的多,特別是那些已經(jīng)了解多線程或多進(jìn)程技術(shù)的人。在具體實(shí)踐中,也并不是所有的分布式系統(tǒng)都像分布式數(shù)據(jù)庫(kù)系統(tǒng)那樣需要嚴(yán)格的模型的,而所使用的技術(shù)也不一定全部需要有堅(jiān)實(shí)的理論基礎(chǔ)和數(shù)學(xué)證明,這就使得基于 Redis 來(lái)實(shí)現(xiàn)分布式系統(tǒng)的協(xié)調(diào)技術(shù)具備了一定的實(shí)用價(jià)值,實(shí)際上,人們也已經(jīng)進(jìn)行了不少嘗試。本文就其中的一些協(xié)調(diào)技術(shù)進(jìn)行介紹。
signal/wait 操作
在分布式系統(tǒng)中,有些進(jìn)程需要等待其它進(jìn)程的狀態(tài)的改變,或者通知其它進(jìn)程自己的狀態(tài)的改變,比如,進(jìn)程之間有操作上的依賴次序時(shí),就有進(jìn)程需要等待,有進(jìn)程需要發(fā)射信號(hào)通知等待的進(jìn)程進(jìn)行后續(xù)的操作,這些工作可以通過(guò) Redis 的 Pub/Sub 系列命令來(lái)完成,比如:
- import redis, time
- rc = redis.Redis()
- def wait( wait_for ):
- ps = rc.pubsub()
- ps.subscribe( wait_for )
- ps.get_message()
- wait_msg = None
- while True:
- msg = ps.get_message()
- if msg and msg['type'] == 'message':
- wait_msg = msg
- break
- time.sleep(0.001)
- ps.close()
- return wait_msg
- def signal_broadcast( wait_in, data ):
- wait_count = rc.publish(wait_in, data)
- return wait_count
用這個(gè)方法很容易進(jìn)行擴(kuò)展實(shí)現(xiàn)其它的等待策略,比如 try wait,wait 超時(shí),wait 多個(gè)信號(hào)時(shí)是要等待全部信號(hào)還是任意一個(gè)信號(hào)到達(dá)即可返回等等。因?yàn)?Redis 本身支持基于模式匹配的消息訂閱(使用 psubscribe 命令),設(shè)置 wait 信號(hào)時(shí)也可以通過(guò)模式匹配的方式進(jìn)行。
和其它的數(shù)據(jù)操作不同,訂閱消息是即時(shí)易逝的,不在內(nèi)存中保存,不進(jìn)行持久化保存,如果客戶端到服務(wù)端的連接斷開的話也是不會(huì)重發(fā)的,但是在配置了 master/slave 節(jié)點(diǎn)的情況下,會(huì)把 publish 命令同步到 slave 節(jié)點(diǎn)上,這樣我們就可以同時(shí)在 master 以及 slave 節(jié)點(diǎn)的連接上訂閱某個(gè)頻道,從而可以同時(shí)接收到發(fā)布者發(fā)布的消息,即使 master 在使用過(guò)程中出故障,或者到 master 的連接出了故障,我們?nèi)匀荒軌驈? slave 節(jié)點(diǎn)獲得訂閱的消息,從而獲得更好的魯棒性。另外,因?yàn)閿?shù)據(jù)不用寫入磁盤,這種方法在性能上也是有優(yōu)勢(shì)的。
上面的方法中信號(hào)是廣播的,所有在 wait 的進(jìn)程都會(huì)收到信號(hào),如果要將信號(hào)設(shè)置成單播,只允許其中一個(gè)收到信號(hào),則可以通過(guò)約定頻道名稱模式的方式來(lái)實(shí)現(xiàn),比如:
頻道名稱 = 頻道名前綴 (channel) + 訂閱者全局*** ID(myid)
其中*** ID 可以是 UUID,也可以是一個(gè)隨機(jī)數(shù)字符串,確保全局***即可。在發(fā)送 signal 之前先使用“pubsub channels channel*”命令獲得所有的訂閱者訂閱的頻道,然后發(fā)送信號(hào)給其中一個(gè)隨機(jī)指定的頻道;等待的時(shí)候需要傳遞自己的*** ID,將頻道名前綴和*** ID 合并為一個(gè)頻道名稱,然后同前面例子一樣進(jìn)行 wait。示例如下:
- import random
- single_cast_script="""
- local channels = redis.call('pubsub', 'channels', ARGV[1]..'*');
- if #channels == 0 then
- return 0;
- end;
- local index= math.mod(math.floor(tonumber(ARGV[2])), #channels) + 1;
- return redis.call( 'publish', channels[index], ARGV[3]);
- """
- def wait_single( channel, myid):
- return wait( channel + myid )
- def signal_single( channel, data):
- rand_num = int(random.random() * 65535)
- return rc.eval( single_cast_script, 0, channel, str(rand_num), str(data) )
#p#
分布式鎖 Distributed Locks
分布式鎖的實(shí)現(xiàn)是人們探索的比較多的一個(gè)方向,在 Redis 的官方網(wǎng)站上專門有一篇文檔介紹 基于 Redis 的分布式鎖 ,其中提出了 Redlock 算法,并列出了多種語(yǔ)言的實(shí)現(xiàn)案例,這里作一簡(jiǎn)要介紹。
Redlock 算法著眼于滿足分布式鎖的三個(gè)要素:
- 安全性:保證互斥,任何時(shí)間至多只有一個(gè)客戶端可以持有鎖
- 免死鎖:即使當(dāng)前持有鎖的客戶端崩潰或者從集群中被分開了,其它客戶端最終總是能夠獲得鎖。
- 容錯(cuò)性:只要大部分的 Redis 節(jié)點(diǎn)在線,那么客戶端就能夠獲取和釋放鎖。
鎖的一個(gè)簡(jiǎn)單直接的實(shí)現(xiàn)方法就是用 SET NX 命令設(shè)置一個(gè)設(shè)定了存活周期 TTL 的 Key 來(lái)獲取鎖,通過(guò)刪除 Key 來(lái)釋放鎖,通過(guò)存活周期來(lái)保證避免死鎖。不過(guò)這個(gè)方法存在單點(diǎn)故障風(fēng)險(xiǎn),如果部署了 master/slave 節(jié)點(diǎn),則在特定條件下可能會(huì)導(dǎo)致安全性方面的沖突,比如:
- 客戶端 A 從 master 節(jié)點(diǎn)獲得鎖
- master 節(jié)點(diǎn)在將 key 復(fù)制到 slave 節(jié)點(diǎn)之前崩潰了
- slave 節(jié)點(diǎn)提升為新的 master 節(jié)點(diǎn)
- 客戶端 B 從新的 master 節(jié)點(diǎn)獲得了鎖,而這個(gè)鎖實(shí)際上已經(jīng)由客戶端 A 所持有,導(dǎo)致了系統(tǒng)中有兩個(gè)客戶端在同一時(shí)間段內(nèi)持有同一個(gè)互斥鎖,破壞了互斥鎖的安全性。
在 Redlock 算法中,通過(guò)類似于下面這樣的命令進(jìn)行加鎖:
- SET resource_name my_random_value NX PX 30000
這里的 my_random_value 為全局不同的隨機(jī)數(shù),每個(gè)客戶端需要自己產(chǎn)生這個(gè)隨機(jī)數(shù)并且記住它,后面解鎖的時(shí)候需要用到它。
解鎖則需要通過(guò)一個(gè) Lua 腳本來(lái)執(zhí)行,不能簡(jiǎn)單地直接刪除 Key,否則可能會(huì)把別人持有的鎖給釋放了:
- if redis.call("get",KEYS[1]) == ARGV[1] then
- return redis.call("del",KEYS[1])
- else
- return 0
- end
這個(gè) ARGV[1] 的值就是前面加鎖的時(shí)候的 my_random_value 的值。
如果需要更好的容錯(cuò)性,可以建立一個(gè)有 N(N 為奇數(shù))個(gè)相互獨(dú)立完備的 Redis 冗余節(jié)點(diǎn)的集群,這種情況下,一個(gè)客戶端獲得鎖和釋放鎖的算法如下:
- 先獲取當(dāng)前時(shí)間戳 timestamp_1,以毫秒為單位。
- 以相同的 Key 和隨機(jī)數(shù)值,依次從 N 個(gè)節(jié)點(diǎn)獲取鎖,每次獲取鎖都設(shè)置一個(gè)超時(shí),超時(shí)時(shí)限要保證小于所有節(jié)點(diǎn)上該鎖的自動(dòng)釋放時(shí)間,以免在某個(gè)節(jié)點(diǎn)上耗時(shí)過(guò)長(zhǎng),通常都設(shè)的比較短。
- 客戶端將當(dāng)前時(shí)間戳減去***步中的時(shí)間戳 timestamp_1,計(jì)算獲取鎖總消耗時(shí)間。只有當(dāng)客戶端獲得了半數(shù)以上節(jié)點(diǎn)的鎖,而且總耗時(shí)少于鎖存活時(shí)間,該客戶端才被認(rèn)為已經(jīng)成功獲得了鎖。
- 如果獲得了鎖,則其存活時(shí)間為開始預(yù)設(shè)鎖存活時(shí)間減去獲取鎖總耗時(shí)間。
- 如果客戶端不能獲得鎖,則應(yīng)該馬上在所有節(jié)點(diǎn)上解鎖。
- 如果要重試,則在隨機(jī)延時(shí)之后重新去獲取鎖。
- 獲得了鎖的客戶端要釋放鎖,簡(jiǎn)單地在所有節(jié)點(diǎn)上解鎖即可。
Redlock 算法不需要保證 Redis 節(jié)點(diǎn)之間的時(shí)鐘是同步的(不論是物理時(shí)鐘還是邏輯時(shí)鐘),這點(diǎn)和傳統(tǒng)的一些基于同步時(shí)鐘的分布式鎖算法有所不同。Redlock 算法的具體的細(xì)節(jié)可以參閱 Redis 的官方文檔,以及文檔中列出的多種語(yǔ)言版本的實(shí)現(xiàn)。
選舉算法
在分布式系統(tǒng)中,經(jīng)常會(huì)有些事務(wù)是需要在某個(gè)時(shí)間段內(nèi)由一個(gè)進(jìn)程來(lái)完成,或者由一個(gè)進(jìn)程作為 leader 來(lái)協(xié)調(diào)其它的進(jìn)程,這個(gè)時(shí)候就需要用到選舉算法,傳統(tǒng)的選舉算法有欺負(fù)選舉算法(霸道選舉算法)、環(huán)選舉算法、Paxos 算法、Zab 算法 (ZooKeeper) 等,這些算法有些依賴于消息的可靠傳遞以及時(shí)鐘同步,有些過(guò)于復(fù)雜,難以實(shí)現(xiàn)和驗(yàn)證。新的 Raft 算法相比較其它算法來(lái)說(shuō)已經(jīng)容易了很多,不過(guò)它仍然需要依賴心跳廣播和邏輯時(shí)鐘,leader 需要不斷地向 follower 廣播消息來(lái)維持從屬關(guān)系,節(jié)點(diǎn)擴(kuò)展時(shí)也需要其它算法配合。
選舉算法和分布式鎖有點(diǎn)類似,任意時(shí)刻最多只能有一個(gè) leader 資源。當(dāng)然,我們也可以用前面描述的分布式鎖來(lái)實(shí)現(xiàn),設(shè)置一個(gè) leader 資源,獲得這個(gè)資源鎖的為 leader,鎖的生命周期過(guò)了之后,再重新競(jìng)爭(zhēng)這個(gè)資源鎖。這是一種競(jìng)爭(zhēng)性的算法,這個(gè)方法會(huì)導(dǎo)致有比較多的空檔期內(nèi)沒(méi)有 leader 的情況,也不好實(shí)現(xiàn) leader 的連任,而 leader 的連任是有比較大的好處的,比如 leader 執(zhí)行任務(wù)可以比較準(zhǔn)時(shí)一些,查看日志以及排查問(wèn)題的時(shí)候也方便很多,如果我們需要一個(gè)算法實(shí)現(xiàn) leader 可以連任,那么可以采用這樣的方法:
- import redis
- rc = redis.Redis()
- local_selector = 0
- def master():
- global local_selector
- master_selector = rc.incr('master_selector')
- if master_selector == 1:
- # initial / restarted
- local_selector = master_selector
- else:
- if local_selector > 0:
- # I'm the master before
- if local_selector > master_selector:
- # lost, maybe the db is fail-overed.
- local_selector = 0
- else:
- # continue to be the master
- local_selector = master_selector
- if local_selector > 0:
- # I'm the current master
- rc.expire('master_selector', 20)
- return local_selector > 0
這個(gè)算法鼓勵(lì)連任,只有當(dāng)前的 leader 發(fā)生故障或者執(zhí)行某個(gè)任務(wù)所耗時(shí)間超過(guò)了任期、或者 Redis 節(jié)點(diǎn)發(fā)生故障恢復(fù)之后才需要重新選舉出新的 leader。在 master/slave 模式下,如果 master 節(jié)點(diǎn)發(fā)生故障,某個(gè) slave 節(jié)點(diǎn)提升為新的 master 節(jié)點(diǎn),即使當(dāng)時(shí) master_selector 值尚未能同步成功,也不會(huì)導(dǎo)致出現(xiàn)兩個(gè) leader 的情況。如果某個(gè) leader 一直連任,則 master_selector 的值會(huì)一直遞增下去,考慮到 master_selector 是一個(gè) 64 位的整型類型,在可預(yù)見(jiàn)的時(shí)間內(nèi)是不可能溢出的,加上每次進(jìn)行 leader 更換的時(shí)候 master_selector 會(huì)重置為從 1 開始,這種遞增的方式是可以接受的,但是碰到 Redis 客戶端(比如 Node.js)不支持 64 位整型類型的時(shí)候就需要針對(duì)這種情況作處理。如果當(dāng)前 leader 進(jìn)程處理時(shí)間超過(guò)了任期,則其它進(jìn)程可以重新生成新的 leader 進(jìn)程,老的 leader 進(jìn)程處理完畢事務(wù)后,如果新的 leader 的進(jìn)程經(jīng)歷的任期次數(shù)超過(guò)或等于老的 leader 進(jìn)程的任期次數(shù),則可能會(huì)出現(xiàn)兩個(gè) leader 進(jìn)程,為了避免這種情況,每個(gè) leader 進(jìn)程在處理完任期事務(wù)之后都應(yīng)該檢查一下自己的處理時(shí)間是否超過(guò)了任期,如果超過(guò)了任期,則應(yīng)當(dāng)先設(shè)置 local_selector 為 0 之后再調(diào)用 master 檢查自己是否是 leader 進(jìn)程。
#p#
消息隊(duì)列
消息隊(duì)列是分布式系統(tǒng)之間的通信基本設(shè)施,通過(guò)消息可以構(gòu)造復(fù)雜的進(jìn)程間的協(xié)調(diào)操作和互操作。Redis 也提供了構(gòu)造消息隊(duì)列的原語(yǔ),比如 Pub/Sub 系列命令,就提供了基于訂閱/發(fā)布模式的消息收發(fā)方法,但是 Pub/Sub 消息并不在 Redis 內(nèi)保持,從而也就沒(méi)有進(jìn)行持久化,適用于所傳輸?shù)南⒓词箒G失了也沒(méi)有關(guān)系的場(chǎng)景。
如果要考慮到持久化,則可以考慮 list 系列操作命令,用 PUSH 系列命令(LPUSH, RPUSH 等)推送消息到某個(gè) list,用 POP 系列命令(LPOP, RPOP,BLPOP,BRPOP 等)獲取某個(gè) list 上的消息,通過(guò)不同的組合方式可以得到 FIFO,F(xiàn)ILO,比如:
- import redis
- rc = redis.Redis()
- def fifo_push(q, data):
- rc.lpush(q, data)
- def fifo_pop(q):
- return rc.rpop(q)
- def filo_push(q, data):
- rc.lpush(q, data)
- def filo_pop(q):
- return rc.lpop(q)
如果用 BLPOP,BRPOP 命令替代 LPOP, RPOP,則在 list 為空的時(shí)候還支持阻塞等待。不過(guò),即使按照這種方式實(shí)現(xiàn)了持久化,如果在 POP 消息返回的時(shí)候網(wǎng)絡(luò)故障,則依然會(huì)發(fā)生消息丟失,針對(duì)這種需求 Redis 提供了 RPOPLPUSH 和 BRPOPLPUSH 命令來(lái)先將提取的消息保存在另外一個(gè) list 中,客戶端可以先從這個(gè) list 查看和處理消息數(shù)據(jù),處理完畢之后再?gòu)倪@個(gè) list 中刪除消息數(shù)據(jù),從而確保了消息不會(huì)丟失,示例如下:
- def safe_fifo_push(q, data):
- rc.lpush(q, data)
- def safe_fifo_pop(q, cache):
- msg = rc.rpoplpush(q, cache)
- # check and do something on msg
- rc.lrem(cache, 1) # remove the msg in cache list.
- return msg
如果使用 BRPOPLPUSH 命令替代 RPOPLPUSH 命令,則可以在 q 為空的時(shí)候阻塞等待。
結(jié)語(yǔ)
使用 Redis 作為分布式系統(tǒng)的共享內(nèi)存,以共享內(nèi)存模式為基礎(chǔ)來(lái)實(shí)現(xiàn)分布式系統(tǒng)協(xié)調(diào)技術(shù),雖然不像傳統(tǒng)的基于消息傳遞的技術(shù)那樣有著堅(jiān)實(shí)的理論證明的基礎(chǔ),但是它在一些要求不苛刻的情況下不失為一種簡(jiǎn)單實(shí)用的輕量級(jí)解決方案,畢竟不是每個(gè)系統(tǒng)都需要嚴(yán)格的容錯(cuò)性等要求,也不是每個(gè)系統(tǒng)都會(huì)頻繁地發(fā)生進(jìn)程異常,而且 Redis 本身已經(jīng)經(jīng)受了工業(yè)界的多年實(shí)踐和考驗(yàn)。另外,用 Redis 技術(shù)還有一些額外的好處,比如在開發(fā)過(guò)程中和生產(chǎn)環(huán)境中都可以直接觀察到鎖、隊(duì)列的內(nèi)容,實(shí)施的時(shí)候也不需要額外的特別配置過(guò)程等,它足夠簡(jiǎn)單,在調(diào)試問(wèn)題的時(shí)候邏輯清晰,進(jìn)行排查和臨時(shí)干預(yù)也比較方便。在可擴(kuò)展性方面也比較好,可以動(dòng)態(tài)擴(kuò)展分布式系統(tǒng)的進(jìn)程數(shù)目,而不需要事先預(yù)定好進(jìn)程數(shù)目。
Redis 支持基于 Key 值 hash 的集群,在集群中應(yīng)用本文所述技術(shù)時(shí)建議另外部署專用 Redis 節(jié)點(diǎn)(或者冗余 Redis 節(jié)點(diǎn)集群)來(lái)使用,因?yàn)樵诨?Key 值 hash 的集群中,不同的 Key 值會(huì)根據(jù) hash 值被分布到不同的集群節(jié)點(diǎn)上,而且對(duì)于 Lua 腳本的支持也受到限制,難以保證一些操作的原子性,這一點(diǎn)是需要考慮到的。使用專用節(jié)點(diǎn)還有一個(gè)好處是專用節(jié)點(diǎn)的數(shù)據(jù)量會(huì)少很多,當(dāng)應(yīng)用了 master/slave 部署或者 AOF 模式的時(shí)候,因?yàn)閿?shù)據(jù)量少,master 和 slave 之間的同步會(huì)少很多,AOF 模式實(shí)時(shí)寫入磁盤的數(shù)據(jù)也少很多,這樣子也可以大大提高可用性。
本文示例所列 Python 代碼在 Python3.4 下運(yùn)行,Redis 客戶端采用 redis 2.10.3 ,Redis 服務(wù)端版本為 3.0.1 版。