輕松完成異步任務(wù),一文搞懂Python Celery
雖然現(xiàn)代的網(wǎng)絡(luò)應(yīng)用比以往任何時(shí)候都更快速、更便捷,但仍有許多情況下,需要把繁重的任務(wù)轉(zhuǎn)移到系統(tǒng)的其他部分執(zhí)行,而不是在主線程上進(jìn)行工作。
這些情況中的示例如下:
- 周期性任務(wù) —— 計(jì)劃在特定時(shí)間間隔內(nèi)運(yùn)行的工作。例如,每日、每月的報(bào)告生成。
- 第三方工具 —— 應(yīng)用程序應(yīng)該快速向用戶返回響應(yīng),而不是等待其他任務(wù)先完成。例如,發(fā)送電子郵件、通知,將更新進(jìn)度傳遞給內(nèi)部工具。
- 長(zhǎng)時(shí)間運(yùn)行的工作 —— 執(zhí)行復(fù)雜或資源昂貴的工作,并且用戶需要等待工作完成。例如。DAG工作流、基于Map-Reduce的任務(wù)、長(zhǎng)時(shí)間運(yùn)行的Spark作業(yè)等。
那么,如何處理這些情況呢?這時(shí),Celery就派上用場(chǎng)了。
什么是Celery?
Celery是一個(gè)開(kāi)源的任務(wù)隊(duì)列實(shí)現(xiàn),通常與基于Python的網(wǎng)絡(luò)框架(如Flask和Django)相結(jié)合,在典型的請(qǐng)求-響應(yīng)周期之外異步執(zhí)行任務(wù)。
因此,Celery本質(zhì)上是一個(gè)基于分布式消息傳遞的任務(wù)隊(duì)列。執(zhí)行單元或任務(wù)在一個(gè)或多個(gè)worker上使用多處理、gevent或Eventlet同時(shí)執(zhí)行。這些任務(wù)可以同步執(zhí)行(即等到準(zhǔn)備就緒)或異步執(zhí)行(即在后臺(tái))。
Celery是如何工作的?
Celery是一個(gè)分布式任務(wù)隊(duì)列,基于生產(chǎn)者-消費(fèi)者模式。
任務(wù)隊(duì)列是用于跨線程和機(jī)器分配工作的機(jī)制,本質(zhì)上是生產(chǎn)者(Web應(yīng)用程序)和消費(fèi)者(Celery工作者)之間的消息中介。
Celery通過(guò)消息進(jìn)行交互,代理(broker)在客戶(生產(chǎn)者)和工作者(消費(fèi)者)之間充當(dāng)中間人。為了啟動(dòng)任務(wù),客戶端將消息推送到隊(duì)列中,然后代理將該消息傳遞給工作者。
Celery系統(tǒng)可以由多個(gè)worker和broker組成,這為高可用性和橫向擴(kuò)展提供了可能。
簡(jiǎn)而言之,Celery客戶端是生產(chǎn)者,它通過(guò)消息代理向隊(duì)列中添加新的任務(wù)。然后,Celery工作者同樣通過(guò)消息代理從隊(duì)列中獲取新的任務(wù)。一旦處理完畢,結(jié)果就會(huì)存儲(chǔ)在結(jié)果后端。
工作實(shí)例
下面的例子將使用RedisMQ作為消息代理。
設(shè)置Redis
在linux/macOS系統(tǒng)上,通過(guò)以下命令在本地運(yùn)行Redis服務(wù)器:
設(shè)置好Redis后,通過(guò)執(zhí)行以下命令運(yùn)行Redis服務(wù)器:
該服務(wù)器在默認(rèn)的6379端口運(yùn)行。
設(shè)置應(yīng)用程序
首先,在本地設(shè)置Python項(xiàng)目。
Celery可以通過(guò)標(biāo)準(zhǔn)工具如pip或easy_install來(lái)安裝。通過(guò)以下命令安裝Celery和Redis:
現(xiàn)在需要一個(gè)Celery實(shí)例來(lái)運(yùn)行應(yīng)用程序,Celery實(shí)現(xiàn)任何任務(wù)都是以實(shí)例開(kāi)始,比如創(chuàng)建和管理任務(wù)等。
在項(xiàng)目中創(chuàng)建一個(gè)文件tasks.py:
這里定義了一個(gè)簡(jiǎn)單的任務(wù)add(),返回兩個(gè)數(shù)字的總和。
運(yùn)行Celery Worker
在終端上,切換到項(xiàng)目位置并用以下命令運(yùn)行Celery worker:
關(guān)于Celery worker命令行的詳細(xì)信息,可以使用help:
調(diào)用任務(wù)
在Celery中,使用delay()方法來(lái)調(diào)用任務(wù)。
打開(kāi)項(xiàng)目的另一個(gè)終端窗口并運(yùn)行以下命令:
這將打開(kāi)Python命令行。
這將返回一個(gè)AsyncResult實(shí)例,可以用來(lái)檢查任務(wù)狀態(tài),獲得其返回值,等待任務(wù)完成,也可以在失敗時(shí)獲得異常和回溯。
運(yùn)行add.delay()命令后,任務(wù)會(huì)被推送到隊(duì)列中,然后被worker獲取。這可以在Celery worker終端上進(jìn)行驗(yàn)證,可以清楚地看到任務(wù)被接收,之后任務(wù)成功完成。