字節二面:Spring Boot Redis 可重入分布式鎖實現原理?
我是碼哥,可以叫我靚仔。
書接上回,碼哥上一篇《糾正誤區:這才是 SpringBoot Redis 分布式鎖的正確實現方式》分享了分布式鎖如何從錯誤到殘缺,再到青銅版本的高性能 Redis 分布式鎖代碼實戰,讓你一飛沖天。
這是我們最常用的分布式鎖方案,今天碼哥給你來一個進階。
Chaya:「碼哥,上次的分布式鎖版本雖然好,但是不支持可重入獲取鎖,還差一點點意思?!?/p>
Chaya 別急,今日碼哥給你帶來一個高性能可重入 Redis 分布式鎖解決方案,直搗黃龍,一笑破蒼穹。
什么是可重入鎖
當一個線程執行一段代碼成功獲取鎖之后,繼續執行時,又遇到加鎖的代碼,可重入性就就保證線程能繼續執行,而不可重入就是需要等待鎖釋放之后,再次獲取鎖成功,才能繼續往下執行。
public synchronized void a() {
b();
}
public synchronized void b() {
// doWork
}
假設 X 線程在 a 方法獲取鎖之后,繼續執行 b 方法,如果此時不可重入,線程就必須等待鎖釋放,再次爭搶鎖。
鎖明明是被 X 線程擁有,卻還需要等待自己釋放鎖,然后再去搶鎖,這看起來就很奇怪,我釋放我自己~
可重入鎖實現原理
Chaya:「Redis String 數據結構無法滿足可重入鎖,key 表示鎖定的資源,value 是客戶端唯一標識,可重入沒地方放了?!?/p>
我們可以使用 Redis hash 結構實現,key 表示被鎖的共享資源, hash 結構的 fieldKey 存儲客戶端唯一標識,fieldKey 的 value 則保存加鎖的次數。
加鎖原理
可重入鎖加鎖的過程中有以下場景需要考慮。
- 鎖已經被 A 客戶端獲取,客戶端 B 獲取鎖失敗。
- 鎖已經被客戶端 A 獲取,客戶端 A 多次執行獲取鎖操作。
- 鎖沒有被其他客戶端獲取,那么此刻獲取鎖的客戶端可以獲取成功。
按照之前的經驗,多個操作的原子性可以用 lua 腳本實現??芍厝腈i加鎖 lua 腳本如下。
if ((redis.call('exists', KEYS[1]) == 0) or
(redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
- KEYS[1]是 lockKey 表示獲取的鎖資源,比如 lock:168。
- ARGV[1] 表示表示鎖的有效時間(單位毫秒)。
- ARGV[2] 表示客戶端唯一標識,在 Redisson 中使用 UUID:ThreadID。
下面我來接下是這段腳本的邏輯。
鎖不存在或者鎖存在且值與客戶端唯一標識匹配,則執行 'hincrby' 和 pexpire指令,接著 return nil。表示的含義就是鎖不存在就設置鎖并設置鎖重入計數值為 1,設置過期時間;鎖存在且唯一標識匹配表明當前加鎖請求是鎖重入請求,鎖從如計數 +1,重新鎖超時時間。
- redis.call('exists', KEYS[1]) == 0判斷鎖是否存在,0 表示不存在。
- redis.call('hexists', KEYS[1], ARGV[2]) == 1)鎖存在的話,判斷 hash 結構中 fieldKey 與客戶端的唯一標識是否相等。相等表示當前加鎖請求是鎖重入。
- redis.call('hincrby', KEYS[1], ARGV[2], 1)將存儲在 hash 結構的 ARGV[2] 的值 +1,不存在則支持成 1。
- redis.call('pexpire', KEYS[1], ARGV[1])對 KEYS[1] 設置超時時間。
鎖存在,但是唯一標識不匹配,表明鎖被其他線程持有,調用 pttl返回鎖剩余的過期時間。
Chaya:「“腳本執行結果返回 nil、鎖剩余過期時間有什么目的?”」
當且僅當返回 nil才表示加鎖成功;客戶端需要感知鎖是否成功的結果。
解鎖原理
解鎖邏輯復雜一些,不僅要保證不能刪除別人的鎖。還要確保,重入次數為 0 才能解鎖。
解鎖代碼執行方式與加鎖類似,三個返回值含義如下。
- 1 代表解鎖成功,鎖被釋放。
- 0 代表可重入次數被減 1。
- nil 代表其他線程嘗試解鎖,解鎖失敗。
if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 0;
else
redis.call('del', KEYS[1]);
return 1;
end;
return nil;
- KEYS[1]是 lockKey,表示鎖的資源,比如 lock:order:pay。
- ARGV[1],鎖的超時時間。
- ARGV[2],Hash 表的 FieldKey。
首先使用 hexists 判斷 Redis 的 Hash 表是否存在 fileKey,如果不存在則直接返回 nil解鎖失敗。
若存在的情況下,且唯一標識匹配,使用 hincrby 對 fileKey 的值 -1,然后判斷計算之后可重入次數。當前值 > 0 表示持有的鎖存在重入情況,重新設置超時時間,返回值 1;
若值小于等于 0,表明鎖釋放了,執行 del釋放鎖。
Chaya:“可重入鎖很好,依然存在的一個問題是:加鎖后,業務邏輯執行耗時超過了 lockKey 的過期時間,lockKey 會被 Reids 刪除?!?/p>
這個時間不能瞎寫,一般要根據在測試環境多次測試,然后壓測多輪之后,比如計算出接口平均執行時間 200 ms。那么鎖的超時時間就放大為平均執行時間的 3~5 倍。
Chaya:“鎖的超時時間怎么計算合適呢?”
這個時間不能瞎寫,一般要根據在測試環境多次測試,然后壓測多輪之后,比如計算出接口平均執行時間 200 ms。那么鎖的超時時間就放大為平均執行時間的 3~5 倍。
Chaya:“為啥要放大呢?”
因為如果鎖的操作邏輯中有網絡 IO 操作、JVM FullGC 等,線上的網絡不會總一帆風順,我們要給網絡抖動留有緩沖時間。
Chaya:“有沒有完美的方案呢?不管時間怎么設置都不大合適?!?/p>
我們可以讓獲得鎖的線程開啟一個守護線程,用來給當前客戶端快要過期的鎖續航,續命的前提是,得判斷是不是當前進程持有的鎖,如果不是就不進行續。
如果快要過期,但是業務邏輯還沒執行完成,自動對這個鎖進行續期,重新設置過期時間。
這就是下一篇我要說的超神方案,加入看門狗機制實現鎖自動續期。不過鎖自動續期比較復雜,今天的 Redis 可重入分布式鎖王者方案已經可以讓你稱霸武林,接下來上實戰。
可重入分布式鎖實戰
關于 Spring Boot 的環境搭建以及普通分布式鎖實戰詳見上一篇《糾正誤區:這才是 SpringBoot Redis 分布式鎖的正確實現方式》。今天直接上可重入鎖核心代碼。
ReentrantDistributedLock
可重入鎖由ReentrantDistributedLock標識,它實現 Lock接口,構造方法實現 resourceName 和 StringRedisTemplate 的屬性設置。
客戶端唯一標識使用uuid:threadId 組成。
public class ReentrantDistributedLock implements Lock {
/**
* 鎖超時時間,默認 30 秒
*/
protected long internalLockLeaseTime = 30000;
/**
* 標識 id
*/
private final String id = UUID.randomUUID().toString();
/**
* 資源名稱
*/
private final String resourceName;
private final List<String> keys = new ArrayList<>(1);
/**
* Redis 客戶端
*/
private final StringRedisTemplate redisTemplate;
public ReentrantDistributedLock(String resourceName, StringRedisTemplate redisTemplate) {
this.resourceName = resourceName;
this.redisTemplate = redisTemplate;
keys.add(resourceName);
}
}
加鎖 tryLock、lock
tryLock 以阻塞等待 waitTime 時間的方式來嘗試獲取鎖。獲取成功則返回 true,反之 false。
與 tryLock不同的是, lock 一直嘗試自旋阻塞等待獲取分布式鎖,直到獲取成功為止。而 tryLock 只會阻塞等待 waitTime 時間。
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// lua 腳本獲取鎖
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - current;
// 等待時間用完,獲取鎖失敗
if (time <= 0) {
return false;
}
// 自旋獲取鎖
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
return false;
}
}
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
do {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
} while (ttl != null);
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
// 執行 lua 腳本
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(LuaScript.reentrantLockScript(), Long.class);
return redisTemplate.execute(redisScript, keys, String.valueOf(unit.toMillis(leaseTime)), getRequestId(threadId));
}
private String getRequestId(long threadId) {
return id + ":" + threadId;
}
解鎖 unlock
public void unlock() {
long threadId = Thread.currentThread().getId();
// 執行 lua 腳本
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(LuaScript.reentrantUnlockScript(), Long.class);
Long opStatus = redisTemplate.execute(redisScript, keys, String.valueOf(internalLockLeaseTime), getRequestId(threadId));
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
}
}
LuaScript
這個腳本就是在講解可重入分布式鎖原理具體邏輯已經解釋過,這里就不再重復分析。
public class LuaScript {
private LuaScript() {
}
/**
* 可重入分布式鎖加鎖腳本
*
* @return 當且僅當返回 `nil`才表示加鎖成功;返回鎖剩余過期時間是讓客戶端感知鎖是否成功。
*/
public static String reentrantLockScript() {
return "if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);";
}
/**
* 可重入分布式鎖解鎖腳本
*
* @return 當且僅當返回 `nil`才表示解鎖成功;
*/
public static String reentrantUnlockScript() {
return "if (redis.call('hexists', KEYS[1], ARGV[2]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"return 1; " +
"end; " +
"return nil;";
}
}
RedisLockClient
最后,還需要提供一個客戶端給方便使用。
@Component
public class RedisLockClient {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 獲取可重入分布式鎖
* @param name
* @return
*/
public Lock getReentrantLock(String name) {
return new ReentrantDistributedLock(name, redisTemplate);
}
}
單元測試走一個,驗證下分布式鎖是否支持可重入。
@Slf4j
@SpringBootTest(classes = RedisApplication.class)
public class RedisLockTest {
@Autowired
private RedisLockClient redisLockClient;
@Test
public void testTryReentrantLockSuccess() throws InterruptedException {
Lock lock = redisLockClient.getReentrantLock("order:pay");
try {
boolean isLock = lock.tryLock(10, 30, TimeUnit.SECONDS);
if (!isLock) {
log.warn("加鎖失敗");
return;
}
// 重復加鎖
reentrant(lock);
log.info("業務邏輯執行完成");
} finally {
lock.unlock();
}
}
private void reentrant(Lock lock) throws InterruptedException {
try {
boolean isLock = lock.tryLock(10, 30, TimeUnit.SECONDS);
if (!isLock) {
log.warn("加鎖失敗");
return;
}
log.info("業務邏輯執行完成");
} finally {
lock.unlock();
}
}
}
有兩個點需要注意。
- 釋放鎖的代碼一定要放在 finally{} 塊中。否則一旦執行業務邏輯過程中拋出異常,程序就無法執行釋放鎖的流程。只能干等著鎖超時釋放。
- 加鎖的代碼應該寫在 try {} 代碼中,放在 try 外面的話,如果執行加鎖異常(客戶端網絡連接超時),但是實際指令已經發送到服務端并執行,就會導致沒有機會執行解鎖的代碼。
CHaya:“碼哥,這個方案確實很王者,大開眼界,接下來的超神版可以實現看門狗自動續期么?”
鑒于篇幅有限,今天就跟大家介紹 Redis 可重入分布式鎖王者方案,關注我,下一篇給你分享、超神版分布式鎖解決方案。