場景問題及原因
緩存穿透:
原因:客戶端請求的數據在緩存和數據庫中不存在,這樣緩存永遠不會生效,請求全部打入數據庫,造成數據庫連接異常。
解決思路:
- 緩存空對象
- 對于不存在的數據也在Redis建立緩存,值為空,并設置一個較短的TTL時間問題:實現簡單,維護方便,但短期的數據不一致問題
緩存雪崩:
原因:在同一時段大量的緩存key同時失效或者Redis服務宕機,導致大量請求到達數據庫,帶來巨大壓力。
解決思路:給不同的Key的TTL添加隨機值(簡單),給緩存業務添加降級限流策略(復雜),給業務添加多級緩存(復雜)
緩存擊穿(熱點Key):
前提條件:熱點Key&在某一時段被高并發訪問&緩存重建耗時較長
原因:熱點key突然過期,因為重建耗時長,在這段時間內大量請求落到數據庫,帶來巨大沖擊
解決思路:
- 互斥鎖
- 給緩存重建過程加鎖,確保重建過程只有一個線程執行,其它線程等待問題:線程阻塞,導致性能下降且有死鎖風險
- 邏輯過期
- 熱點key緩存永不過期,而是設置一個邏輯過期時間,查詢到數據時通過對邏輯過期時間判斷,來決定是否需要重建緩存;重建緩存也通過互斥鎖保證單線程執行,但是重建緩存利用獨立線程異步執行,其它線程無需等待,直接查詢到的舊數據即可問題:不保證一致性,有額外內存消耗且實現復雜
場景問題實踐解決
完整代碼地址:https://github.com/xbhog/hm-dianping
分支:20221221-xbhog-cacheBrenkdown
分支:20230110-xbhog-Cache_Penetration_Avalance
緩存穿透:

代碼實現:
12345678910111213141516171819202122public Shop queryWithPassThrough(Long id){
//從redis查詢商鋪信息
String shopInfo = stringRedisTemplate.opsForValue().get(SHOP_CACHE_KEY + id);
//命中緩存,返回店鋪信息
if(StrUtil.isNotBlank(shopInfo)){
return JSONUtil.toBean(shopInfo, Shop.class);
}
//redis既沒有key的緩存,但查出來信息不為null,則為空字符串
if(shopInfo != null){
return null;
}
//未命中緩存
Shop shop = getById(id);
if(Objects.isNull(shop)){
//將null添加至緩存,過期時間減少
stringRedisTemplate.opsForValue().set(SHOP_CACHE_KEY+id,"",5L, TimeUnit.MINUTES);
return null;
}
//對象轉字符串
stringRedisTemplate.opsForValue().set(SHOP_CACHE_KEY+id,JSONUtil.toJsonStr(shop),30L, TimeUnit.MINUTES);
return shop;
}
上述流程圖和代碼非常清晰,由于緩存雪崩簡單實現(復雜實踐不會)增加隨機TTL值,緩存穿透和緩存雪崩不過多解釋。
緩存擊穿:
緩存擊穿邏輯分析:

首先線程1在查詢緩存時未命中,然后進行查詢數據庫并重建緩存。注意上述緩存擊穿發生的條件,被高并發訪問&緩存重建耗時較長;
由于緩存重建耗時較長,在這時間穿插線程2,3,4進入;那么這些線程都不能從緩存中查詢到數據,同一時間去訪問數據庫,同時的去執行數據庫操作代碼,對數據庫訪問壓力過大。
互斥鎖:
解決方式:加鎖;****可以采用**tryLock方法 + double check**來解決這樣的問題

在線程2執行的時候,由于線程1加鎖在重建緩存,所以線程2被阻塞,休眠等待線程1執行完成后查詢緩存。由此造成在重建緩存的時候阻塞進程,效率下降且有死鎖的風險。
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455private Shop queryWithMutex(Long id) {
//從redis查詢商鋪信息
String shopInfo = stringRedisTemplate.opsForValue().get(SHOP_CACHE_KEY + id);
//命中緩存,返回店鋪信息
if(StrUtil.isNotBlank(shopInfo)){
return JSONUtil.toBean(shopInfo, Shop.class);
}
//redis既沒有key的緩存,但查出來信息不為null,則為空字符串
if(shopInfo != null){
return null;
}
//實現緩存重建
String lockKey = "lock:shop:"+id;
Shop shop = null;
try {
Boolean aBoolean = tryLock(lockKey);
if(!aBoolean){
//加鎖失敗,休眠
Thread.sleep(50);
//遞歸等待
return queryWithMutex(id);
}
//獲取鎖成功應該再次檢測redis緩存是否還存在,做doubleCheck,如果存在則無需重建緩存。
synchronized (this){
//從redis查詢商鋪信息
String shopInfoTwo = stringRedisTemplate.opsForValue().get(SHOP_CACHE_KEY + id);
//命中緩存,返回店鋪信息
if(StrUtil.isNotBlank(shopInfoTwo)){
return JSONUtil.toBean(shopInfoTwo, Shop.class);
}
//redis既沒有key的緩存,但查出來信息不為null,則為“”
if(shopInfoTwo != null){
return null;
}
//未命中緩存
shop = getById(id);
// 5.不存在,返回錯誤
if(Objects.isNull(shop)){
//將null添加至緩存,過期時間減少
stringRedisTemplate.opsForValue().set(SHOP_CACHE_KEY+id,"",5L, TimeUnit.MINUTES);
return null;
}
//模擬重建的延時
Thread.sleep(200);
//對象轉字符串
stringRedisTemplate.opsForValue().set(SHOP_CACHE_KEY+id,JSONUtil.toJsonStr(shop),30L, TimeUnit.MINUTES);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
unLock(lockKey);
}
return shop;
}
在獲取鎖失敗時,證明已有線程在重建緩存,使當前線程休眠并重試(遞歸實現)。
代碼中需要注意的是synchronized關鍵字的使用,在獲取到鎖的時候,在判斷下緩存是否存在(失效)double-check,該關鍵字鎖的是當前對象。在其關鍵字{}中是同步處理。
推薦博客:https://blog.csdn.net/u013142781/article/details/51697672
然后進行測試代碼,進行壓力測試(jmeter),首先去除緩存中的值,模擬緩存失效。
設置1000個線程,多線程執行間隔5s。


所有的請求都是成功的,其qps大約在200,其吞吐量還是比較可觀的。然后看下緩存是否成功(只查詢一次數據庫);

邏輯過期:
思路分析:
當用戶開始查詢redis時,判斷是否命中,如果沒有命中則直接返回空數據,不查詢數據庫,而一旦命中后,將value取出,判斷value中的過期時間是否滿足,如果沒有過期,則直接返回redis中的數據,如果過期,則在開啟獨立線程后直接返回之前的數據,獨立線程去重構數據,重構完成后釋放互斥鎖。

封裝數據:這里我們采用新建實體類來實現
12345678910/**
* @author xbhog
* @describe:
* @date
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}
使得過期時間和數據有關聯關系,這里的數據類型是Object,方便后續不同類型的封裝。
123456789101112131415161718192021222324252627282930313233343536373839public Shop queryWithLogicalExpire( Long id ) {
String key = CACHE_SHOP_KEY + id;
// 1.從redis查詢商鋪緩存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判斷是否存在
if (StrUtil.isBlank(json)) {
// 3.存在,直接返回
return null;
}
// 4.命中,需要先把json反序列化為對象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判斷是否過期
if(expireTime.isAfter(LocalDateTime.now())) {
// 5.1.未過期,直接返回店鋪信息
return shop;
}
// 5.2.已過期,需要緩存重建
// 6.緩存重建
// 6.1.獲取互斥鎖
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
// 6.2.判斷是否獲取鎖成功
if (isLock){
exectorPool().execute(() -> {
try {
//重建緩存
this.saveShop2Redis(id, 20L);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
unLock(lockKey);
}
});
}
// 6.4.返回過期的商鋪信息
return shop;
}
當前的執行流程跟互斥鎖基本相同,需要注意的是,在獲取鎖成功后,我們將緩存重建放到線程池中執行,來異步實現。
線程池代碼:
12345678910111213141516/**
* 線程池的創建
* @return
*/
private static ThreadPoolExecutor exectorPool(){
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,
//根據自己的處理器數量+1
Runtime.getRuntime().availableProcessors()+1,
2L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
return executor;
}
緩存重建代碼:
1234567891011121314/**
* 重建緩存
* @param id 重建ID
* @param l 過期時間
*/
public void saveShop2Redis(Long id, long l){
//查詢店鋪信息
Shop shop = getById(id);
//封裝邏輯過期時間
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(l));
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
}
測試條件:100線程,1s線程間隔時間,緩存失效時間10s。
測試環境:緩存中存在對應的數據,并且在緩存快失效之前修改數據庫中的數據,造成緩存與數據庫不一致,通過執行壓測,來查看相關線程返回的數據情況。


從上述兩張圖中可以看到,在前幾個線程執行過程中店鋪name為102,當執行時間從19-20的時候店鋪name發生變化為105,滿足邏輯過期異步執行緩存重建的需求.?