Spring Boot異步請(qǐng)求和異步調(diào)用,一文搞定!
一、Spring Boot中異步請(qǐng)求的使用
1、異步請(qǐng)求與同步請(qǐng)求
特點(diǎn):
可以先釋放容器分配給請(qǐng)求的線(xiàn)程與相關(guān)資源,減輕系統(tǒng)負(fù)擔(dān),釋放了容器所分配線(xiàn)程的請(qǐng)求,其響應(yīng)將被延后,可以在耗時(shí)處理完成(例如長(zhǎng)時(shí)間的運(yùn)算)時(shí)再對(duì)客戶(hù)端進(jìn)行響應(yīng)。
一句話(huà):增加了服務(wù)器對(duì)客戶(hù)端請(qǐng)求的吞吐量(實(shí)際生產(chǎn)上我們用的比較少,如果并發(fā)請(qǐng)求量很大的情況下,我們會(huì)通過(guò)nginx把請(qǐng)求負(fù)載到集群服務(wù)的各個(gè)節(jié)點(diǎn)上來(lái)分?jǐn)傉?qǐng)求壓力,當(dāng)然還可以通過(guò)消息隊(duì)列來(lái)做請(qǐng)求的緩沖)。
2、異步請(qǐng)求的實(shí)現(xiàn)
方式一:Servlet方式實(shí)現(xiàn)異步請(qǐng)求
- @RequestMapping(value = "/email/servletReq", method = GET)
- public void servletReq (HttpServletRequest request, HttpServletResponse response) {
- AsyncContext asyncContext = request.startAsync();
- //設(shè)置監(jiān)聽(tīng)器:可設(shè)置其開(kāi)始、完成、異常、超時(shí)等事件的回調(diào)處理
- asyncContext.addListener(new AsyncListener() {
- @Override
- public void onTimeout(AsyncEvent event) throws IOException {
- System.out.println("超時(shí)了...");
- //做一些超時(shí)后的相關(guān)操作...
- }
- @Override
- public void onStartAsync(AsyncEvent event) throws IOException {
- System.out.println("線(xiàn)程開(kāi)始");
- }
- @Override
- public void onError(AsyncEvent event) throws IOException {
- System.out.println("發(fā)生錯(cuò)誤:"+event.getThrowable());
- }
- @Override
- public void onComplete(AsyncEvent event) throws IOException {
- System.out.println("執(zhí)行完成");
- //這里可以做一些清理資源的操作...
- }
- });
- //設(shè)置超時(shí)時(shí)間
- asyncContext.setTimeout(20000);
- asyncContext.start(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(10000);
- System.out.println("內(nèi)部線(xiàn)程:" + Thread.currentThread().getName());
- asyncContext.getResponse().setCharacterEncoding("utf-8");
- asyncContext.getResponse().setContentType("text/html;charset=UTF-8");
- asyncContext.getResponse().getWriter().println("這是異步的請(qǐng)求返回");
- } catch (Exception e) {
- System.out.println("異常:"+e);
- }
- //異步請(qǐng)求完成通知
- //此時(shí)整個(gè)請(qǐng)求才完成
- asyncContext.complete();
- }
- });
- //此時(shí)之類(lèi) request的線(xiàn)程連接已經(jīng)釋放了
- System.out.println("主線(xiàn)程:" + Thread.currentThread().getName());
- }
方式二:使用很簡(jiǎn)單,直接返回的參數(shù)包裹一層callable即可,可以繼承WebMvcConfigurerAdapter類(lèi)來(lái)設(shè)置默認(rèn)線(xiàn)程池和超時(shí)處理
- @RequestMapping(value = "/email/callableReq", method = GET)
- @ResponseBody
- public Callable<String> callableReq () {
- System.out.println("外部線(xiàn)程:" + Thread.currentThread().getName());
- return new Callable<String>() {
- @Override
- public String call() throws Exception {
- Thread.sleep(10000);
- System.out.println("內(nèi)部線(xiàn)程:" + Thread.currentThread().getName());
- return "callable!";
- }
- };
- }
- @Configuration
- public class RequestAsyncPoolConfig extends WebMvcConfigurerAdapter {
- @Resource
- private ThreadPoolTaskExecutor myThreadPoolTaskExecutor;
- @Override
- public void configureAsyncSupport(final AsyncSupportConfigurer configurer) {
- //處理 callable超時(shí)
- configurer.setDefaultTimeout(60*1000);
- configurer.setTaskExecutor(myThreadPoolTaskExecutor);
- configurer.registerCallableInterceptors(timeoutCallableProcessingInterceptor());
- }
- @Bean
- public TimeoutCallableProcessingInterceptor timeoutCallableProcessingInterceptor() {
- return new TimeoutCallableProcessingInterceptor();
- }
- }
方式三:和方式二差不多,在Callable外包一層,給WebAsyncTask設(shè)置一個(gè)超時(shí)回調(diào),即可實(shí)現(xiàn)超時(shí)處理
- @RequestMapping(value = "/email/webAsyncReq", method = GET)
- @ResponseBody
- public WebAsyncTask<String> webAsyncReq () {
- System.out.println("外部線(xiàn)程:" + Thread.currentThread().getName());
- Callable<String> result = () -> {
- System.out.println("內(nèi)部線(xiàn)程開(kāi)始:" + Thread.currentThread().getName());
- try {
- TimeUnit.SECONDS.sleep(4);
- } catch (Exception e) {
- // TODO: handle exception
- }
- logger.info("副線(xiàn)程返回");
- System.out.println("內(nèi)部線(xiàn)程返回:" + Thread.currentThread().getName());
- return "success";
- };
- WebAsyncTask<String> wat = new WebAsyncTask<String>(3000L, result);
- wat.onTimeout(new Callable<String>() {
- @Override
- public String call() throws Exception {
- // TODO Auto-generated method stub
- return "超時(shí)";
- }
- });
- return wat;
- }
方式四:DeferredResult可以處理一些相對(duì)復(fù)雜一些的業(yè)務(wù)邏輯,最主要還是可以在另一個(gè)線(xiàn)程里面進(jìn)行業(yè)務(wù)處理及返回,即可在兩個(gè)完全不相干的線(xiàn)程間的通信。
- @RequestMapping(value = "/email/deferredResultReq", method = GET)
- @ResponseBody
- public DeferredResult<String> deferredResultReq () {
- System.out.println("外部線(xiàn)程:" + Thread.currentThread().getName());
- //設(shè)置超時(shí)時(shí)間
- DeferredResult<String> result = new DeferredResult<String>(60*1000L);
- //處理超時(shí)事件 采用委托機(jī)制
- result.onTimeout(new Runnable() {
- @Override
- public void run() {
- System.out.println("DeferredResult超時(shí)");
- result.setResult("超時(shí)了!");
- }
- });
- result.onCompletion(new Runnable() {
- @Override
- public void run() {
- //完成后
- System.out.println("調(diào)用完成");
- }
- });
- myThreadPoolTaskExecutor.execute(new Runnable() {
- @Override
- public void run() {
- //處理業(yè)務(wù)邏輯
- System.out.println("內(nèi)部線(xiàn)程:" + Thread.currentThread().getName());
- //返回結(jié)果
- result.setResult("DeferredResult!!");
- }
- });
- return result;
- }
二、Spring Boot中異步調(diào)用的使用
1、介紹
異步請(qǐng)求的處理。除了異步請(qǐng)求,一般上我們用的比較多的應(yīng)該是異步調(diào)用。通常在開(kāi)發(fā)過(guò)程中,會(huì)遇到一個(gè)方法是和實(shí)際業(yè)務(wù)無(wú)關(guān)的,沒(méi)有緊密性的。比如記錄日志信息等業(yè)務(wù)。這個(gè)時(shí)候正常就是啟一個(gè)新線(xiàn)程去做一些業(yè)務(wù)處理,讓主線(xiàn)程異步的執(zhí)行其他業(yè)務(wù)。
2、使用方式(基于spring下)
需要在啟動(dòng)類(lèi)加入@EnableAsync使異步調(diào)用@Async注解生效
在需要異步執(zhí)行的方法上加入此注解即可@Async("threadPool"),threadPool為自定義線(xiàn)程池。
代碼略。。。就倆標(biāo)簽,自己試一把就可以了
3、注意事項(xiàng)
在默認(rèn)情況下,未設(shè)置TaskExecutor時(shí),默認(rèn)是使用SimpleAsyncTaskExecutor這個(gè)線(xiàn)程池,但此線(xiàn)程不是真正意義上的線(xiàn)程池,因?yàn)榫€(xiàn)程不重用,每次調(diào)用都會(huì)創(chuàng)建一個(gè)新的線(xiàn)程??赏ㄟ^(guò)控制臺(tái)日志輸出可以看出,每次輸出線(xiàn)程名都是遞增的。所以最好我們來(lái)自定義一個(gè)線(xiàn)程池。
調(diào)用的異步方法,不能為同一個(gè)類(lèi)的方法(包括同一個(gè)類(lèi)的內(nèi)部類(lèi)),簡(jiǎn)單來(lái)說(shuō),因?yàn)镾pring在啟動(dòng)掃描時(shí)會(huì)為其創(chuàng)建一個(gè)代理類(lèi),而同類(lèi)調(diào)用時(shí),還是調(diào)用本身的代理類(lèi)的,所以和平常調(diào)用是一樣的。
其他的注解如@Cache等也是一樣的道理,說(shuō)白了,就是Spring的代理機(jī)制造成的。所以在開(kāi)發(fā)中,最好把異步服務(wù)單獨(dú)抽出一個(gè)類(lèi)來(lái)管理。下面會(huì)重點(diǎn)講述。。
4、什么情況下會(huì)導(dǎo)致@Async異步方法會(huì)失效?
調(diào)用同一個(gè)類(lèi)下注有@Async異步方法:
在spring中像@Async和@Transactional、cache等注解本質(zhì)使用的是動(dòng)態(tài)代理,其實(shí)Spring容器在初始化的時(shí)候Spring容器會(huì)將含有AOP注解的類(lèi)對(duì)象“替換”為代理對(duì)象(簡(jiǎn)單這么理解),那么注解失效的原因就很明顯了,就是因?yàn)檎{(diào)用方法的是對(duì)象本身而不是代理對(duì)象,因?yàn)闆](méi)有經(jīng)過(guò)Spring容器,那么解決方法也會(huì)沿著這個(gè)思路來(lái)解決。
調(diào)用的是靜態(tài)(static )方法
調(diào)用(private)私有化方法
5、解決4中問(wèn)題1的方式(其它2,3兩個(gè)問(wèn)題自己注意下就可以了)
將要異步執(zhí)行的方法單獨(dú)抽取成一個(gè)類(lèi),原理就是當(dāng)你把執(zhí)行異步的方法單獨(dú)抽取成一個(gè)類(lèi)的時(shí)候,這個(gè)類(lèi)肯定是被Spring管理的,其他Spring組件需要調(diào)用的時(shí)候肯定會(huì)注入進(jìn)去,這時(shí)候?qū)嶋H上注入進(jìn)去的就是代理類(lèi)了。
其實(shí)我們的注入對(duì)象都是從Spring容器中給當(dāng)前Spring組件進(jìn)行成員變量的賦值,由于某些類(lèi)使用了AOP注解,那么實(shí)際上在Spring容器中實(shí)際存在的是它的代理對(duì)象。那么我們就可以通過(guò)上下文獲取自己的代理對(duì)象調(diào)用異步方法。
- @Controller
- @RequestMapping("/app")
- public class EmailController {
- //獲取ApplicationContext對(duì)象方式有多種,這種最簡(jiǎn)單,其它的大家自行了解一下
- @Autowired
- private ApplicationContext applicationContext;
- @RequestMapping(value = "/email/asyncCall", method = GET)
- @ResponseBody
- public Map<String, Object> asyncCall () {
- Map<String, Object> resMap = new HashMap<String, Object>();
- try{
- //這樣調(diào)用同類(lèi)下的異步方法是不起作用的
- //this.testAsyncTask();
- //通過(guò)上下文獲取自己的代理對(duì)象調(diào)用異步方法
- EmailController emailController = (EmailController)applicationContext.getBean(EmailController.class);
- emailController.testAsyncTask();
- resMap.put("code",200);
- }catch (Exception e) {
- resMap.put("code",400);
- logger.error("error!",e);
- }
- return resMap;
- }
- //注意一定是public,且是非static方法
- @Async
- public void testAsyncTask() throws InterruptedException {
- Thread.sleep(10000);
- System.out.println("異步任務(wù)執(zhí)行完成!");
- }
- }
開(kāi)啟cglib代理,手動(dòng)獲取Spring代理類(lèi),從而調(diào)用同類(lèi)下的異步方法。首先,在啟動(dòng)類(lèi)上加上@EnableAspectJAutoProxy(exposeProxy = true)注解。代碼實(shí)現(xiàn),如下:
- @Service
- @Transactional(value = "transactionManager", readOnly = false, propagation = Propagation.REQUIRED, rollbackFor = Throwable.class)
- public class EmailService {
- @Autowired
- private ApplicationContext applicationContext;
- @Async
- public void testSyncTask() throws InterruptedException {
- Thread.sleep(10000);
- System.out.println("異步任務(wù)執(zhí)行完成!");
- }
- public void asyncCallTwo() throws InterruptedException {
- //this.testSyncTask();
- // EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class);
- // emailService.testSyncTask();
- boolean isAop = AopUtils.isAopProxy(EmailController.class);//是否是代理對(duì)象;
- boolean isCglib = AopUtils.isCglibProxy(EmailController.class); //是否是CGLIB方式的代理對(duì)象;
- boolean isJdk = AopUtils.isJdkDynamicProxy(EmailController.class); //是否是JDK動(dòng)態(tài)代理方式的代理對(duì)象;
- //以下才是重點(diǎn)!!!
- EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class);
- EmailService proxy = (EmailService) AopContext.currentProxy();
- System.out.println(emailService == proxy ? true : false);
- proxy.testSyncTask();
- System.out.println("end!!!");
- }
- }
三、異步請(qǐng)求與異步調(diào)用的區(qū)別
兩者的使用場(chǎng)景不同,異步請(qǐng)求用來(lái)解決并發(fā)請(qǐng)求對(duì)服務(wù)器造成的壓力,從而提高對(duì)請(qǐng)求的吞吐量;而異步調(diào)用是用來(lái)做一些非主線(xiàn)流程且不需要實(shí)時(shí)計(jì)算和響應(yīng)的任務(wù),比如同步日志到kafka中做日志分析等。
異步請(qǐng)求是會(huì)一直等待response相應(yīng)的,需要返回結(jié)果給客戶(hù)端的;而異步調(diào)用我們往往會(huì)馬上返回給客戶(hù)端響應(yīng),完成這次整個(gè)的請(qǐng)求,至于異步調(diào)用的任務(wù)后臺(tái)自己慢慢跑就行,客戶(hù)端不會(huì)關(guān)心。
四、總結(jié)
異步請(qǐng)求和異步調(diào)用的使用到這里基本就差不多了,有問(wèn)題還希望大家多多指出。這邊文章提到了動(dòng)態(tài)代理,而spring中Aop的實(shí)現(xiàn)原理就是動(dòng)態(tài)代理,后續(xù)會(huì)對(duì)動(dòng)態(tài)代理做詳細(xì)解讀,還望多多支持哈~