成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

春節活動 - 高峰值獎勵發放技術方案

原創 精選
移動開發 移動應用
2022年春節活動在8款字節系 APP 上線,包含了紅包雨、集年味卡和煙火大會等諸多玩法。為了保證用戶體驗、活動效果和資金安全,紅包雨系統需要保證超高的穩定性。在系統設計上不能強依賴任何外部系統,在極端情況下僅需要紅包雨服務可用,用戶請求即可正常處理并返回結果。

作者|張健

1. 背景

2022年春節活動在8款字節系 APP 上線,包含了紅包雨、集年味卡和煙火大會等諸多玩法。紅包雨、集卡開獎和煙火大會都存在高峰值突發流量。其中,紅包雨活動會在10分鐘內給幾千萬甚至上億用戶發放上億現金獎勵,且大多數請求集中在前3分鐘。在項目啟動時,紅包雨活動作為最大的流量來源,預估的發紅包峰值流量有180萬 QPS 。

為了保證用戶體驗、活動效果和資金安全,紅包雨系統需要保證超高的穩定性。在系統設計上不能強依賴任何外部系統,在極端情況下僅需要紅包雨服務可用,用戶請求即可正常處理并返回結果。獎勵系統作為紅包系統的下游服務,負責用戶獎勵的入賬,需要承載最高180萬 QPS 的獎勵發放請求,并且在出現異常情況時保證用戶體驗無損,獎勵可以最終入賬,做到不超發不少發。

圖片

2. 技術挑戰

2.1 峰值流量高

除夕當天會進行7場紅包雨,從12:00起每小時進行一場,集卡開獎和煙火大會于19:30開始。當晚20:00前后,紅包雨、集卡開獎和煙火大會的發獎流量將會疊加在一起,屆時可能產生超過200萬 QPS 的發獎流量。下游資產中臺服務僅提供30萬 QPS 的現金紅包、40萬 QPS 的優惠券入賬能力。獎勵系統需要削峰限流,異步入賬獎勵,確保下游服務不過載。

2.2 獎勵種類多

除現金紅包外,在集卡和煙火大會場景會發放10多種優惠券、實物獎勵、頭像掛件等。不同的優惠券由不同的下游系統發放,且每個系統的吞吐能力不同,甚至部分系統只能提供2000 TPS 的處理能力。獎勵系統在進行削峰限流時,不同獎勵種類限流的閾值需要根據下游系統吞吐能力進行個性化配置。下游系統能力有限的情況下,需要保證現金優先入賬。

2.3 系統高可靠

引入消息隊列進行獎勵異步發放后,需要盡可能保證獎勵事件的可靠投遞和可靠消費,任何獎勵最終都要入賬,還需兼顧消息隊列集群的穩定和容災。

在內部服務出災的情況下,或獎勵事件在消息隊列中堆積時,需要做到用戶無感知,用戶在活動錢包頁可見獎勵流水,隨時可以正常提現。除通過消費獎勵事件入賬外,還需引入用戶提現行為觸發強制入賬的能力,與此同時還要保證安全可靠,不能被黑產攻擊造成資金損失。

3. 技術方案

基于春節活動峰值流量高、穩定性要求高的特點,為了保證高峰值流量下獎勵系統穩定可靠,技術方案選型時選擇了基于消息隊列削峰、異步處理請求的總體方案。獎勵發放的大概流程如下:

圖片

在獎勵事件生產側,為了盡可能降低上游接入方的開發成本,基于不同接入場景特性,由獎勵系統提供獎勵 SDK ,并定義簡單清晰的發獎接口,供接入方選用。獎勵事件的可靠投遞由 SDK 內部保證。獎勵事件 MQ 使用了公司內 ByteMQ 和 RocketMQ 兩種消息隊列,防止因單個消息隊列集群宕機導致整個系統不可用。

在獎勵事件消費側,針對每一個 Topic 創建一個消費者服務,四個消費者功能完全一致。由消費者服務保證消息可靠消費和消費限速。

除激勵金幣外,其他獎勵類型通過資產中臺服務調用各個下游發放。春節活動期間,資產中臺暫未支持發獎請求的削峰,需要在獎勵系統前置進行。業務上,同一訂單號只能發放一種獎勵一次,由于資產中臺和激勵中臺系統之間數據隔離,需要獎勵系統支持單一訂單號跨服務發放冪等。

3.1 獎勵SDK設計

SDK 以代碼“內嵌”的方式運行在接入方服務內,可以避免 RPC 方式網絡傳輸、請求數據序列化和返回數據反序列化帶來的時延和性能消耗。盡管 SDK 的整體時延和性能優于 RPC 方式,對 SDK 本身的穩定性、性能消耗和接口響應時延依然有非常高的要求。以紅包雨場景為例,發獎接口需要50ms內返回,若響應時間超過50ms將會增加整個活動玩法接口的處理時間,影響紅包雨服務的吞吐量,最終會影響用戶參與春節活動的體驗。

獎勵 SDK 在功能上實現了獎勵Token 的生成和存儲和獎勵事件的可靠投遞。 接口設計上面向不同接入場景針對性地提供定制接口,最大限度的降低使用方的理解和接入成本,減少開發周期。

為了保證 SDK 代碼結構清晰,并具有較高的拓展性和可維護性,在代碼結構層面,SDK 內部使用了分層設計,分為了對外接口層、內部接口層和內部實現層。

3.1.1 對外接口層

對外接口層定義了暴露給使用者的外部接口,除初始化、反初始化等接口和通用的異步發獎接口外,還為紅包雨、煙火大會和集卡分別提供差異化定制接口。通用異步發獎接口定義和獎勵 RPC 服務的異步發獎接口保持一致,通過調用 RPC 接口和通過 SDK 發獎的接入方可以低成本的雙向遷移。

定制接口結合使用場景的特點,固化諸如活動 ID、場景 ID、獎勵類型等通用參數,減少接口入參個數,函數名稱語義更清晰,可進一步降低接入方的使用成本,提升接入方代碼的可讀性和可維護性。對于部分場景,還承擔了全局冪等 ID的拼接工作。

發獎請求除用戶信息(用戶 ID、設備 ID 和 AppID )、獎勵信息(獎勵類型、數值)外,還需攜帶一個全局唯一 ID 作為訂單號,以實現根據訂單號冪等的能力。訂單號由接入方根據活動信息和用戶信息拼接而成。所有的接口都支持調用方寫入拓展字段(Map 格式的鍵值對)保存業務自定義信息。

3.1.2 內部接口層

內部接口層提供了通用的獎勵異步發放接口(SendBonus)、Token 生成和存儲接口(GenBonusToken)、初始化接口和反初始化接口。外部接口基于內部接口進行差異化封裝,提供更細化的功能。內部接口層對上層屏蔽內部實現細節。

以異步發放接口 SendBonus 函數為例,主要集成了參數檢查、打點監控、虛擬隊列(Queue)選擇、獎勵消息的構造和發送、獎勵 Token 的生成和存儲等功能。參數校驗通過后,SendBonus 接口即返回獎勵 Token,供上層調用者使用(一般是返回給前端和客戶端)。

/*
SendBonus
@act 活動信息
@user 用戶信息
@bonus 獎勵信息
*/
func SendBonus(ctx context.Context, act Activity, user User, bonus *BonusContent) (string, error) {
// 參數檢查
if err := CheckParams(act, user); err != nil {
// 輸出錯誤日志,監控異常請求
return "", err
}

// 檢查獎勵類型是否合法
cfg, err := CheckBonus(bonus)
if err != nil {
// 輸出錯誤日志,監控異常請求
return "", err
}

// 構造獎勵消息
message := &event.BonusEvent{...}

// SendEvent內部根據獎勵屬性選擇隊列
if err = queue.SendEvent(ctx, message); err != nil {
return GenBonusToken(ctx, act, user, info, true), err
}

// 構造并返回獎勵Token
return GenBonusToken(ctx, act, user, info, true), nil
}

3.1.3 內部實現層

內部實現層主要包含獎勵 Token 和虛擬隊列 Queue 兩大模塊。Token 模塊負責 Token 的生成、存儲和查詢;Queue 模塊負責實現消息的可靠投遞。

A. Token 模塊

在整個活動系統內部,獎勵系統通過消費獎勵事件(異步消息)進行真實的獎勵發放。在獎勵系統內部出災或獎勵實際入賬存在壓單的情況下,引入 Token 機制來保證用戶體驗無損、保證用戶在活動頁面可見獎勵流水、保證用戶使用獎勵時可操作(現金可提現、優惠券可使用等)。Token 作為用戶獲得獎勵的憑據而存在,和獎勵事件一一對應。Token 的產生和流轉過程如下圖所示:

圖片

Token 數據結構和加解密

Token 內部數據結構使用 Protobuf 定義,相對于 JSON 方式序列化和反序列化性能均有提升、序列化后的數據大小減小了50%。Token 數據會返回給客戶端并保存在本地,為防止黑產解析 Token 構造數據惡意請求服務端接口,需要對Token 數據進行加密。Token 對象使用 Protobuf 進行序列化后的明文使用公司內的 KMS 工具進行加密。加密后的密文使用 Base64 算法進行編碼,以便在網絡傳輸和客戶端本地存儲。解密時先進行 Base64 解碼,再使用 KMS 工具進行解密,拿到的明文使用 Brotobuf 進行反序列化后即可得到 Token 對象。

Token 數據內容如下所示:

syntax = "proto3";

message BonusToken {
string TradeNo = 1; // 訂單號,全局唯一,用于冪等
int64 UserID = 2; // 發獎當時的APP內的UID
string Activity = 3; // 活動
string Scene = 4; // 場景
int64 AwardType = 5; // 獎勵類型
int32 AwardCount = 6; // 獎勵數值
int64 AwardTime = 7; // 獎勵發放時間戳
string Desc = 8; // 獎勵文案
}

Token 存儲

Token 存儲是典型的寫多讀少場景,底層存儲需要直接承載發獎的峰值流量(預估350萬 QPS ,部分場景一次請求會發放多個獎勵),用戶進入錢包頁面才會讀取存儲(預估40萬QPS),讀寫請求量級相差較多。數據的有效期較短,獎勵真正入賬后即可刪除。寫入場景均為插入單個 Token,讀取場景均為讀 Token 列表。

Token 主要由紅包雨、集卡開獎和煙火大會發獎產生,其中紅包雨和集卡開獎的獎勵數量有明確的數量上限。在煙火大會玩法中,用戶最快每30秒即可領取一次獎勵,對用戶領獎次數沒有限制,理論上單個用戶在整個煙火大會活動可以產生500個 Token。

基于預估的線上流量、讀寫模型和活動特點,決定使用 Redis 作為底層存儲,數據結構使用 Hash,用戶的 ActID 作為 Hash 數據的 Key、Token 的訂單號 TradeNo 作為 Hash 的 Field、Token 序列化后的明文作為 Hash 的 Value。

Token 服務

Token 服務提供了查詢用戶 Token 列表和加密 Token 合法性校驗接口。根據Token 密文是否可以正常解密、解密后的 Token 是否存在于 Redis 中,Token 合法性校驗接口返回三種結果:

  • 非法 Token:密文無法解密
  • 未知 Token:密文可解密,但存儲無記錄
  • 合法 Token:密文可解密,且存儲有記錄

獎勵 SDK 在寫 Token 的 Redis 時不會進行失敗重試,存在極少數 Token 沒有保存成功的情況。為了保證資金安全、防止黑產惡意攻擊,可解密的未知 Token 不能用作強制入賬。

Token 使用

用戶參與活動獲得獎勵后,Token 由活動前端調用客戶端 JSB 進行保存。用戶查看獎勵流水時,活動錢包頁前端會通過 JSB 讀取本地 Token 列表,在請求資產中臺服務時攜帶。資產中臺服務使用 TokenSDK 進行解密,同時會請求 Token 服務讀取服務端 Token 列表,并進行合并操作。資產中臺還會在合并后的列表中刪除已經入賬的 Token,在返回給用戶的流水里插入暫未入賬的流水并修正活動錢包余額,保證用戶獎勵及時可見。

用戶在活動錢包頁進行提現時,也會將客戶端本地 Token 帶給資產中臺服務。資產中臺服務對未入賬的合法 Token 進行強制入賬,保證用戶可以完成提現操作。

客戶端和服務端 Token 的作用

當獎勵系統依賴的消息隊列出災導致無法寫入或消費時、或由于削峰限流導致獎勵真實入賬存在延遲時,兩種 Token 都可以在一定程度上保證用戶體驗無損。

客戶端 Token 通過用戶設備和后臺服務之間的網絡傳遞,保存于用戶設備存儲。服務端 Token 通過內部網絡傳遞,保存于中心化的 Redis 存儲。兩種 Token 互為備份,在本地 Token 不可取時,可以依賴服務端 Token。服務端 Token 服務出災時,客戶端 Token 仍然可以保證用戶體驗。

本次活動在字節系8個 APP 同時上線,Token 服務還可以保證用戶在不同 APP 上,甚至不同的設備上的體驗一致。

B. Queue 模塊

Queue 模塊負責提供 “可靠” 的消息投遞服務。對外暴露的 SendEvent 函數能夠根據獎勵選用對應的虛擬隊列進行消息發送、并提供統一的監控能力。

func SendEvent(ctx context.Context, msg *BonusEvent) error {
// 根據獎勵信息選擇專用的虛擬隊列
queue := GetQueue(msg.Activity, msg.Scene, msg.BonusType)
data, err := proto.Marshal(message)
if err != nil {
return err
}
return queue.Send(ctx, message.UserID, message.UniqueID, data)
}

虛擬隊列(Queue)是對公司內 ByteMQ 和 RocketMQ 的封裝,內部通過代碼封裝屏蔽了兩種消息隊列 Producer-SDK 的使用細節,并支持使用兩種 MQ 進行互備,提升整個系統的容災能力。虛擬隊列的類圖如下所示:

圖片

虛擬隊列的 Send 方法可根據用戶 ID 動態的調整主備生產者的使用比例,在單個生產者失敗的情況下提供自動容災能力。

func (q *Queue) Send(ctx context.Context, uid int64, tradeNo string, data []byte) error {
var err error
if (uid % 100) < GetQueueRatio(q.Name()) {
err = q.Master.Send(ctx, tradeNo, data)
if err != nil {
err = q.Backup.Send(ctx, tradeNo, data)
}
} else {
err = q.Backup.Send(ctx, tradeNo, data)
if err != nil {
err = q.Master.Send(ctx, tradeNo, data)
}
}
return err
}

使用 RocketMQ 或 ByteMQ 的 SDK 異步批量發送功能時,由 Producer 屏蔽兩個 SDK 失敗回調的差異,統一使用失敗消息通道返回給上層。虛擬隊列的 Retry 邏輯負責讀取主備 Producer 的失敗消息,并采取主備輪轉的方式進行發送重試。在服務進程無異常退出的情況下,可保證消息最終發送成功。進程正常退出時,Close 方法會等待所有消息處理完成再返回。

圖片

消息隊列 Topic可配置

虛擬隊列內部使用了 Master 和 Backup 兩個消息隊列,通過代碼抽象和底層消息隊列類型做了解耦。在真實線上環境,為了達到災備的目的,單個虛擬隊列的 Master 和 Backup 需要使用不同類型或者不同物理集群的消息隊列 Topic。

在春節活動期間,ByteMQ 和 RocketMQ 的研發和運維團隊分別提供了一個活動專用集群,并做重點運維保障。獎勵系統在 ByteMQ 和 RocketMQ 的活動集群申請各申請了兩個 Topic。基于4個 Topic,在上層構建了3個虛擬隊列。

圖片

Topic 的 Producer 實例可以在不同的 Queue 中復用。上圖中,ByteMQ 的生產者 S 在 Special Queue 中作為 Master,在 Express Queue 中作為 Backup;RocketMQ 的生產者 B 同時在 Massive 和 Special Queue 中作為 Backup。

獎勵 SDK 內部使用的消息隊列 Topic 配置在了動態配置 TCC 中,虛擬隊列和 Producer 實例之間的映射關系也可通過 TCC 配置。做到了代碼和消息隊列集群、Topic 解耦。開發測試、線上運行階段可以非常方便的更換消息隊列Topic。

獎勵對應的虛擬隊列可配置

獎勵類型和虛擬隊列的對應關系配置在 TCC 中,不同的獎勵類型可以動態的指定發送的虛擬隊列,沒有配置時默認使用 Massive 虛擬隊列。在 SendEvent 方法中,調用 GetQueue 發放選用虛擬隊列。春節活動期間,Massive 虛擬隊列承載所有場景發放的現金獎勵;Special 虛擬隊列承載了所有場景發放的優惠券;Express 虛擬隊列承載了所有場景下的激勵金幣獎勵。

消息異步批量發送

ByteMQ 和 RocketMQ 的生產者 SDK 均支持同步發送和異步批量發送消息。RocketMQ 同步發送時延 P99為20 ms,而 ByteMQ 同步發送時延 P99為秒級。在發送同等數量級的消息時,RocketMQ 的 CPU 占用明顯高于 ByteMQ。在異步發送模式下,消息隊列的生產者 SDK 會啟動協程定時或當緩沖區內的消息達到閾值時發送。定時的時間間隔和緩沖區閾值可以在初始化時配置。批量發送可以降低生產者對消息隊列服務的請求次數,假設每100個消息批量發送一次,最高可以將消息隊列服務的 QPS 降低100倍,極大的減輕消息隊列集群的負載。

為了降低獎勵事件發送接口的響應時延,以及保持消息隊列集群負載低水位,在大流量發獎場景均使用異步批量發送模式,并配置 ByteMQ 承載主要的流量。

3.2 消費者設計

消息隊列的削峰功能,基于控制消費者的消費速度實現。RocketMQ 消費方式基于長輪訓方式實現,兼具了推拉兩種模式的優點。ByteMQ 消費方式為拉模式。消費者實例可通過控制拉消息的頻率和單次拉取消息的數量來控制消費速度。

在春節活動獎勵發放場景,不僅需要動態的調整多個消息隊列的總消費速度,保證下游獎勵服務、資產中臺服務、激勵中臺服務不過載,且充分利用機器資源;還需要動態的控制不同獎勵類型的消費速度,支持現金等重要獎勵優先入賬。

活動中發放的獎勵類型較多,不能為每種獎勵單獨分配消息隊列 Topic。不同獎勵類型發放的數量差異顯著,發放量級大和入賬優先級高的獎勵獨占 Topic,發放量級小和入賬優先級低的獎勵共用一個 Topic。不同獎勵類型的真實入賬服務(資產中臺服務的下游服務)入賬能力不同,入賬能力最小的服務每秒僅能處理2000的發放請求。需要支持獎勵類型維度的靈活消費控速能力。

在多維度的控速基礎上,還需要提供可靠消費的能力,每個獎勵消息至少成功處理一次(At least Once),所有獎勵最終成功入賬。

基于上述背景,獎勵消費者服務消息拉取速度(從 Topic 讀取消息)和消息處理速度(通過獎勵類型限速,調用獎勵系統發放獎勵)可能存在差異。當拉取速度小于處理速度時,獎勵服務吞吐量下降,消息在 Broker 中堆積時間變長;當拉取速度大于處理速度時,不能通過獎勵類型限速的消息會堆積在消費者服務進程內存中,并阻塞消費,差異顯著時可能造成消費者服務進程因 OOM 而退出,影響服務穩定性。對于被獎勵類型限速的消息,需要立即進行重入 隊列,消費者服務繼續處理后續消息。由于網絡波動等原因,暫時處理失敗的消息,也需要重入隊列,保證消息可以最終處理成功。

3.2.1 消費控速實現

A. 消費限速

RocketMQ 消費者實例在啟動時可配置單實例消費速度和消費 Worker 數量。動態調整消費速度,需要重啟消費者實例。ByteMQ 兼容 Kafka 協議,Golang 代碼中消費 ByteMQ 隊列使用了  sarama-cluster (https://github.com/bsm/sarama-cluster)。sarama-cluster 相比于RocketMQ 的 SDK 更加簡單,沒有提供單實例消費限速能力。單實例可以訂閱多個 Partition,每個 Partition 會啟動一個協程從 Broker 讀取消息,多個 Partiton 共用一個全局通道(Channel)寫入待處理消息。業務代碼需要從全局通道中讀取消息進行處理。限速邏輯只能在業務邏輯中實現,動態調整消費速度無需重啟消費者實例。

基于 sarama-cluster 的特點,使用 Go 原生限速器(golang.org/x/time/rate)實現了 ByteMQ 消費者的單實例限速器。代碼實現如下:

type Limiter struct {
Open bool
Fetcher LimitFetcher
inner *rate.Limiter
stop chan struct{}
}
// Wait 處理消息前調用,返回后進行處理
func (s *Limiter) Wait() {
if s.Open {
_ = s.inner.Wait(context.Background())
}
}
// Loop 用于監聽限速變化
func (s *Limiter) Loop() {
for s.Open && s.Fetcher != nil {
select {
case <-time.After(time.Second * 5):
newLimit := s.Fetcher()
if newLimit != int(s.inner.Limit()) {
s.inner.SetLimit(rate.Limit(newLimit))
}
case <-s.stop:
return
}
}
}

Go 原生限速器采用令牌桶算法實現限流,內部沒有維護 Timer,而是采用了惰加載的思路,在獲取 Token 時根據時間差計算更新可用 Token 數量。沒有任何外部依賴,非常適合用于單實例限流。

動態調整限流器的速率時,通過限速器 Reserve 和 Wait 接口消耗但未使用的Token 不會被取消。使用 Wait 方法阻塞的時間不會因為速率的調整而變化。速率調整發生后,對下游產生的 QPS 由三部分組成:調整前已經在等待的請求(阻塞在 rate.Limiter::Wait()) 、調整后新增的 Token 帶來的請求和 Burst(桶容量)帶來的請求。調整后短時間內的對下游產生的 QPS 可能超過預期的速度。對于突發流量場景,Burst 不宜設置過大。

// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated
// or underutilized by those which reserved (using Reserve or Wait) but did not yet act
// before SetLimitAt was called.
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)
B. 并發消費

RocketMQ 有序消費時,單個 Queue 只能分配一個 Worker 進行消費,只有當前 Queue 上一個消息成功處理后,才會處理下一個消息,消費速度受限于Queue 的數量和單個消息的處理時延;無序消費時,所有 Worker 共用一個緩沖區,隨機消費不同 Queue 的消息,Worker 之間并發處理消息,Worker 數量越多消費速度越快。

RocketMQ 進行消息確認(ACK)時,本地處理成功的消息數量超過一定數量時,或者距離上一次提交超過一定時間后,消費者實例會批量提交(BatchCommit)成功消費信息給 Broker。批量提交請求中包含每個消息的 MsgID、QueueID 和 Offset 等。Broker 側提供了消息確認窗口機制,每次保存對應Queue 的窗口中最小 Offset 到磁盤。若 Broker 發生宕機,窗口中大于磁盤保存 Offset 的消息,將會被再次消費。在消費者視角,會消費到已經成功確認的消息。因此,RocketMQ 不能保證 At Most Once,消息處理邏輯需要保證冪等。

ByteMQ 消息確認機制相對簡單,Broker 沒有提供消息確認窗口機制,收到消費者實例的 Commit 請求時,直接保存當前 Offset,偏移量小于當前 Offset 的消息將不會再次被消費。在消費者實例中,業務代碼調用的 MarkOffset 方法,會基于確認消息的 Offset+1并記錄在內存中,由協程定時提交到 Broker。若消費者實例發生宕機,Offset 未提交到 Broker 的消息將會被 Broker 再次下發,ByteMQ 也不能保證 At Most Once,消費者也需要保證處理邏輯需要保證冪等。

消費 ByteMQ 時,從 sarama-cluster 暴露的全局通道中讀取消息后,同步處理成功后調用 MarkOffset 方法可以保證順序消費。但同步處理會嚴重降低消費速度(單實例同一時刻只能處理一個消息)。啟動協程異步處理可以并發處理消息,并可通過增加協程數量來提升消費速度。但在消費者進程異常退出、消費者宕機等情況下會造成消息丟失。例如:Offset 較大的消息處理后并成功確認(Offset 成功提交到 Broker)后,Offset 較小的消息還未處理成功時消費者宕機,Broker 不再下發該消息,導致該消息漏處理,不滿足 At Least Once 語義。

// MarkOffset marks the provided message as processed, alongside a metadata string
// that represents the state of the partition consumer at that point in time. The
// metadata string can be used by another consumer to restore that state, so it
// can resume consumption.
//
// Note: calling MarkOffset does not necessarily commit the offset to the backend
// store immediately for efficiency reasons, and it may never be committed if
// your application crashes. This means that you may end up processing the same
// message twice, and your processing should ideally be idempotent.
func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
c.subs.Fetch(c.client.config.TryWrapTopicByEnv(msg.Topic), msg.Partition).MarkOffset(msg.Offset+ 1, metadata)
}

解決上述消息漏處理的問題,需要針對 ByteMQ 的確認機制在業務層進行優化,即在消費者代碼中自助實現消息確認窗口機制。在消費者進程中,按照消息順序將其 Offset 緩存在鏈表中,同時以 Offset 為 Key 在 HashMap 中存儲鏈表節點指針。消息成功處理時,通過 HashMap 尋址,修改鏈表節點狀態。本地協程定時從鏈表頭部掃描,嚴格按照順序向 Broker 提交成功消費的 Offset。并發處理時,保證較大 Offset 的消息不會提前確認給 Broker。

3.2.2 事件處理邏輯

RocketMQ 提供了失敗隊列,并提供重試能力,但 ByteMQ 沒有失敗處理機制,為抹平兩種消息隊列的差異,事件處理方法(HandleMessage)需要盡最大可能保證成功處理,對于處理失敗的消息需要進行重入隊列(SendEventToBackup)。

RocketMQ 消費者失敗消息多次重入隊列失敗后,會繼續利用消息隊列 SDK 提供的失敗重試能力。由于 ByteMQ 的 SDK 沒有失敗處理機制, 失敗消息多次重入隊列失敗后,依然會對其 Offset 進行確認,保證不會阻塞后續消息處理。

HandleMessage

// HandleMessage for ByteMQ
func HandleMessage(msg *sarama.ConsumerMessage) error {
err := DoReward(msg.Context, msg.Value, limiter)
MarkOffser(msg, err) // 本地確認,由異步協程定時提交
return nil
}

// HandleMessage for RocketMQ
func (w wrapper) HandleMessage(ctx context.Context, msg *pb.ConsumeMessage) error {
return handler.DoReward(ctx, msg.Msg.MsgBody, limiter)
}

type Limiter interface {
Allow(*BonusEvent) bool
}

func DoReward(ctx context.Context, data []byte, rate Limiter) error {
bonus := &BonusEvent{}
if err := proto.Unmarshal(data, bonus); err != nil {
return err
}
// 按照獎勵類型限流,當rate為nil時不限流,熔斷時直接重入隊列
if rate == nil || rate.Allow(bonus) {
// 同步調用獎勵服務進行發獎
if err := callReward(ctx, bonus); err == nil {
return nil
}
}
// 處理失敗:重新寫入隊列
return SendEventToBackup(ctx, bonus.UniqueID, bonus)
}

SendEventToBackup

func SendEventToBackup(ctx context.Context, tradeNo string, bonus *BonusEvent) error {
bonus.Retry++ // 增加Retry次數
data, err := proto.Marshal(bonus)
if err != nil {
return err
}
// 使用新PartitonKey進行重發
newPartitionKey := fmt.Sprintf("%s{%d}", bonus.UniqueID, bonus.Retry)
for _, queue := range instances {
// 多個備選隊列用于重入隊列
if err = queue.Send(ctx, newPartitionKey, data); err == nil {
return nil
}
}
// 極端情況下通過日志回撈的方式處理
logs.CtxError(ctx, "%s", base64.StdEncoding.EncodeToString(data) )
return err
}

3.2.3 獎勵類型限速

由于不同獎勵類型最終由不同的下游系統入賬,為保證下游系統都穩定性,減少下游系統返回限流錯誤和無效調用,針對每一個獎勵類型單獨配置了單實例限速。

func NewLimiter() *Limiter {
l := &Limiter{
m: sync.Map{},
ticker: time.NewTicker(5 * time.Second),
}
l.loop()
return l
}

type Limiter struct {
m sync.Map
ticker *time.Ticker
}

type innerLimiter struct {
*rate.Limiter
Fuse bool
}

// Allow 返回true時處理消息;返回false時不處理消息,直接重入隊列
func (L *Limiter) Allow(event *BonusEvent) bool {
if event == nil {
return true
}
if v, exist := L.m.Load(GetBonusType(event)); exist {
if inner, ok := v.(*innerLimiter); ok {
if inner.Fuse { // 開啟了熔斷開關
return false
}
return inner.Allow()
}
}
return true
}

func (L *Limiter) loop() {
go func() {
defer Recover()
L.run()
for range L.ticker.C {
L.run()
}
}()
}

// 監聽配置變更,動態調整限速
func (L *Limiter) run() {
for wt, config := range tcc.GetRateCfg() {
value, exist := L.m.Load(wt)
if !exist || value == nil {
// 創建新增限流器
L.m.Store(wt, &innerLimiter{
Limiter: rate.NewLimiter(rate.Limit(config.Rate), config.Burst),
Fuse: config.Fuse,
})
continue
}

if inner, ok := value.(*innerLimiter); ok {
// 更新已有限流器
inner.Fuse = config.Fuse
if int(inner.Limiter.Limit()) != config.Rate {
inner.Limiter.SetLimit(rate.Limit(config.Rate))
}
continue
}

L.m.Delete(wt)
L.m.Store(wt, &innerLimiter{
Limiter: rate.NewLimiter(rate.Limit(config.Rate), config.Burst),
Fuse: config.Fuse,
})
}
}

func (L *Limiter) Close() {
if L.ticker != nil {
L.ticker.Stop()
L.ticker = nil
}
}

3.2.4 消費和獎勵類型限速協調

消費者類似于一個管道,消費限速相當于流入管道的流量限制,獎勵類型限速相當于流出管道的流量限制。當消費速度大于所有類型速度之和時,會導致請求重入隊列。減少重入隊列需要保證兩點:

  1. 消費限速和獎勵類型限速聯動,調整類型限速時消費速度自動調整適配
  2. 上游發放獎勵時,不同獎勵出現的概率分布和類型限速配置匹配

在春節活動中,獎勵發放的概率由算法策略控制。在紅包雨、煙火大會、集卡開獎等場景下,概率分布符合預期,沒有發生重入隊列。

3.3 獎勵服務設計

獎勵服務負責調用資產中臺服務和激勵中臺服務發放具體的獎勵。對上層提供全局冪等的保證、失敗托管重試、預算控制等能力。

由于上游存在使用同一個冪等 ID 發放不同獎勵的情況,且不同的下游系統之間數據隔離,故需要獎勵服務存儲所有發獎請求處理狀態及結果,用于保證全局冪等。發放請求使用公司自研的 Abase 進行存儲,同時利用了 Abase 提供的 CAS 能力,對獎勵發放行為進行了并發控制,確保同一個冪等 ID 僅能用于一次發放行為。上游重試請求的獎勵類型和數值需要和原始請求保持一致,才能通過校驗,進入真正的發放流程。

獎勵服務對外提供同步發獎和異步發獎兩類接口。對于需要感知獎勵發放結果的場景,上游需要使用同步發獎接口。例如獎勵事件消費者,需要明確感知發放是否成功,來決策是否需要重試等。同步接口穩定性和響應時延強依賴下游服務。部分獎勵下游發放邏輯較重,耗時較長,容易導致上游調用超時,穩定性降低。

對于無需實時感知發放結果,或對接口響應實驗非常敏感的場景,上游需要使用異步發獎接口。異步接口在通過預算控制,成功將消息投遞到消息隊列后返回。異步接口可以提升系統吞吐能力,降低上游等待時間。利用消息隊列的削峰和異步能力,獎勵服務可以直接承接中等規模(發放 QPS 在10萬到50萬)的發獎場景接入。對于大規模(發放 QPS 在50萬之上)的發獎場景,需要通過獎勵 SDK 接入。相對于同步接口,異步接口支持通用的失敗重試邏輯和異常處理能力,接入方無需再次開發相關邏輯,可降低研發投入。

3.3.1 同步發獎

同步發獎接口會實時返回下游系統返回的入賬結果。對于失敗請求由上游服務負責處理,獎勵服務不進行托管。獎勵同步發放的流程如下圖所示:

圖片

上述流程圖中,寫消息隊列、添加記錄節點可以根據場景要求,可設置為強依賴節點,也可設置為弱依賴節點。當寫消息隊列和添加記錄節點被設置為弱依賴時,獎勵服務不能嚴格保證全局冪等,此時的冪等性需要下游系統保證;在消息隊列和 Abase 存儲系統出災時,獎勵服務可正常對外提供服務。

3.3.2 異步發獎

上游調用異步發獎接口雖然不會實時返回發放結果,但會在上游請求時同步調用預算控制服務進行扣減預算。異步發獎流程中,發獎請求成功寫入消息隊列后,立即返回。后續發獎流程由獎勵系統的消費者服務通過消費消息觸發,并保證最終成功入賬。

異步發獎請求處理過程中,收到下游系統返回的不可重試錯誤時,會將異常請求寫入專用的失敗隊列并落 Hive 表存檔,以便后續處理。

3.3.3 預算控制

預算控制是保證資金安全的手段之一。在春節活動中,除活動玩法自身的頻控邏輯和預算控制策略外,獎勵系統、資產中臺和下游賬戶服務都有自身的預算控制策略。

獎勵系統中場景預算通過動態配置 TCC 配置,可支持動態調整。預算消耗情況通過 KV 存儲,為防止出現熱點 Key,根據接入場景的流量大小做了分 Key,單預算 Key 承載小于500 QPS 的請求。進行預算扣減時,通過對唯一訂單號進行哈希求余來決定具體的預算 Key,并在預算 Key 的 Value 中存儲若干條最新的訂單號,基于存儲系統的 CAS 能力提供有限的預算扣減冪等能力。若在單預算 Key 上產生較高的并發請求,存儲的訂單號被淘汰的情況下發生超時重試,會導致預算超扣。進行預算配置時,做了一定比例的超配,防止因為流量不均和預算超扣導致誤攔截。

資產中臺系統中,基于 Redis 執行 Lua 腳本的能力,實現了多 Key 事務預算控制方案,提供了相對嚴格的預算控制能力。在下游的賬戶服務中,基于關系型數據的事務能力進行了嚴格的預算控制,保證在活動場景不會發生超發。

4. 總結

春節活動于2022年1月24日正式上線,2022年1月31日(除夕)結束,共持續7天。活動期間通過獎勵系統發放各類獎勵約70億筆,僅除夕當天就發放20億筆。在多場紅包雨中,獎勵系統從生產端到消費端做到了全部消息的可靠處理,離線對賬未檢測到任何有效差異,現金獎勵全部成功入賬。

在春節活動中對相關服務的性能、穩定性和可靠性有著極高的要求。在設計技術方案時,技術選型和常規需求有所不同,需要在可供選擇的組件中權衡性能和可靠性。降低系統復雜度,減少外部依賴,并對依賴部分進行充分的深入的了解是保證整個系統穩定可靠的關鍵。

責任編輯:未麗燕 來源: 字節跳動技術團隊
相關推薦

2018-01-18 16:25:01

潤乾報務猿

2022-06-23 11:19:14

抖音春節發券

2022-04-01 15:41:35

字節春節活動用戶

2021-02-10 07:45:21

APP手機熱點推薦

2014-10-24 16:18:36

移動

2021-02-04 10:00:00

斑馬技術

2021-02-15 18:34:27

數字人民幣數字貨幣區塊鏈

2009-05-19 09:27:38

運維管理春節晚會摩卡

2021-05-26 14:43:50

技術

2016-09-22 14:09:47

2010-12-09 11:31:10

跳槽

2017-06-06 09:46:46

互聯網

2011-07-12 15:24:17

前沿網絡BDN深信服

2016-04-11 17:09:57

慧聰網

2022-09-07 18:13:15

智能數據

2019-11-13 11:03:10

華為獎金美國

2011-05-10 15:17:31

PHP技術高峰論壇

2013-11-18 16:21:26

華為智慧城市

2018-10-25 09:47:11

Gartner智能交通智能停車

2016-01-13 14:54:50

京東京東大腦
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 91在线导航| 国产高清久久久 | 午夜噜噜噜 | 欧美激情在线精品一区二区三区 | 成人久久视频 | 精品久久久久久亚洲精品 | 久久99网站| 国产无人区一区二区三区 | 欧美一区二区三区免费在线观看 | 91av视频在线播放 | 日日操日日干 | 日韩免费一区 | 一区二区在线免费播放 | 国产精品一区二区在线 | 国产精品亚洲精品 | 视频一区二区在线观看 | 国产精品伦理一区二区三区 | 久久精品视频免费观看 | 久久综合av | 亚洲精品一 | 亚洲精品中文字幕在线观看 | 国产精品毛片一区二区在线看 | 精品一区二区免费视频 | 国产wwwcom| 欧美视频在线看 | 国产无套一区二区三区久久 | 久久精品一区二区三区四区 | 国内精品视频免费观看 | 一区二区三区免费 | 在线日韩av电影 | 欧美性吧| 国产精品久久久久久久久久东京 | 激情av在线 | 国产一区三区视频 | 国产电影一区二区在线观看 | 精品一区二区三区四区五区 | 一区二区国产精品 | 一本一道久久a久久精品综合蜜臀 | 免费一区二区 | 草草草影院 | 国产精品精品3d动漫 |