Apache Pulsar 如何幫助 Iterable 擴展其客戶參與平臺
關(guān)鍵要點
- 分布式消息傳遞系統(tǒng)支持兩種類型的語義:流式傳輸和排隊。每個都最適合某些類型的用例。
- Apache Pulsar 的獨特之處在于它同時支持流式處理和排隊用例。
- Pulsar 的多層架構(gòu)允許用戶比其他消息傳遞系統(tǒng)更方便地擴展主題的數(shù)量和大小。
- Pulsar 提供了可擴展性、可靠性和功能的適當平衡,以取代 Iterable 的 RabbitMQ,并最終取代其他消息傳遞系統(tǒng),如 Kafka 和 Amazon SQS。
在 Iterable,我們每天代表客戶發(fā)送大量營銷信息。這些包括電子郵件、推送、SMS 和應(yīng)用內(nèi)消息。Iterable 還每天處理更多的用戶更新、事件和自定義工作流狀態(tài),其中許多可以觸發(fā)系統(tǒng)中的其他操作。這導致系統(tǒng)不僅對我們的客戶非常有用,而且非常復(fù)雜。隨著我們客戶群的增長,管理這種復(fù)雜性變得更加重要。
Iterable 管理復(fù)雜性的一種方法是在其架構(gòu)的幾個部分中使用分布式消息傳遞系統(tǒng)。分布式消息系統(tǒng)的主要目的是存儲需要消費者處理的消息,并在處理這些消息時跟蹤這些消費者的狀態(tài)。這樣,消費者可以專注于處理每條消息的任務(wù)。
Iterable 使用工作隊列方法來執(zhí)行客戶指定的營銷工作流、webhook 和其他類型的作業(yè)調(diào)度和處理。其他組件,例如用戶和事件攝取,使用流模型來處理有序的消息流。
通常,分布式消息傳遞系統(tǒng)支持兩種類型的語義:流式傳輸和排隊。每個都最適合某些類型的用例。
流式傳輸和排隊
在流式消息系統(tǒng)中,生產(chǎn)者將數(shù)據(jù)附加到一組僅附加的消息流中。在每個流中,消息必須按特定順序處理,并且消費者在流中標記它們的位置。可以使用某種策略(例如散列用戶 ID)對消息進行分區(qū)以允許更大的并行性,并且每個分區(qū)都充當單獨的數(shù)據(jù)流。因為每個流中的數(shù)據(jù)都是不可變的,并且只存儲了偏移量條目,所以消息可能不會被跳過。流式處理在消息順序很重要的情況下效果很好,例如數(shù)據(jù)攝取。Kafka和Amazon Kinesis是使用流語義來消費消息的消息系統(tǒng)示例。
在隊列消息系統(tǒng)中,生產(chǎn)者將消息發(fā)送到可能由多個消費者共享的隊列。消費者在收到消息時對其進行處理,并在處理每條消息時向排隊系統(tǒng)發(fā)送確認。因為多個消費者可能共享一個隊列并且消息序列并不重要,所以通常更容易擴展基于隊列的系統(tǒng)的消費者端。排隊系統(tǒng)非常適合不需要按特定順序執(zhí)行任務(wù)的工作隊列——例如,向多個收件人發(fā)送一封電子郵件。RabbitMQ和Amazon SQS是流行的基于隊列的消息系統(tǒng)的示例。
排隊系統(tǒng)通常包括簡化處理消息級錯誤任務(wù)的功能。例如,發(fā)生錯誤后,RabbitMQ 可以輕松地將消息傳輸?shù)教厥怅犃校谠撽犃兄斜A糁付ǖ臅r間,然后返回到原始隊列進行重試。它還可以否定地確認一條消息,以便在失敗后重新傳遞它。由于大多數(shù)消息隊列在確認消息后通常不會將消息存儲在積壓中,因此調(diào)試和災(zāi)難恢復(fù)更加困難,因為沒有要檢查的消息。
像 Kafka 這樣的基于流的系統(tǒng)可用于排隊用例,但有一些注意事項。事實上,許多用戶選擇此選項是因為這些系統(tǒng)通常提供卓越的性能。然而,這種解決方案可能是一個挑戰(zhàn),因為它給開發(fā)人員帶來了過度的負擔,無法處理嚴格的流排序所帶來的限制。如果消費者消費消息的速度很慢或需要在暫時失敗后重試處理,則同一流上其他消息的處理可能會延遲。一個常見的解決方案是通過將消息重新發(fā)布到另一個主題來重試處理,但這會引入復(fù)雜性,因為應(yīng)用程序邏輯必須管理額外的狀態(tài)。
為什么 Iterable 需要一個新的消息傳遞平臺
我們一直在大量使用 RabbitMQ,并依靠它的特性來處理內(nèi)部消息傳遞。我們大量使用生存時間 (TTL) 值,不僅用于固定長度的重試,還用于在消息處理中實現(xiàn)顯式延遲。例如,我們可能會延遲發(fā)送營銷電子郵件,以便營銷信息可以在每個收件人最有可能打開的時間發(fā)送給他們。我們還依靠否定確認來重試排隊的消息。
這是我們架構(gòu)的簡化版本:
當我們開始評估 Pulsar 時,上面提到的所有隊列都在 RabbitMQ 上,除了使用 Kafka 的攝取。Kafka 非常適合攝取,因為它提供了必要的性能和排序保證。Kafka 不適合其他用例,因為它缺乏必要的工作隊列語義。我們使用了許多 RabbitMQ 特有的特性,比如延遲,這一事實也使得尋找替代方案變得更具挑戰(zhàn)性。
當我們擴展我們的系統(tǒng)時,RabbitMQ 開始出現(xiàn)以下限制:
- 在高負載下,RabbitMQ 經(jīng)常遇到流量控制問題。流控制是一種在消息代理無法跟上時減慢發(fā)布者的機制,通常是因為內(nèi)存和其他資源限制。這阻礙了生產(chǎn)者發(fā)布的能力,從而導致其他領(lǐng)域的服務(wù)延遲和請求失敗。具體來說,我們注意到當大量消息的 TTL 同時過期時,流控制更頻繁地發(fā)生。在這些情況下,RabbitMQ 試圖一次性將過期消息傳遞到它們的目標隊列。這使 RabbitMQ 實例的內(nèi)存容量不堪重負,從而觸發(fā)了正常生產(chǎn)者的流控機制,阻止了他們的發(fā)布嘗試。
- 調(diào)試變得更加困難,因為 RabbitMQ 的代理在消息被確認后不存儲消息。換言之,無法為消息設(shè)置保留時間。
- 復(fù)制很難實現(xiàn),因為 RabbitMQ 中的復(fù)制組件對于我們的用例來說不夠健壯,導致 RabbitMQ 成為我們消息狀態(tài)的單點故障。
- RabbitMQ 難以處理大量隊列。由于我們有許多需要專用隊列的用例,我們通常一次需要超過 10,000 個隊列。在這個級別,RabbitMQ 遇到了性能問題,通常首先出現(xiàn)在管理界面和 API 中。
評估 Apache Pulsar
總的來說,Apache Pulsar似乎提供了我們需要的所有功能。雖然我們在 Pulsar 周圍看到的很多宣傳都將其與 Kafka 進行流式工作負載進行比較,但我們也發(fā)現(xiàn) Pulsar 非常適合我們的排隊需求。Pulsar 的共享訂閱功能允許將主題用作隊列,可能為同一主題中的不同訂閱者提供多個虛擬隊列。Pulsar 還原生支持延遲和預(yù)定消息,盡管這些功能在我們開始考慮 Pulsar 時是非常新的。
除了提供豐富的功能集之外,Pulsar 的多層架構(gòu)讓我們能夠比其他消息傳遞系統(tǒng)更方便地擴展主題的數(shù)量和大小。
Pulsar 的頂層由代理組成,它們接受來自生產(chǎn)者的消息并將它們發(fā)送給消費者,但不存儲數(shù)據(jù)。單個代理處理每個主題分區(qū),但代理可以輕松交換主題所有權(quán),因為它們不存儲主題狀態(tài)。這使得添加代理以增加吞吐量并立即利用新代理變得容易。這也使 Pulsar 能夠處理代理故障。
Pulsar 的底層BookKeeper將主題數(shù)據(jù)存儲在分段中,這些分段分布在整個集群中。如果需要額外的存儲空間,我們可以輕松地將 BookKeeper 節(jié)點(bookies)添加到集群中,并使用它們來存儲新的主題段。經(jīng)紀人與博彩公司協(xié)調(diào),以在每個主題發(fā)生變化時更新它的狀態(tài)。Pulsar 將 BookKeeper 用于主題數(shù)據(jù)也有助于它支持大量主題,這對于 Iterable 當前的許多用例至關(guān)重要。
在評估了幾個消息傳遞系統(tǒng)之后,我們認為 Pulsar 提供了可擴展性、可靠性和功能之間的適當平衡,以取代 Iterable 的 RabbitMQ,并最終取代其他消息傳遞系統(tǒng),如 Kafka 和 Amazon SQS。
第一個 Pulsar 用例:消息發(fā)送
Iterable 平臺最重要的功能之一是代表 Iterable 的客戶安排和發(fā)送營銷電子郵件。為此,我們將消息發(fā)布到客戶特定的隊列,然后讓另一個服務(wù)處理消息的最終呈現(xiàn)和發(fā)送。這些隊列是我們決定從 RabbitMQ 遷移到 Pulsar 的第一件事。
我們選擇營銷信息發(fā)送作為我們的第一個 Pulsar 用例有兩個原因。首先,因為發(fā)送包含了我們一些更復(fù)雜的 RabbitMQ 用例。其次,因為它代表了我們 RabbitMQ 使用的很大一部分。這不是風險最低的用例;然而,經(jīng)過廣泛的性能和可擴展性測試,我們認為這是 Pulsar 可以增加最大價值的地方。
以下是在 Iterable 平臺上創(chuàng)建的三種常見活動類型:
- 同時向所有收件人發(fā)送營銷信息的爆炸式營銷活動。假設(shè)客戶想要向過去一個月活躍的用戶發(fā)送電子郵件通訊。在這種情況下,我們可以在計劃活動時查詢ElasticSearch的用戶列表,并將它們發(fā)布到該客戶的 Pulsar 主題。
- 為每個收件人指定自定義發(fā)送時間的 Blast 營銷活動。發(fā)送時間可以是固定的(例如,“收件人當?shù)貢r區(qū)的上午 9 點”),也可以由我們的發(fā)送時間優(yōu)化功能計算得出。在每種情況下,我們都希望將排隊消息的處理延遲到指定時間。
- 用戶觸發(fā)的廣告系列。這些可以由自定義工作流程或用戶發(fā)起的交易(例如在線購買)觸發(fā)。用戶觸發(fā)的營銷發(fā)送是根據(jù)需要單獨完成的。
在上述每種情況下,在任何給定時間執(zhí)行的發(fā)送數(shù)量可能會有很大差異,因此我們還需要能夠向上和向下擴展消費者以適應(yīng)不斷變化的負載。
遷移到 Apache Pulsar
盡管 Pulsar 在負載測試中表現(xiàn)良好,但我們不確定它是否能夠在生產(chǎn)中維持高負載水平。這是一個特別關(guān)注的問題,因為我們計劃利用 Pulsar 的幾個新功能,包括否定確認和預(yù)定消息傳遞。
為了建立我們的信心,我們實現(xiàn)了一個并行管道,在該管道中我們向 RabbitMQ 和 Pulsar 發(fā)布消息;在這種情況下,我們在這些主題上設(shè)置消費者來確認排隊的消息而不實際處理它們。我們還模擬了消費延遲。這有助于我們了解 Pulsar 在特定生產(chǎn)環(huán)境中的行為。我們對測試主題和實際生產(chǎn)主題都使用了客戶級別的功能標志,因此我們可以逐一遷移客戶以進行測試,并最終用于生產(chǎn)用途。
在測試過程中,我們發(fā)現(xiàn)了 Pulsar 中的一些錯誤。例如,我們發(fā)現(xiàn)了與延遲消息相關(guān)的競爭條件,Pulsar 開發(fā)人員幫助識別和修復(fù)了這種情況。這是我們發(fā)現(xiàn)的最嚴重的問題,因為它導致消費者卡住,造成未消費消息的積壓。
我們還注意到一些與 Pulsar 消息批處理相關(guān)的有趣問題,這些問題在 Pulsar 生產(chǎn)者中默認啟用。例如,我們注意到 Pulsar 的積壓指標報告的是批次數(shù),而不是實際的消息數(shù)量,這使得為消息積壓設(shè)置警報閾值更具挑戰(zhàn)性。后來我們在否定確認和批處理之間的交互中發(fā)現(xiàn)了一個更嚴重的錯誤,最近已經(jīng)修復(fù)了。最終,我們認為批處理不值得麻煩。幸運的是,在 Pulsar 生產(chǎn)者中禁用批處理很容易,并且沒有批處理的性能足以滿足我們的需求。這些問題也可能在即將發(fā)布的版本中得到修復(fù)。
延遲和否定確認在當時是相對較新的功能,因此我們預(yù)計我們可能會發(fā)現(xiàn)一些問題。這就是為什么我們選擇在幾個月內(nèi)緩慢遷移到 Pulsar,最初發(fā)布僅用于測試主題,然后逐漸遷移真實發(fā)送。這種方法使我們能夠在問題成為我們客戶的問題之前發(fā)現(xiàn)問題。盡管花了大約六個月的時間才完全相信 Pulsar 正在按預(yù)期工作,但結(jié)果值得花時間。
在大約六個月的時間里,我們將整個營銷發(fā)送操作遷移到了 Pulsar。遷移完成后,我們發(fā)現(xiàn) Pulsar 將我們的運營成本降低了近一半,并且在我們增加新客戶時還有增長空間。成本降低顯著,部分原因是我們的 RabbitMQ 實例被過度配置以彌補性能問題。迄今為止,我們的 Pulsar 集群已經(jīng)順利運行了六個多月,沒有出現(xiàn)任何問題。
實施和工具
Iterable 主要在后端使用Scala,因此為 Pulsar 提供良好的 Scala 工具對我們來說很重要。我們使用了優(yōu)秀的pulsar4s庫,并做出了許多支持新功能的貢獻,例如延遲消息。我們還提供了一個基于 Akka Streams 的連接器,用于將消息用作源,并提供單獨的確認支持。
例如,我們可以像這樣使用命名空間中的所有主題:
// Create a consumer on all topics in this namespace
val createConsumer = () => client.consumer(ConsumerConfig(
topicPattern = "persistent://email/project-123/.*".r,
subscription = Subscription("email-service")
))
// Create an Akka streams `Source` stage for this consumer
val pulsarSource = committableSource(createConsumer, Some(MessageId.earliest))
// Materialize the source and get back a `control` to shut it down later.
val control = pulsarSource.mapAsync(parallelism)(handleMessage).to(Sink.ignore).run()
我們喜歡為消費者使用正則表達式訂閱。它們使在創(chuàng)建新主題時自動訂閱它們變得容易,并使消費者不必了解特定的主題分區(qū)策略。同時,我們也在利用 Pulsar 支持大量主題的能力。由于 Pulsar 會在發(fā)布時自動創(chuàng)建新主題,因此為新消息類型甚至單個活動創(chuàng)建新主題很簡單。這也使得為不同的客戶和消息類型實施速率限制變得更加容易。
我們學到了什么
由于 Pulsar 是一個快速發(fā)展的開源項目,我們遇到了一些挑戰(zhàn)——主要是在加快速度和學習它的怪癖方面——我們可能不會在其他更成熟的技術(shù)中看到這些挑戰(zhàn)。文檔并不總是完整的,我們經(jīng)常需要向社區(qū)尋求幫助。也就是說,社區(qū)非常熱情和樂于助人,我們很高興更多地參與 Pulsar 的開發(fā)并參與有關(guān)新功能的討論。
Pulsar 的獨特之處在于它同時支持流式傳輸和排隊用例,同時還支持廣泛的功能集,使其成為我們架構(gòu)中當前使用的許多其他分布式消息傳遞技術(shù)的可行替代方案。Pulsar 涵蓋了我們所有的 Kafka、RabbitMQ 和 SQS 用例。這使我們能夠?qū)W⒂趪@單個統(tǒng)一系統(tǒng)構(gòu)建專業(yè)知識和工具。
自 2019 年初開始與 Pulsar 合作以來,我們一直對 Pulsar 的開發(fā)進展感到鼓舞,尤其是在初學者的準入門檻方面。工具有了很大的改進:例如,Pulsar Manager現(xiàn)在提供了一個非常方便的 GUI 來管理集群。我們還看到許多公司提供托管和托管的 Pulsar 服務(wù),這使初創(chuàng)公司和小型團隊更容易開始使用 Pulsar。
總體而言,Iterable 向 Pulsar 的過渡很有趣,有時也很有挑戰(zhàn)性,但到目前為止相當成功。在許多方面,我們的用例代表了一條尚未被廣泛采用的新路徑。我們預(yù)計會遇到一些問題,但我們的測試過程有助于最大限度地減少它們對客戶的影響。我們現(xiàn)在對使用 Pulsar 充滿信心,并將繼續(xù)擴展我們對 Pulsar 的使用,用于 Iterable 平臺中的其他現(xiàn)有和新組件。