作者 | 蔡柱梁
審校 | 重樓
目錄
- 分布式鎖介紹
- 如何實現分布式鎖
- 實現分布式鎖
1 分布式鎖介紹
現在的服務往往都是多節點,在一些特定的場景下容易產生并發問題,比如扣減庫存,送完即止活動,中臺的批量導入(有唯一校驗要求)等等。這時,我們可以通過分布式鎖解決這些問題。
2 如何實現分布式鎖
實現的方式有很多種,如:
- 基于 MySQL 等數據庫實現
- 基于 ZooKeeper 實現
- 基于 Redis 實現不管采用什么技術棧實現,但是邏輯流程都是大體不差的。下面是筆者自己在工作中基于Redis 實踐過的流程圖:
3 實現分布式鎖
其實可以不用自己手寫,現在有一個中間件Redisson 相當好用,十分推薦。這里的實現更多是用于學習。
3.1 Redis 是單節點的情況下實現的分布式鎖
需要使用分布式鎖的業務代碼如下:
package com.example.demo.test.utils;
import com.example.demo.utils.RedisLockUtil;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @author CaiZhuliang
* @date 2023/8/31
*/
@Slf4j
@SpringBootTest
public class RedisLockUtilTest {
@Autowired
private RedisLockUtil redisLockUtil;
@Test
public void simpleLockTest() {
String key = "redis:lock:" + System.currentTimeMillis();
boolean result = redisLockUtil.lock(key, 8_000L);
if (result) {
try {
// do something
} catch (Exception e) {
log.error("simpleLockTest - 系統異常!", e);
} finally {
boolean unlock = redisLockUtil.unlock(key);
if (!unlock) {
log.error("simpleLockTest - 釋放鎖失敗,key : {}", key);
}
}
}
}
}
分布式鎖工具類代碼如下:
package com.example.demo.utils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author CaiZhuliang
* @date 2023/8/31
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisLockUtil {
private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
new BasicThreadFactory.Builder()
.namingPattern("redisLockUtil-schedule-pool-%d")
.daemon(true)
.build());
private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();
private final RedisTemplate<String, String> redisTemplate;
/**
* 釋放鎖
* <p>必須和RedisLockUtil#simpleLock是同一個線程</p>
* @param key 需要釋放鎖的key
* @return true-成功 false-失敗
*/
public boolean releaseSimpleLock(String key) {
String token = THREAD_LOCAL.get();
try {
String remoteToken = redisTemplate.opsForValue().get(key);
if (!token.equals(remoteToken)) {
// 當前線程不再持有鎖
return false;
}
// 是自己持有鎖才能釋放
return Boolean.TRUE.equals(redisTemplate.delete(key));
} catch (Exception e) {
log.error("非cluster模式簡單分布式鎖 - 釋放鎖發生異常,key : {}", key, e);
return false;
} finally {
THREAD_LOCAL.remove();
}
}
/**
* 這個方法不考慮Redis的集群架構,不考慮腦裂問題,當只有一個Redis來考慮。
* @param key 需要上鎖的key
* @param expireTime 過期時間,單位:毫秒
* @return true-成功 false-失敗
*/
public boolean simpleLock(String key, Long expireTime) {
if (StringUtils.isBlank(key)) {
log.warn("非cluster模式簡單分布式鎖 - key is blank");
return false;
}
if (null == expireTime || expireTime <= 0) {
expireTime = 0L;
}
String token = UUID.randomUUID().toString();
// 續約周期,單位納秒
long renewPeriod = expireTime / 2 * 1000_000;
try {
// 設置鎖
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
if (Boolean.FALSE.equals(result)) {
return false;
}
// 上鎖成功后將令牌綁定當前線程
THREAD_LOCAL.set(token);
if (renewPeriod > 0) {
// 續約任務
renewTask(key, token, expireTime, renewPeriod);
}
return true;
} catch (Exception e) {
log.error("非cluster模式簡單分布式鎖 - 上鎖失敗。", e);
THREAD_LOCAL.remove();
return false;
}
}
/**
* 鎖續約任務
* @param key 需要續命的key
* @param token 成功獲鎖的線程持有的令牌
* @param expireTime 過期時間,單位:毫秒
* @param renewPeriod 續約周期,單位:納秒
*/
private void renewTask(String key, String token, long expireTime, long renewPeriod) {
EXECUTOR_SERVICE.schedule(() -> {
ValueOperations<String, String> valueOperator = redisTemplate.opsForValue();
String val = valueOperator.get(key);
if (token.equals(val)) {
// 是自己持有鎖才能續約
try {
Boolean result = valueOperator.setIfPresent(key, token, expireTime, TimeUnit.MILLISECONDS);
if (Boolean.TRUE.equals(result)) {
// 續約成功
log.debug("非cluster模式簡單分布式鎖 - 鎖續約成功,key : {}", key);
// 開啟下一次續約任務
renewTask(key, token, expireTime, renewPeriod);
} else {
log.error("非cluster模式簡單分布式鎖 - 鎖續約失敗,key : {}", key);
}
} catch (Exception e) {
// 這里異常是拋不出去的,所以需要 catch 打印
log.error("非cluster模式簡單分布式鎖 - 鎖續約發生異常,key : {}", key, e);
}
} else {
log.error("非cluster模式簡單分布式鎖 - 鎖續約失敗,不再持有token,key : {}", key);
}
}, renewPeriod, TimeUnit.NANOSECONDS);
}
}
這就是一個最簡單的實現方式。不過這里存在著許多問題:
- 續約任務
這里判斷是否持有令牌和續約這兩個動作不在同一個事務里,可能發生覆蓋現象。假設A線程判斷自己持有令牌,但是一直沒有請求 Redis 導致鎖過期。B線程成功獲鎖,這時A線程往下執行 Redis 請求,結果A線程搶了B線程的鎖。
- 釋放鎖
這里判斷是否持有令牌和刪除key這兩個動作不在同一個事務里,可能出現誤刪現象。假設A線程現在要釋放鎖,通過了令牌判斷,準備刪除 key 但是還沒執行。這時 key 過期了,B線程成功獲鎖。接著A線程執行刪除 key 導致了 B 線程的鎖被刪除。
因此,判斷持有令牌與續約/刪除key這兩個動作是需要原子性的,我們可以通過 lua 來實現。
擴展,了解管道與 lua 的區別
- pipeline(多用于命令簡單高效,無關聯的場景)
優點:使用簡單,有效減少網絡IO
缺點:本質還是發送命令請求Redis 服務,如果效率過低,就會阻塞 Redis,導致 Redis 無法處理其他請求
- lua(多用于命令復雜,命令間有關聯的場景)
優點:
- Redis 支持 lua 腳本,Redis 服務執行 lua 的同時是可以處理別的請求的,不會產生阻塞
- 命令都在腳本中,有效減少網絡IO
- 具有原子性
缺點:
有一定的學習成本
3.1.1 使用 lua 進行優化
RedisLockUtil 代碼如下:
package com.example.demo.utils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author CaiZhuliang
* @date 2023/8/31
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisLockUtil {
private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
new BasicThreadFactory.Builder()
.namingPattern("redisLockUtil-schedule-pool-%d")
.daemon(true)
.build());
private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();
private static final String SUCCESS = "1";
/**
* 允許當前token續約
*/
private static final Integer CAN_RENEW = 0;
/**
* 記錄token的狀態,0-可以續約,其他情況均不能續約
*/
private static final Map<String, Integer> TOKEN_STATUS = Maps.newConcurrentMap();
private final RedisTemplate<String, String> redisTemplate;
/**
* 釋放鎖,這個方法與 com.example.demo.utils.RedisLockUtil#simpleLock(java.lang.String, java.lang.Long) 配對。
* <p>必須和RedisLockUtil#simpleLock是同一個線程</p>
* @param key 需要釋放鎖的key
* @return true-成功 false-失敗
*/
public boolean releaseSimpleLock(String key) {
String token = THREAD_LOCAL.get();
if (null != token) {
TOKEN_STATUS.put(token, 1);
}
try {
String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
"then redis.call('expire', KEYS[1], 0) return '1' end " +
"return '0'";
DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token);
log.info("非cluster模式簡單分布式鎖 - 釋放key: {}, result : {}, token : {}", key, result, token);
return SUCCESS.equals(result);
} catch (Exception e) {
log.error("非cluster模式簡單分布式鎖 - 釋放鎖發生異常,key : {}", key, e);
return false;
} finally {
THREAD_LOCAL.remove();
if (null != token) {
TOKEN_STATUS.remove(token);
}
}
}
/**
* 簡單分布式鎖實現,續約周期是 expireTime 的一半。舉個例子, expireTime = 8000,那么鎖續約將會是每 4000 毫秒續約一次
* <p>這個方法不考慮Redis的集群架構,不考慮腦裂問題,當只有一個 Redis來考慮。</p>
* <p>這個方法使用 com.example.demo.utils.RedisLockUtil#releaseSimpleLock(java.lang.String) 來釋放鎖</p>
* @param key 需要上鎖的key
* @param expireTime 過期時間,單位:毫秒
* @return true-成功 false-失敗
*/
public boolean simpleLock(String key, Long expireTime) {
if (StringUtils.isBlank(key)) {
log.warn("非cluster模式簡單分布式鎖 - key is blank");
return false;
}
if (null == expireTime || expireTime <= 0) {
expireTime = 0L;
}
// 續約周期,單位納秒
long renewPeriod = expireTime / 2 * 1000_000;
try {
String token = System.currentTimeMillis() + ":" + UUID.randomUUID();
// 設置鎖
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
if (Boolean.FALSE.equals(result)) {
return false;
}
log.info("非cluster模式簡單分布式鎖 - 上鎖成功,key : {}, token : {}", key, token);
// 上鎖成功后將令牌綁定當前線程
THREAD_LOCAL.set(token);
TOKEN_STATUS.put(token, 0);
if (renewPeriod > 0) {
// 續約任務
renewTask(key, token, expireTime, renewPeriod);
}
return true;
} catch (Exception e) {
log.error("非cluster模式簡單分布式鎖 - 上鎖發生異常,key : {}", key, e);
String token = THREAD_LOCAL.get();
if (StringUtils.isNotBlank(token)) {
if (!releaseSimpleLock(key)) {
log.warn("非cluster模式簡單分布式鎖 - 釋放鎖發生失敗,key : {}, token : {}", key, token);
}
}
return false;
}
}
/**
* 鎖續約任務
* @param key 需要續命的key
* @param token 成功獲鎖的線程持有的令牌
* @param expireTime 過期時間,單位:毫秒
* @param renewPeriod 續約周期,單位:納秒
*/
private void renewTask(String key, String token, long expireTime, long renewPeriod) {
if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
EXECUTOR_SERVICE.schedule(() -> {
if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
try {
String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
"then " +
" if (redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])) " +
" then return '1' else return redis.call('get', KEYS[1]) end " +
"end " +
"return redis.call('get', KEYS[1])";
DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token, String.valueOf(expireTime));
if (SUCCESS.equals(result)) {
// 續約成功
log.debug("非cluster模式簡單分布式鎖 - 鎖續約成功,key : {}", key);
// 開啟下一次續約任務
renewTask(key, token, expireTime, renewPeriod);
} else {
// 打印下 result,看下是否因為不再持有令牌導致的續約失敗
log.warn("非cluster模式簡單分布式鎖 - 鎖續約失敗,key : {}, token : {}, result : {}", key, token, result);
}
} catch (Exception e) {
// 這里異常是拋不出去的,所以需要 catch 打印
log.error("非cluster模式簡單分布式鎖 - 鎖續約發生異常,key : {}", key, e);
}
}
}, renewPeriod, TimeUnit.NANOSECONDS);
}
}
}
這里還有一個問題:如果redis.call('get', KEYS[1]) == ARGV[1] 成立,但是執行redis.call('expire', KEYS[1], 0) 失敗,怎么辦?我這里已經執行了THREAD_LOCAL.remove(),想重復釋放是不可能的了,但是我這里不能不 remove 或者僅當 Redis 釋放鎖成功才 remove,這樣存在內存泄漏的風險。要怎么處理呢?
這是優化后的代碼:
package com.example.demo.utils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author CaiZhuliang
* @date 2023/8/31
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisSimpleLockUtil {
private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
new BasicThreadFactory.Builder()
.namingPattern("redisSimpleLockUtil-schedule-pool-%d")
.daemon(true)
.build());
private static final ThreadLocal<String> THREAD_LOCAL_TOKEN = new ThreadLocal<>();
private static final String SUCCESS = "1";
/**
* 允許當前token續約
*/
private static final Integer CAN_RENEW = 0;
/**
* 記錄token的狀態,0-可以續約,其他情況均不能續約
*/
private static final Map<String, Integer> TOKEN_STATUS = Maps.newConcurrentMap();
private final RedisTemplate<String, String> redisTemplate;
/**
* 釋放鎖
* <p>必須和 RedisSimpleLockUtil#lock 是同一個線程</p>
* @param key key 需要釋放鎖的key
* @param token 持有的令牌
* @return true-成功 false-失敗
*/
public boolean releaseLock(String key, String token) {
if (StringUtils.isBlank(token)) {
return false;
}
TOKEN_STATUS.put(token, 1);
try {
String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
"then redis.call('expire', KEYS[1], 0) return '1' end " +
"return '0'";
DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token);
log.info("非cluster模式簡單分布式鎖 - 釋放key: {}, result : {}, token : {}", key, result, token);
if (SUCCESS.equals(result)) {
return true;
}
String remoteToken = redisTemplate.opsForValue().get(key);
if (token.equals(remoteToken)) {
log.warn("非cluster模式簡單分布式鎖 - 釋放鎖失敗,key : {}, token : {}", key, token);
return false;
}
return true;
} catch (Exception e) {
log.error("非cluster模式簡單分布式鎖 - 釋放鎖發生異常,key : {}, token : {}", key, token, e);
return false;
} finally {
THREAD_LOCAL_TOKEN.remove();
TOKEN_STATUS.remove(token);
}
}
/**
* 簡單分布式鎖實現,續約周期是 expireTime 的一半。舉個例子, expireTime = 8000,那么鎖續約將會是每 4000 毫秒續約一次
* <p>這個方法不考慮Redis的集群架構,不考慮腦裂問題,當只有一個Redis來考慮。</p>
* @param key 需要上鎖的key
* @param expireTime 過期時間,單位:毫秒
* @return 上鎖成功返回令牌,失敗則返回空串
*/
public String lock(String key, Long expireTime) {
if (StringUtils.isBlank(key)) {
log.warn("非cluster模式簡單分布式鎖 - key is blank");
return StringUtils.EMPTY;
}
if (null == expireTime || expireTime <= 0) {
expireTime = 0L;
}
// 續約周期,單位納秒
long renewPeriod = expireTime * 500_000;
try {
String token = System.currentTimeMillis() + ":" + UUID.randomUUID();
// 設置鎖
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
if (Boolean.FALSE.equals(result)) {
return StringUtils.EMPTY;
}
log.info("非cluster模式簡單分布式鎖 - 上鎖成功,key : {}, token : {}", key, token);
// 上鎖成功后將令牌綁定當前線程
THREAD_LOCAL_TOKEN.set(token);
TOKEN_STATUS.put(token, 0);
if (renewPeriod > 0) {
// 續約任務
log.info("非cluster模式簡單分布式鎖 - 添加續約任務,key : {}, token : {}, renewPeriod : {}納秒", key, token, renewPeriod);
renewTask(key, token, expireTime, renewPeriod);
}
return token;
} catch (Exception e) {
String token = THREAD_LOCAL_TOKEN.get();
log.error("非cluster模式簡單分布式鎖 - 上鎖發生異常,key : {}, token : {}", key, token, e);
return StringUtils.isBlank(token) ? StringUtils.EMPTY : token;
}
}
/**
* 鎖續約任務
* @param key 需要續命的key
* @param token 成功獲鎖的線程持有的令牌
* @param expireTime 過期時間,單位:毫秒
* @param renewPeriod 續約周期,單位:納秒
*/
private void renewTask(String key, String token, long expireTime, long renewPeriod) {
try {
EXECUTOR_SERVICE.schedule(() -> {
try {
String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
"then " +
" if (redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])) " +
" then return '1' else return redis.call('get', KEYS[1]) end " +
"end " +
"return redis.call('get', KEYS[1])";
DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token, String.valueOf(expireTime));
if (SUCCESS.equals(result)) {
// 續約成功
log.debug("非cluster模式簡單分布式鎖 - 鎖續約成功,key : {}, token : {}", key, token);
// 這里加判斷是為了減少定時任務
if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
// 開啟下一次續約任務
renewTask(key, token, expireTime, renewPeriod);
}
} else {
// 這里加判斷是為了防止誤打印warn日志
if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
log.warn("非cluster模式簡單分布式鎖 - 鎖續約失敗,key : {}, token : {}, result : {}", key, token, result);
}
}
} catch (Exception e) {
// 這里異常是拋不出去的,所以需要 catch 打印
log.error("非cluster模式簡單分布式鎖 - 鎖續約發生異常,key : {}, token : {}", key, token, e);
}
}, renewPeriod, TimeUnit.NANOSECONDS);
} catch (Exception e) {
log.error("非cluster模式簡單分布式鎖 - 添加鎖續約任務發生異常,key : {}, token : {}", key, token, e);
}
}
}
package com.example.demo.utils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author CaiZhuliang
* @date 2023/8/31
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisSimpleLockUtil {
private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
new BasicThreadFactory.Builder()
.namingPattern("redisSimpleLockUtil-schedule-pool-%d")
.daemon(true)
.build());
private static final ThreadLocal<String> THREAD_LOCAL_TOKEN = new ThreadLocal<>();
private static final String SUCCESS = "1";
/**
* 允許當前token續約
*/
private static final Integer CAN_RENEW = 0;
/**
* 記錄token的狀態,0-可以續約,其他情況均不能續約
*/
private static final Map<String, Integer> TOKEN_STATUS = Maps.newConcurrentMap();
private final RedisTemplate<String, String> redisTemplate;
/**
* 釋放鎖
* <p>必須和 RedisSimpleLockUtil#lock 是同一個線程</p>
* @param key key 需要釋放鎖的key
* @param token 持有的令牌
* @return true-成功 false-失敗
*/
public boolean releaseLock(String key, String token) {
if (StringUtils.isBlank(token)) {
return false;
}
TOKEN_STATUS.put(token, 1);
try {
String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
"then redis.call('expire', KEYS[1], 0) return '1' end " +
"return '0'";
DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token);
log.info("非cluster模式簡單分布式鎖 - 釋放key: {}, result : {}, token : {}", key, result, token);
if (SUCCESS.equals(result)) {
return true;
}
String remoteToken = redisTemplate.opsForValue().get(key);
if (token.equals(remoteToken)) {
log.warn("非cluster模式簡單分布式鎖 - 釋放鎖失敗,key : {}, token : {}", key, token);
return false;
}
return true;
} catch (Exception e) {
log.error("非cluster模式簡單分布式鎖 - 釋放鎖發生異常,key : {}, token : {}", key, token, e);
return false;
} finally {
THREAD_LOCAL_TOKEN.remove();
TOKEN_STATUS.remove(token);
}
}
/**
* 簡單分布式鎖實現,續約周期是 expireTime 的一半。舉個例子, expireTime = 8000,那么鎖續約將會是每 4000 毫秒續約一次
* <p>這個方法不考慮Redis的集群架構,不考慮腦裂問題,當只有一個Redis來考慮。</p>
* @param key 需要上鎖的key
* @param expireTime 過期時間,單位:毫秒
* @return 上鎖成功返回令牌,失敗則返回空串
*/
public String lock(String key, Long expireTime) {
if (StringUtils.isBlank(key)) {
log.warn("非cluster模式簡單分布式鎖 - key is blank");
return StringUtils.EMPTY;
}
if (null == expireTime || expireTime <= 0) {
expireTime = 0L;
}
// 續約周期,單位納秒
long renewPeriod = expireTime * 500_000;
try {
String token = System.currentTimeMillis() + ":" + UUID.randomUUID();
// 設置鎖
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
if (Boolean.FALSE.equals(result)) {
return StringUtils.EMPTY;
}
log.info("非cluster模式簡單分布式鎖 - 上鎖成功,key : {}, token : {}", key, token);
// 上鎖成功后將令牌綁定當前線程
THREAD_LOCAL_TOKEN.set(token);
TOKEN_STATUS.put(token, 0);
if (renewPeriod > 0) {
// 續約任務
log.info("非cluster模式簡單分布式鎖 - 添加續約任務,key : {}, token : {}, renewPeriod : {}納秒", key, token, renewPeriod);
renewTask(key, token, expireTime, renewPeriod);
}
return token;
} catch (Exception e) {
String token = THREAD_LOCAL_TOKEN.get();
log.error("非cluster模式簡單分布式鎖 - 上鎖發生異常,key : {}, token : {}", key, token, e);
return StringUtils.isBlank(token) ? StringUtils.EMPTY : token;
}
}
/**
* 鎖續約任務
* @param key 需要續命的key
* @param token 成功獲鎖的線程持有的令牌
* @param expireTime 過期時間,單位:毫秒
* @param renewPeriod 續約周期,單位:納秒
*/
private void renewTask(String key, String token, long expireTime, long renewPeriod) {
try {
EXECUTOR_SERVICE.schedule(() -> {
try {
String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
"then " +
" if (redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])) " +
" then return '1' else return redis.call('get', KEYS[1]) end " +
"end " +
"return redis.call('get', KEYS[1])";
DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token, String.valueOf(expireTime));
if (SUCCESS.equals(result)) {
// 續約成功
log.debug("非cluster模式簡單分布式鎖 - 鎖續約成功,key : {}, token : {}", key, token);
// 這里加判斷是為了減少定時任務
if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
// 開啟下一次續約任務
renewTask(key, token, expireTime, renewPeriod);
}
} else {
// 這里加判斷是為了防止誤打印warn日志
if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
log.warn("非cluster模式簡單分布式鎖 - 鎖續約失敗,key : {}, token : {}, result : {}", key, token, result);
}
}
} catch (Exception e) {
// 這里異常是拋不出去的,所以需要 catch 打印
log.error("非cluster模式簡單分布式鎖 - 鎖續約發生異常,key : {}, token : {}", key, token, e);
}
}, renewPeriod, TimeUnit.NANOSECONDS);
} catch (Exception e) {
log.error("非cluster模式簡單分布式鎖 - 添加鎖續約任務發生異常,key : {}, token : {}", key, token, e);
}
}
}
下面是并發單元測試代碼:
@Test
public void concurrencyTest() {
String[] nums = {"1", "2", "3", "4", "5"};
List<CompletableFuture<Void>> list = Lists.newArrayListWithExpectedSize(100);
for (int i = 0; i < 50; i++) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
for (int count = 0; count < 10; count++) {
int random = new Random().nextInt(100) % 5;
String key = "test_" + nums[random];
while (true) {
String token = redisSimpleLockUtil.lock(key, 3_000L);
if (StringUtils.isNotBlank(token)) {
log.info("concurrencyTest - key : {}", key);
try {
Thread.sleep(new Random().nextInt(1500));
} catch (Exception e) {
log.error("concurrencyTest - 發生異常, key : {}", key, e);
} finally {
boolean unlock = redisSimpleLockUtil.releaseLock(key, token);
if (!unlock) {
log.error("concurrencyTest - 釋放鎖失敗,key : {}", key);
}
}
break;
}
}
}
});
list.add(future);
}
CompletableFuture<?>[] futures = new CompletableFuture[list.size()];
list.toArray(futures);
CompletableFuture.allOf(futures).join();
}
3.2 紅鎖
一般公司使用Redis 時都不可能是單節點的,要么主從+哨兵架構,要么就是 cluster 架構。面對集群,我們不得不思考如何應對腦裂這個問題。而 Redlock 是Redis官方網站給出的解決方案。
下面看下針對這兩種集群架構的處理方式:
- 主從+哨兵
通過訪問哨兵獲取當前 master 節點,統計票數,超過半數的 master 節點就是真的 master。我們可以對比我們成功上鎖的節點是否是真的 master node,從而避免腦裂問題。
- cluster
- 上鎖需要在集群中半數以上的 master 操作成功了才算成功。
3.2.1 紅鎖的問題
紅鎖通過過半原則來規避腦裂,但是這就讓我們不得不考慮訪問節點的等待超時時間應該要多長。而且,也會降低Redis 分布式鎖的吞吐量。如果有半數節點不可用,那么分布式鎖也將變得不可用。因此,實際使用中,我們還要結合自己實際的業務場景來權衡要不要用紅鎖或者修改實現方案。
作者介紹
蔡柱梁,51CTO社區編輯,從事Java后端開發8年,做過傳統項目廣電BOSS系統,后投身互聯網電商,負責過訂單,TMS,中間件等。