FastAPI + RabbitMQ:構建高性能異步任務系統
作者:Ss肥魚
本文將帶你使用 FastAPI + RabbitMQ 構建一個簡單的 異步任務隊列,模擬一個耗時的任務(如發送郵件),由后臺獨立 worker 消費執行。
在現代微服務架構中,任務解耦 和 異步處理 是系統擴展能力的關鍵。本文將帶你使用 FastAPI + RabbitMQ 構建一個簡單的 異步任務隊列,模擬一個耗時的任務(如發送郵件),由后臺獨立 worker 消費執行。
技術棧
- FastAPI(主服務,負責接收請求)
- RabbitMQ(消息隊列)
- aio-pika(Python 異步 RabbitMQ 客戶端)
系統架構簡圖
用戶請求 --> FastAPI(推送任務) --> RabbitMQ
↓
Worker(消費執行)
安裝依賴
pip install fastapi uvicorn aio-pika pydantic
RabbitMQ 環境準備
可以用 Docker 啟動 RabbitMQ 服務:
docker run -d --hostname rabbit --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
- 5672:RabbitMQ 消息端口
- 15672:Web 管理后臺(默認賬號密碼都是 guest)
FastAPI 應用(producer)
# app/main.py
from fastapi import FastAPI
from pydantic import BaseModel
import asyncio
import aio_pika
app = FastAPI()
class TaskRequest(BaseModel):
user_email: str
message: str
RABBITMQ_URL = "amqp://guest:guest@localhost/"
@app.on_event("startup")
async def startup():
app.state.rabbit_connection = await aio_pika.connect_robust(RABBITMQ_URL)
@app.on_event("shutdown")
async def shutdown():
await app.state.rabbit_connection.close()
@app.post("/send-task")
async def send_task(task: TaskRequest):
channel = await app.state.rabbit_connection.channel()
queue = await channel.declare_queue("task_queue", durable=True)
# 構造消息
msg_body = task.json().encode()
message = aio_pika.Message(body=msg_body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT)
await channel.default_exchange.publish(message, routing_key="task_queue")
return {"status": "success", "msg": "任務已入隊"}
消費者 Worker(consumer)
# worker.py
import asyncio
import json
import aio_pika
RABBITMQ_URL = "amqp://guest:guest@localhost/"
async def main():
connection = await aio_pika.connect_robust(RABBITMQ_URL)
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue("task_queue", durable=True)
async def on_message(message: aio_pika.IncomingMessage):
async with message.process():
data = json.loads(message.body)
print(f"?? 收到任務:發送郵件至 {data['user_email']},內容:{data['message']}")
await asyncio.sleep(2) # 模擬耗時任務
print("? 郵件發送完成")
print("?? Worker 正在等待任務...")
await queue.consume(on_message)
if __name__ == "__main__":
asyncio.run(main())
啟動服務
啟動 FastAPI:
uvicorn app.main:app --reload
啟動 worker:
python worker.py
測試
發送 POST 請求到 /send-task:
POST http://localhost:8000/send-task
{
"user_email": "test@example.com",
"message": "歡迎使用 FastAPI + RabbitMQ!"
}
終端會看到 worker 消費消息并執行任務的輸出。
總結
通過 FastAPI + RabbitMQ,可以輕松實現異步任務分發系統:
- 主服務響應快速,避免卡頓
- 異步 worker 后臺處理,任務解耦
- RabbitMQ 提供可靠、高可用的消息傳遞機制
責任編輯:趙寧寧
來源:
Ssoul肥魚