成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

FastAPI + RabbitMQ:構建高性能異步任務系統

開發 架構
本文將帶你使用 FastAPI + RabbitMQ 構建一個簡單的 異步任務隊列,模擬一個耗時的任務(如發送郵件),由后臺獨立 worker 消費執行。

在現代微服務架構中,任務解耦 和 異步處理 是系統擴展能力的關鍵。本文將帶你使用 FastAPI + RabbitMQ 構建一個簡單的 異步任務隊列,模擬一個耗時的任務(如發送郵件),由后臺獨立 worker 消費執行。

技術棧

  • FastAPI(主服務,負責接收請求)
  • RabbitMQ(消息隊列)
  • aio-pika(Python 異步 RabbitMQ 客戶端)

系統架構簡圖

用戶請求 --> FastAPI(推送任務) --> RabbitMQ
                    ↓
             Worker(消費執行)

安裝依賴

pip install fastapi uvicorn aio-pika pydantic

RabbitMQ 環境準備

可以用 Docker 啟動 RabbitMQ 服務:

docker run -d --hostname rabbit --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
  • 5672:RabbitMQ 消息端口
  • 15672:Web 管理后臺(默認賬號密碼都是 guest)

FastAPI 應用(producer)

# app/main.py
from fastapi import FastAPI
from pydantic import BaseModel
import asyncio
import aio_pika

app = FastAPI()

class TaskRequest(BaseModel):
    user_email: str
    message: str

RABBITMQ_URL = "amqp://guest:guest@localhost/"

@app.on_event("startup")
async def startup():
    app.state.rabbit_connection = await aio_pika.connect_robust(RABBITMQ_URL)

@app.on_event("shutdown")
async def shutdown():
    await app.state.rabbit_connection.close()

@app.post("/send-task")
async def send_task(task: TaskRequest):
    channel = await app.state.rabbit_connection.channel()
    queue = await channel.declare_queue("task_queue", durable=True)

    # 構造消息
    msg_body = task.json().encode()
    message = aio_pika.Message(body=msg_body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT)

    await channel.default_exchange.publish(message, routing_key="task_queue")

    return {"status": "success", "msg": "任務已入隊"}

消費者 Worker(consumer)

# worker.py
import asyncio
import json
import aio_pika

RABBITMQ_URL = "amqp://guest:guest@localhost/"

async def main():
    connection = await aio_pika.connect_robust(RABBITMQ_URL)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)
    queue = await channel.declare_queue("task_queue", durable=True)

    async def on_message(message: aio_pika.IncomingMessage):
        async with message.process():
            data = json.loads(message.body)
            print(f"?? 收到任務:發送郵件至 {data['user_email']},內容:{data['message']}")
            await asyncio.sleep(2)  # 模擬耗時任務
            print("? 郵件發送完成")

    print("?? Worker 正在等待任務...")
    await queue.consume(on_message)

if __name__ == "__main__":
    asyncio.run(main())

啟動服務

啟動 FastAPI:

uvicorn app.main:app --reload

啟動 worker:

python worker.py

測試

發送 POST 請求到 /send-task:

POST http://localhost:8000/send-task
{
  "user_email": "test@example.com",
  "message": "歡迎使用 FastAPI + RabbitMQ!"
}

終端會看到 worker 消費消息并執行任務的輸出。

總結

通過 FastAPI + RabbitMQ,可以輕松實現異步任務分發系統:

  • 主服務響應快速,避免卡頓
  • 異步 worker 后臺處理,任務解耦
  • RabbitMQ 提供可靠、高可用的消息傳遞機制
責任編輯:趙寧寧 來源: Ssoul肥魚
相關推薦

2024-10-09 11:31:51

2023-11-06 08:32:17

FastAPIPython

2022-12-09 08:40:56

高性能內存隊列

2025-01-13 12:23:51

2025-04-15 08:20:00

FastAPI異步函數

2023-03-13 07:40:44

高并發golang

2011-10-21 14:20:59

高性能計算HPC虛擬化

2011-10-25 13:13:35

HPC高性能計算Platform

2020-06-05 07:20:41

測試自動化環境

2011-12-15 13:28:57

2025-03-04 08:00:00

機器學習Rust開發

2022-10-08 07:55:33

DemoMongoDB異步

2023-10-28 09:05:38

2009-10-29 09:11:50

Juniper高性能網絡

2009-06-03 14:24:12

ibmdwWebSphere

2012-04-17 16:48:43

應用優化負載均衡Array APV

2011-02-23 09:49:40

ASP.NET

2014-11-21 15:48:39

英特爾高性能計算模塊

2011-02-13 09:17:02

ASP.NET

2023-12-26 00:58:53

Web應用Go語言
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产精品美女www爽爽爽 | 中文字幕亚洲欧美 | 一本大道久久a久久精二百 欧洲一区二区三区 | 中文字幕亚洲在线 | 色综合久久伊人 | 精品久久久久久亚洲综合网 | 成人影音| 在线成人福利 | 欧洲一级黄 | 成年视频在线观看福利资源 | 黄色大片免费网站 | 欧美高清一区 | 免费视频久久 | 97高清国语自产拍 | 婷婷久久五月天 | 国产成人在线播放 | 一级特黄网站 | 久久久久久久久久一区二区 | 午夜久久久 | 久久伊人亚洲 | 亚洲免费一区二区 | 久久这里有精品 | 偷拍亚洲色图 | 天堂在线91 | 精品一区二区在线看 | 成人在线视频免费观看 | www.99re| 欧美成年人视频在线观看 | 一二三区在线 | 99国产精品视频免费观看一公开 | 久久一区视频 | 日韩精品免费在线观看 | 春色av| 国产一级久久久久 | 视频一二区 | 国产电影一区二区在线观看 | 国产99精品 | 99久久婷婷国产综合精品电影 | 色视频www在线播放国产人成 | 精品国产91久久久久久 | 国产一区三区视频 |