用Go語言&&Redis實現分布式鎖,我還是第一次
一、為什么需要分布式鎖
共享資源訪問控制: 當多個節點需要同時訪問共享資源時,為了避免并發寫入導致數據不一致,需要使用分布式鎖確保同時只有一個節點可以寫入或修改共享資源。
避免重復執行: 在分布式系統中,某些操作可能需要在整個系統中只執行一次,比如定時任務、數據初始化等。為了避免多個節點同時執行這些操作,需要使用分布式鎖來確保只有一個節點可以執行。
任務協調: 在分布式任務隊列中,多個節點競爭執行任務時,可能需要對任務進行加鎖,以確保每個任務只被一個節點執行,避免重復執行或者操作沖突。
防止死鎖: 在分布式系統中,由于網絡延遲、節點故障等原因,可能會導致死鎖情況的發生。分布式鎖可以用來避免死鎖的發生,通過設置合理的超時時間和重試機制,確保鎖在一定時間內被釋放。
分布式系統中共享同一個資源時,就需要分布式鎖來確保變更資源的一致性。這就是為什么要用到分布式鎖的原因咯。
二、分布式鎖需要具備特性
1 互斥性(Mutual Exclusion): 在任何時刻,只能有一個客戶端持有鎖,其他客戶端不能同時持有該鎖。這是最基本的鎖特性,確保在同一時間只有一個客戶端能夠訪問共享資源。
2 安全性(Safety): 在鎖被釋放之前,任何其他客戶端都不能獲得該鎖。即使是在網絡分區、節點故障等異常情況下,也要確保鎖的安全性,避免數據不一致或者操作沖突。
3 活性(Liveness): 鎖應該能夠在合理的時間內被獲取,避免長時間的等待導致死鎖或者無法響應其他客戶端請求。活性也包括在鎖被釋放后,其他客戶端能夠盡快地獲取到該鎖。
4 容錯性(Fault Tolerance): 分布式系統中可能會發生網絡分區、節點故障等異常情況,分布式鎖需要具備容錯性,能夠在這些異常情況下正確地工作。比如,鎖的實現應該能夠處理網絡分區導致的消息丟失或者超時等情況。
5 性能(Performance): 鎖的實現應該盡可能地減少鎖競爭和通信開銷,提高系統的性能。例如,可以使用高效的算法和數據結構來減少鎖的持有時間和等待時間,或者采用緩存和批處理等技術來減少通信開銷。
6 可擴展性(Scalability): 鎖的實現應該能夠隨著系統規模的增長而擴展,確保在高并發和大規模的分布式環境下仍然能夠保持良好的性能和可用性。
三、實現 Redis 鎖應先掌握的知識點
set 命令
SET key value [EX seconds] [PX milliseconds] [NX|XX]
- EX second :設置鍵的過期時間為 second 秒。SET key value EX second 效果等同于 SETEX key second value。
- PX millisecond :設置鍵的過期時間為 millisecond 毫秒。SET key value PX millisecond ,效果等同于 PSETEX key millisecond value 。
- NX :鍵不存在時,才對鍵進行設置操作。SET key value NX 等同于 SETNX key value 。
- XX :鍵已經存在時,才對鍵進行設置操作。
Redis.lua 腳本
我們可以使用 redis lua 腳本,將一系列命令操作封裝成 pipline,實現整體操作的原子性。
加鎖的整個流程,詳細原理說明看注釋
-- Lua 腳本實現 Redis 分布式鎖
-- 生成唯一標識
local requestId = ARGV[1]
-- 嘗試獲取鎖
local lockKey = KEYS[1]
local lockValue = requestId
local lockExpireTime = tonumber(ARGV[2])
local result = redis.call('SET', lockKey, lockValue, 'NX', 'PX', lockExpireTime)
-- 判斷獲取鎖的結果
if result == 'OK' then
-- 獲取鎖成功,設置鎖的過期時間
return 'OK'
else
-- 獲取鎖失敗
return 'FAIL'
end
1 生成唯一標識: 首先,在客戶端生成一個唯一的標識,可以是 UUID、Snowflake 算法生成的分布式 ID 等。
2 嘗試獲取鎖: 客戶端將生成的唯一標識作為參數,調用 Redis 的 SET 命令嘗試獲取鎖。可以使用 NX(如果鍵不存在則設置)和 PX(設置鍵的過期時間)選項,確保只有一個客戶端能夠成功獲取到鎖。
3 判斷獲取鎖的結果: 如果獲取鎖成功,SET 命令會返回 OK,表示當前客戶端成功獲取了鎖。如果獲取鎖失敗,說明已經有其他客戶端持有了鎖,此時客戶端需要進行等待或者返回失敗。
4 設置鎖的過期時間: 在成功獲取鎖之后,客戶端需要設置鎖的過期時間,以防止因為客戶端崩潰或者其他原因導致鎖一直占用,造成死鎖。
5 返回獲取鎖的結果: 根據 SET 命令的返回值,客戶端判斷是否成功獲取到了鎖,并將結果返回給調用方。
加鎖流程圖
圖片
解鎖流程
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
1 使用 KEYS[1] 獲取傳入的鎖鍵名。
2 使用 ARGV[1] 獲取傳入的鎖值(即加鎖時設置的唯一標識)。
3 判斷當前鎖是否存在且鎖值與傳入的鎖值相同,若是,則調用 DEL 命令刪除該鎖,并返回 1 表示解鎖成功。
4 若鎖不存在或鎖值不匹配,則返回 0 表示解鎖失敗。
解鎖的流程圖
圖片
源碼解析
package redis
import (
"math/rand"
"strconv"
"sync/atomic"
"time"
red "github.com/go-redis/redis"
"github.com/tal-tech/go-zero/core/logx"
)
const (
letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
randomLen = 16
// 默認超時時間,用來防止死鎖
tolerance = 300 // milliseconds
millisPerSecond = 800
lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
return "OK"
else
return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
end`
delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end`
)
type redisLock struct {
// redis客戶端
store *Redis
// 超時時間
seconds uint32
// 鎖key
keys string
// 鎖value,防止鎖被別人獲取到
value string
}
func init() {
rand.Seed(time.Now().UnixNano())
}
// NewRedisLock returns a RedisLock.
func NewRedisLock(store *Redis, keys string) *RedisLock {
return &RedisLock{
store: store,
keys: keys,
// 獲取鎖時,鎖的值通過隨機字符串生成
// 實際上go-zero提供更加高效的隨機字符串生成方式
// 見core/stringx/random.go:Randn
value: randomStr(randomLen),
}
}
// Acquire acquires the lock.
// 加鎖
func (rl *RedisLock) Acquire() (bool, error) {
// 獲取過期時間
seconds := atomic.LoadUint32(&rl.seconds)
// 默認鎖過期時間為500ms,防止死鎖
resp, err := rl.store.Eval(lockCommand, []string{rl.keys}, []string{
rl.value, strconv.Itoa(int(seconds)*millisPerSecond + tolerance),
})
if err == red.Nil {
return false, nil
} else if err != nil {
logx.Errorf("Error on lock for %s, %s", rl.key, err.Error())
return false, err
} else if resp == nil {
return false, nil
}
reply, ok := resp.(string)
if ok && reply == "OK" {
return true, nil
}
logx.Errorf("Unknown reply lock for %s: %v", rl.keys, resp)
return false, nil
}
// Release releases the lock.
// 釋放鎖
func (rl *RedisLock) Release() (bool, error) {
resp, err := rl.store.Eval(delCommand, []string{rl.keys}, []string{rl.value})
if err != nil {
return false, err
}
reply, ok := resp.(int64)
if !ok {
return false, nil
}
return reply == 1, nil
}
func randomStr(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}
// SetExpire sets the expire.
// 需要注意的是需要在Acquire()之前調用
// 不然默認為300ms自動釋放
func (rl *RedisLock) SetExpire(seconds int) {
atomic.StoreUint32(&rl.seconds, uint32(seconds))
}
這個詳細源碼根據自己的業務需要,可以利用。