實(shí)戰(zhàn)案例:如何防止超預(yù)期的高并發(fā)流量壓垮系統(tǒng)?實(shí)戰(zhàn)接口限流防護(hù)!代碼已上傳!
在互聯(lián)網(wǎng)應(yīng)用中,高并發(fā)系統(tǒng)會(huì)面臨一個(gè)重大的挑戰(zhàn),那就是大量流高并發(fā)訪問(wèn),比如:天貓的雙十一、京東618、秒殺、搶購(gòu)促銷等,這些都是典型的大流量高并發(fā)場(chǎng)景。
HTTP接口限流實(shí)戰(zhàn)
這里,我們實(shí)現(xiàn)Web接口限流,具體方式為:使用自定義注解封裝基于令牌桶限流算法實(shí)現(xiàn)接口限流。
不使用注解實(shí)現(xiàn)接口限流
搭建項(xiàng)目
這里,我們使用SpringBoot項(xiàng)目來(lái)搭建Http接口限流項(xiàng)目,SpringBoot項(xiàng)目本質(zhì)上還是一個(gè)Maven項(xiàng)目。所以,小伙伴們可以直接創(chuàng)建一個(gè)Maven項(xiàng)目,我這里的項(xiàng)目名稱為mykit-ratelimiter-test。接下來(lái),在pom.xml文件中添加如下依賴使項(xiàng)目構(gòu)建為一個(gè)SpringBoot項(xiàng)目。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>io.mykit.limiter</groupId>
<artifactId>mykit-ratelimiter-test</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>mykit-ratelimiter-test</name>
<properties>
<guava.version>28.2-jre</guava.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version><!--$NO-MVN-MAN-VER$-->
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
可以看到,我在項(xiàng)目中除了引用了SpringBoot相關(guān)的Jar包外,還引用了guava框架,版本為28.2-jre。
創(chuàng)建核心類
這里,我主要是模擬一個(gè)支付接口的限流場(chǎng)景。首先,我們定義一個(gè)PayService接口和MessageService接口。PayService接口主要用于模擬后續(xù)的支付業(yè)務(wù),MessageService接口模擬發(fā)送消息。接口的定義分別如下所示。
- PayService
package io.mykit.limiter.service;
import java.math.BigDecimal;
/**
* @author binghe
* @version 1.0.0
* @description 模擬支付
*/
public interface PayService {
int pay(BigDecimal price);
}
- MessageService
package io.mykit.limiter.service;
/**
* @author binghe
* @version 1.0.0
* @description 模擬發(fā)送消息服務(wù)
*/
public interface MessageService {
boolean sendMessage(String message);
}
接下來(lái),創(chuàng)建二者的實(shí)現(xiàn)類,分別如下。
- MessageServiceImpl
package io.mykit.limiter.service.impl;
import io.mykit.limiter.service.MessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
* @author binghe
* @version 1.0.0
* @description 模擬實(shí)現(xiàn)發(fā)送消息
*/
@Service
publicclass MessageServiceImpl implements MessageService {
privatefinal Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
@Override
public boolean sendMessage(String message) {
logger.info("發(fā)送消息成功===>>" + message);
returntrue;
}
}
- PayServiceImpl
package io.mykit.limiter.service.impl;
import io.mykit.limiter.service.PayService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
/**
* @author binghe
* @version 1.0.0
* @description 模擬支付
*/
@Service
publicclass PayServiceImpl implements PayService {
privatefinal Logger logger = LoggerFactory.getLogger(PayServiceImpl.class);
@Override
public int pay(BigDecimal price) {
logger.info("支付成功===>>" + price);
return1;
}
}
由于是模擬支付和發(fā)送消息,所以,我在具體實(shí)現(xiàn)的方法中打印出了相關(guān)的日志,并沒(méi)有實(shí)現(xiàn)具體的業(yè)務(wù)邏輯。
接下來(lái),就是創(chuàng)建我們的Controller類PayController,在PayController類的接口pay()方法中使用了限流,每秒鐘向桶中放入2個(gè)令牌,并且客戶端從桶中獲取令牌,如果在500毫秒內(nèi)沒(méi)有獲取到令牌的話,我們可以則直接走服務(wù)降級(jí)處理。
PayController的代碼如下所示。
package io.mykit.limiter.controller;
import com.google.common.util.concurrent.RateLimiter;
import io.mykit.limiter.service.MessageService;
import io.mykit.limiter.service.PayService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;
/**
* @author binghe
* @version 1.0.0
* @description 測(cè)試接口限流
*/
@RestController
publicclass PayController {
privatefinal Logger logger = LoggerFactory.getLogger(PayController.class);
/**
* RateLimiter的create()方法中傳入一個(gè)參數(shù),表示以固定的速率2r/s,即以每秒2個(gè)令牌的速率向桶中放入令牌
*/
private RateLimiter rateLimiter = RateLimiter.create(2);
@Autowired
private MessageService messageService;
@Autowired
private PayService payService;
@RequestMapping("/boot/pay")
public String pay(){
//記錄返回接口
String result = "";
//限流處理,客戶端請(qǐng)求從桶中獲取令牌,如果在500毫秒沒(méi)有獲取到令牌,則直接走服務(wù)降級(jí)處理
boolean tryAcquire = rateLimiter.tryAcquire(500, TimeUnit.MILLISECONDS);
if (!tryAcquire){
result = "請(qǐng)求過(guò)多,降級(jí)處理";
logger.info(result);
return result;
}
int ret = payService.pay(BigDecimal.valueOf(100.0));
if(ret > 0){
result = "支付成功";
return result;
}
result = "支付失敗,再試一次吧...";
return result;
}
}
最后,我們來(lái)創(chuàng)建mykit-ratelimiter-test項(xiàng)目的核心啟動(dòng)類,如下所示。
package io.mykit.limiter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author binghe
* @version 1.0.0
* @description 項(xiàng)目啟動(dòng)類
*/
@SpringBootApplication
public class MykitLimiterApplication {
public static void main(String[] args){
SpringApplication.run(MykitLimiterApplication.class, args);
}
}
至此,我們不使用注解方式實(shí)現(xiàn)限流的Web應(yīng)用就基本完成了。
運(yùn)行項(xiàng)目
項(xiàng)目創(chuàng)建完成后,我們來(lái)運(yùn)行項(xiàng)目,運(yùn)行SpringBoot項(xiàng)目比較簡(jiǎn)單,直接運(yùn)行MykitLimiterApplication類的main()方法即可。
項(xiàng)目運(yùn)行成功后,我們?cè)跒g覽器地址欄輸入鏈接:http://localhost:8080/boot/pay。頁(yè)面會(huì)輸出“支付成功”的字樣,說(shuō)明項(xiàng)目搭建成功了。如下所示。
此時(shí),我只訪問(wèn)了一次,并沒(méi)有觸發(fā)限流。接下來(lái),我們不停的刷瀏覽器,此時(shí),瀏覽器會(huì)輸出“支付失敗,再試一次吧...”的字樣,如下所示。
在PayController類中還有一個(gè)sendMessage()方法,模擬的是發(fā)送消息的接口,同樣使用了限流操作,具體代碼如下所示。
@RequestMapping("/boot/send/message")
public String sendMessage(){
//記錄返回接口
String result = "";
//限流處理,客戶端請(qǐng)求從桶中獲取令牌,如果在500毫秒沒(méi)有獲取到令牌,則直接走服務(wù)降級(jí)處理
boolean tryAcquire = rateLimiter.tryAcquire(500, TimeUnit.MILLISECONDS);
if (!tryAcquire){
result = "請(qǐng)求過(guò)多,降級(jí)處理";
logger.info(result);
return result;
}
boolean flag = messageService.sendMessage("恭喜您成長(zhǎng)值+1");
if (flag){
result = "消息發(fā)送成功";
return result;
}
result = "消息發(fā)送失敗,再試一次吧...";
return result;
}
sendMessage()方法的代碼邏輯和運(yùn)行效果與pay()方法相同,我就不再瀏覽器訪問(wèn) http://localhost:8080/boot/send/message 地址的訪問(wèn)效果了,小伙伴們可以自行驗(yàn)證。
不使用注解實(shí)現(xiàn)限流缺點(diǎn)
通過(guò)對(duì)項(xiàng)目的編寫(xiě),我們可以發(fā)現(xiàn),當(dāng)在項(xiàng)目中對(duì)接口進(jìn)行限流時(shí),不使用注解進(jìn)行開(kāi)發(fā),會(huì)導(dǎo)致代碼出現(xiàn)大量冗余,每個(gè)方法中幾乎都要寫(xiě)一段相同的限流邏輯,代碼十分冗余。
如何解決代碼冗余的問(wèn)題呢?我們可以使用自定義注解進(jìn)行實(shí)現(xiàn)。
使用注解實(shí)現(xiàn)接口限流
使用自定義注解,我們可以將一些通用的業(yè)務(wù)邏輯封裝到注解的切面中,在需要添加注解業(yè)務(wù)邏輯的方法上加上相應(yīng)的注解即可。針對(duì)我們這個(gè)限流的實(shí)例來(lái)說(shuō),可以基于自定義注解實(shí)現(xiàn)。
實(shí)現(xiàn)自定義注解
實(shí)現(xiàn),我們來(lái)創(chuàng)建一個(gè)自定義注解,如下所示。
package io.mykit.limiter.annotation;
import java.lang.annotation.*;
/**
* @author binghe
* @version 1.0.0
* @description 實(shí)現(xiàn)限流的自定義注解
*/
@Target(value = ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyRateLimiter {
//向令牌桶放入令牌的速率
double rate();
//從令牌桶獲取令牌的超時(shí)時(shí)間
long timeout() default 0;
}
自定義注解切面實(shí)現(xiàn)
接下來(lái),我們還要實(shí)現(xiàn)一個(gè)切面類MyRateLimiterAspect,如下所示。
package io.mykit.limiter.aspect;
import com.google.common.util.concurrent.RateLimiter;
import io.mykit.limiter.annotation.MyRateLimiter;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.TimeUnit;
/**
* @author binghe
* @version 1.0.0
* @description 一般限流切面類
*/
@Aspect
@Component
publicclass MyRateLimiterAspect {
private RateLimiter rateLimiter = RateLimiter.create(2);
@Pointcut("execution(public * io.mykit.limiter.controller.*.*(..))")
public void pointcut(){
}
/**
* 核心切面方法
*/
@Around("pointcut()")
public Object process(ProceedingJoinPoint proceedingJoinPoint) throws Throwable{
MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();
//使用反射獲取方法上是否存在@MyRateLimiter注解
MyRateLimiter myRateLimiter = signature.getMethod().getDeclaredAnnotation(MyRateLimiter.class);
if(myRateLimiter == null){
//程序正常執(zhí)行,執(zhí)行目標(biāo)方法
return proceedingJoinPoint.proceed();
}
//獲取注解上的參數(shù)
//獲取配置的速率
double rate = myRateLimiter.rate();
//獲取客戶端等待令牌的時(shí)間
long timeout = myRateLimiter.timeout();
//設(shè)置限流速率
rateLimiter.setRate(rate);
//判斷客戶端獲取令牌是否超時(shí)
boolean tryAcquire = rateLimiter.tryAcquire(timeout, TimeUnit.MILLISECONDS);
if(!tryAcquire){
//服務(wù)降級(jí)
fullback();
returnnull;
}
//獲取到令牌,直接執(zhí)行
return proceedingJoinPoint.proceed();
}
/**
* 降級(jí)處理
*/
private void fullback() {
response.setHeader("Content-type", "text/html;charset=UTF-8");
PrintWriter writer = null;
try {
writer = response.getWriter();
writer.println("出錯(cuò)了,重試一次試試?");
writer.flush();;
} catch (IOException e) {
e.printStackTrace();
}finally {
if(writer != null){
writer.close();
}
}
}
}
接下來(lái),我們改造下PayController類中的sendMessage()方法,修改后的方法片段代碼如下所示。
@MyRateLimiter(rate = 1.0, timeout = 500)
@RequestMapping("/boot/send/message")
public String sendMessage(){
//記錄返回接口
String result = "";
boolean flag = messageService.sendMessage("恭喜您成長(zhǎng)值+1");
if (flag){
result = "消息發(fā)送成功";
return result;
}
result = "消息發(fā)送失敗,再試一次吧...";
return result;
}
運(yùn)行部署項(xiàng)目
部署項(xiàng)目比較簡(jiǎn)單,只需要運(yùn)行MykitLimiterApplication類下的main()方法即可。這里,為了簡(jiǎn)單,我們還是從瀏覽器中直接輸入鏈接地址來(lái)進(jìn)行訪問(wèn)。
效果如下所示。
接下來(lái),我們不斷的刷新瀏覽器。會(huì)出現(xiàn)“消息發(fā)送失敗,再試一次吧..”的字樣,說(shuō)明已經(jīng)觸發(fā)限流操作。
基于限流算法實(shí)現(xiàn)限流的缺點(diǎn)
上面介紹的限流方式都只能用于單機(jī)部署的環(huán)境中,如果將應(yīng)用部署到多臺(tái)服務(wù)器進(jìn)行分布式、集群,則上面限流的方式就不適用了,此時(shí),我們需要使用分布式限流。至于在分布式場(chǎng)景下,如何實(shí)現(xiàn)限流操作,我們就在下一篇中進(jìn)行介紹。