阿里面試:說說自適應限流?
限流想必大家都不陌生,它是一種控制資源訪問速率的策略,用于保護系統免受過載和崩潰的風險。限流可以控制某個服務、接口或系統在一段時間內能夠處理的請求或數據量,以防止系統資源耗盡、性能下降或服務不可用。
常見的限流策略有以下幾種:
- 令牌桶算法:基于令牌桶的方式,限制每個單位時間內允許通過的請求量,請求量超出限制的將被拒絕或等待。
- 漏桶算法:基于漏桶的方式,限制系統處理請求的速率,請求速率過快時將被限制或拒絕。
- 計數器算法:通過計數器記錄單位時間內的請求次數,并根據設定的閾值進行限制。
通過合理的限流策略,可以保護系統免受惡意攻擊、突發流量和資源濫用的影響,確保系統穩定和可靠運行。在實際應用中,限流常用于接口防刷、防止 DDoS 攻擊、保護關鍵服務等場景。
1.限流實現
在 Java 中,限流的實現方式有很多種,例如以下這些:
- 單機限流:使用 JUC 下的 Semaphore 限流,或一些常用的框架,例如 Google 的 Guava 框架進行限流,但這種限流方式都是基于 JVM 層面的內存級別的單臺機器限流。
- 組件限流:單機限流往往不適用于分布式系統,而分布式系統可以通過組件 Sentinel、Hystrix 對整個集群進行限流。
- 反向代理限流(Nginx 限流):通常在網關層的上游,我們會使用 Nginx(反向代理)一起來配合使用,也就是用戶請求會先到 Nginx(或 Nginx 集群),然后再將請求轉發給網關,網關再調用其他的微服務,從而實現整個流程的請求調用,因此 Nginx 限流也是分布式系統中常用的限流手段。
2.自適應限流
所謂的自適應限流是結合應用的 Load、CPU 使用率、總體平均 RT、入口 QPS 和并發線程數等幾個維度的監控指標,通過自適應的流控策略,讓系統的入口流量和系統的負載達到一個平衡,讓系統盡可能跑在最大吞吐量的同時保證系統整體的穩定性。
類似的實現思路還有很多,如,自適應自旋鎖、還有 K8S 中根據負載進行動態擴容等。
3.實現思路
以 Sentinel 中的自適應限流來說,它的實現思路是用負載(load1)作為啟動控制流量的值,而允許通過的流量由處理請求的能力,即請求的響應時間以及當前系統正在處理的請求速率來決定。
為什么要這樣設計?
長期以來,系統自適應保護的思路是根據硬指標,即系統的負載 (load1) 來做系統過載保護。當系統負載高于某個閾值,就禁止或者減少流量的進入;當 load 開始好轉,則恢復流量的進入。這個思路給我們帶來了不可避免的兩個問題:
- load 是一個“果”,如果根據 load 的情況來調節流量的通過率,那么就始終有延遲性。也就意味著通過率的任何調整,都會過一段時間才能看到效果。當前通過率是使 load 惡化的一個動作,那么也至少要過 1 秒之后才能觀測到;同理,如果當前通過率調整是讓 load 好轉的一個動作,也需要 1 秒之后才能繼續調整,這樣就浪費了系統的處理能力。所以我們看到的曲線,總是會有抖動。
- 恢復慢。想象一下這樣的一個場景(真實),出現了這樣一個問題,下游應用不可靠,導致應用 RT 很高,從而 load 到了一個很高的點。過了一段時間之后下游應用恢復了,應用 RT 也相應減少。這個時候,其實應該大幅度增大流量的通過率;但是由于這個時候 load 仍然很高,通過率的恢復仍然不高。
TCP BBR 的思想給了我們一個很大的啟發。我們應該根據系統能夠處理的請求,和允許進來的請求,來做平衡,而不是根據一個間接的指標(系統 load)來做限流。最終我們追求的目標是 在系統不被拖垮的情況下,提高系統的吞吐率,而不是 load 一定要到低于某個閾值。如果我們還是按照固有的思維,超過特定的 load 就禁止流量進入,系統 load 恢復就放開流量,這樣做的結果是無論我們怎么調參數,調比例,都是按照果來調節因,都無法取得良好的效果。 所以,Sentinel 在系統自適應限流的做法是,用 load1 作為啟動控制流量的值,而允許通過的流量由處理請求的能力,即請求的響應時間以及當前系統正在處理的請求速率來決定。
4.支持規則
Sentinel 是從單臺機器的總體 Load、RT、入口 QPS 和線程數四個維度監控應用數據,讓系統盡可能跑在最大吞吐量的同時保證系統整體的穩定性。
系統保護規則是應用整體維度的,而不是資源維度的,并且僅對入口流量生效。入口流量指的是進入應用的流量(EntryType.IN),比如 Web 服務或 Dubbo 服務端接收的請求,都屬于入口流量。
注意:系統規則只對入口流量起作用(調用類型為 EntryType.IN),對出口流量無效。可通過 SphU.entry(res, entryType) 指定調用類型,如果不指定,默認是 EntryType.OUT。
Sentinel 支持以下的閾值規則:
- Load(僅對 Linux/Unix-like 機器生效):當系統 load1 超過閾值,且系統當前的并發線程數超過系統容量時才會觸發系統保護。系統容量由系統的 maxQps * minRt 計算得出。設定參考值一般是 CPU cores * 2.5。
- CPU usage(1.5.0+ 版本):當系統 CPU 使用率超過閾值即觸發系統保護(取值范圍 0.0-1.0)。
- RT:當單臺機器上所有入口流量的平均 RT 達到閾值即觸發系統保護,單位是毫秒。
- 線程數:當單臺機器上所有入口流量的并發線程數達到閾值即觸發系統保護。
- 入口 QPS:當單臺機器上所有入口流量的 QPS 達到閾值即觸發系統保護。
5.設置自適應限流
在 Sentinel 中,可以通過系統規則 -> 新增系統規則,設置閾值以實現自適應限流功能,如下圖所示:
6.原理分析
先用經典圖來鎮樓:
我們把系統處理請求的過程想象為一個水管,到來的請求是往這個水管灌水,當系統處理順暢的時候,請求不需要排隊,直接從水管中穿過,這個請求的RT是最短的;反之,當請求堆積的時候,那么處理請求的時間則會變為:排隊時間 + 最短處理時間。
推論一:如果我們能夠保證水管里的水量,能夠讓水順暢的流動,則不會增加排隊的請求;也就是說,這個時候的系統負載不會進一步惡化。
我們用 T 來表示(水管內部的水量),用 RT 來表示請求的處理時間,用P來表示進來的請求數,那么一個請求從進入水管道到從水管出來,這個水管會存在 P * RT 個請求。換一句話來說,當 T ≈ QPS * Avg(RT) 的時候,我們可以認為系統的處理能力和允許進入的請求個數達到了平衡,系統的負載不會進一步惡化。
接下來的問題是,水管的水位是可以達到了一個平衡點,但是這個平衡點只能保證水管的水位不再繼續增高,但是還面臨一個問題,就是在達到平衡點之前,這個水管里已經堆積了多少水。如果之前水管的水已經在一個量級了,那么這個時候系統允許通過的水量可能只能緩慢通過,RT 會大,之前堆積在水管里的水會滯留;反之,如果之前的水管水位偏低,那么又會浪費了系統的處理能力。
推論二:當保持入口的流量使水管出來的流量達到最大值的時候,可以最大利用水管的處理能力。
然而,和 TCP BBR 的不一樣的地方在于,還需要用一個系統負載的值(load1)來激發這套機制啟動。
注:這種系統自適應算法對于低 load 的請求,它的效果是一個“兜底”的角色。對于不是應用本身造成的 load 高的情況(如其它進程導致的不穩定的情況),效果不明顯。
7.實現代碼
以 Sentinel 官方提供的自適應限流代碼為例,我們可以再來了解一下它的具體使用:
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.demo.system;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.system.SystemRule;
import com.alibaba.csp.sentinel.slots.system.SystemRuleManager;
/**
* @author jialiang.linjl
*/
public class SystemGuardDemo {
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
private static AtomicInteger total = new AtomicInteger();
private static volatile boolean stop = false;
private static final int threadCount = 100;
private static int seconds = 60 + 40;
public static void main(String[] args) throws Exception {
tick();
initSystemRule();
for (int i = 0; i < threadCount; i++) {
Thread entryThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Entry entry = null;
try {
entry = SphU.entry("methodA", EntryType.IN);
pass.incrementAndGet();
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
// ignore
}
} catch (BlockException e1) {
block.incrementAndGet();
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
// ignore
}
} catch (Exception e2) {
// biz exception
} finally {
total.incrementAndGet();
if (entry != null) {
entry.exit();
}
}
}
}
});
entryThread.setName("working-thread");
entryThread.start();
}
}
private static void initSystemRule() {
SystemRule rule = new SystemRule();
// max load is 3
rule.setHighestSystemLoad(3.0);
// max cpu usage is 60%
rule.setHighestCpuUsage(0.6);
// max avg rt of all request is 10 ms
rule.setAvgRt(10);
// max total qps is 20
rule.setQps(20);
// max parallel working thread is 10
rule.setMaxThread(10);
SystemRuleManager.loadRules(Collections.singletonList(rule));
}
private static void tick() {
Thread timer = new Thread(new TimerTask());
timer.setName("sentinel-timer-task");
timer.start();
}
static class TimerTask implements Runnable {
@Override
public void run() {
System.out.println("begin to statistic!!!");
long oldTotal = 0;
long oldPass = 0;
long oldBlock = 0;
while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
long globalTotal = total.get();
long oneSecondTotal = globalTotal - oldTotal;
oldTotal = globalTotal;
long globalPass = pass.get();
long oneSecondPass = globalPass - oldPass;
oldPass = globalPass;
long globalBlock = block.get();
long oneSecondBlock = globalBlock - oldBlock;
oldBlock = globalBlock;
System.out.println(seconds + ", " + TimeUtil.currentTimeMillis() + ", total:"
+ oneSecondTotal + ", pass:"
+ oneSecondPass + ", block:" + oneSecondBlock);
if (seconds-- <= 0) {
stop = true;
}
}
System.exit(0);
}
}
}