成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Kafka Exactly Once 語義實現原理:冪等性與事務消息

開發 架構
通過本文的深入分析,我們了解到 Kafka 的事務消息功能是如何在流處理場景中提供 Exactly-Once 語義的。Kafka 通過其事務 API 和內部機制,實現了消息發送的原子性、最終一致性、隔離性和持久性,盡管在實際應用中可能存在一些性能和功能上的限制。

1、前言   

在現代分布式系統中,確保數據處理的準確性和一致性是至關重要的。Apache Kafka,作為一個廣泛使用的流處理平臺,提供了強大的消息隊列和流處理功能。隨著業務需求的增長,Kafka 的事務消息功能應運而生,它允許應用程序以一種原子的方式處理消息,即要么所有消息都被正確處理,要么都不處理。本文將深入剖析 Kafka 的 Exactly-Once 語義實現原理,包括冪等性與事務消息的關鍵概念,以及它們是如何在 Kafka 中實現的。我們將探討 Kafka 事務的流程,事務提供的 ACID 保證,以及在實際應用中可能遇到的一些限制。無論您是 Kafka 的新手還是經驗豐富的開發者,本文都將為您提供有價值的見解和指導。

2、消息隊列的事務場景

Kafka 目前用于流處理的場景:相當于一個有向無環圖(DAG,Directed acyclic graph)每個節點是一個 Kafka Topic,每條邊是一個流處理操作。在這樣的場景下,有兩種操作:

? 消費上游消息并提交位點

? 處理消息并發送到下游 Topic

對于由這兩種操作構成的一組處理流程需要具備事務語義,這樣我們就可以不重復(Exactly Once)的處理上游消息并將結果可靠地存儲在下游 Topic 中。

圖片圖片

上圖是一個典型的 Kafka 事務的流程,我們可以看到:MySQL 的 binlog 作為上游數據源將數據寫入到 Kafka 中,Spark Streaming 從 Kafka 中讀取數據并進行處理,最后將處理結果寫入到另外兩個 Topic 中(圖中三個 Topic 位于同一集群中)。其中消費 Topic A 與寫入 Topic B 和 Topic C 的操作具備事務語義。

3、Kafka 的 Exactly Once 語義

從上述的場景中我們可以發現,事務消息最主要的動機是在流處理中實現 Exactly Once 的語義,這可以分為:

? 僅發送一次: 單分區僅發送一次由生產者冪等保證,多分區僅發送一次由事務機制保證

? 僅消費一次: Kafka 通過消費位點的提交來控制消費進度,而消費位點的提交被抽象成向系統 topic 發送消息。這就使得發送和消費行為統一起來,只要解決了多分區發送消息的一致性就能實現 Exactly Once 語義

4、生產者冪等性

在創建 Kafka 生產者時設置了 enable.idempotence 參數,用于開啟生產者冪等性。

val props = new Properties()
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")


val producer = new KafkaProducer(props)

Kafka 的發送冪等是通過序列號來實現的,每個消息都會被分配一個序列號,序列號是遞增的,這樣就可以保證消息的順序性。當生產者發送消息時,會將消息的序列號和消息內容一起寫入到日志文件中,下次收到非預期序列號的消息就會返回 OutOfOrderSequenceException 異常。

設置 enable.idempotence 參數后,生產者會檢查以下三個參數的值是否合法(ProducerConfig#postProcessAndValidateIdempotenceConfigs)

? max.in.flight.requests.per.connection 必須小于 5

? retries 必須大于 0

? acks 必須設置為 all

Kafka 將消息的序列號信息保存在分區維度的 .snapshot 文件中,格式如下(ProducerStateManager#ProducerSnapshotEntrySchema):

圖片圖片

我們可以發現,該文件中保存了 ProducerId、ProducerEpoch 和 LastSequence。所以冪等的約束為:相同分區、相同 Producer(id 和 epoch) 發送的消息序列號需遞增。即 Kafka 的生產者冪等性只在單連接、單分區生效,Producer 重啟或消息發送到其他分區就失去了冪等性的約束。

.snapshot 文件在 log segment 滾動時更新,發生重啟后通過讀取 .snapshot 文件和最新的日志文件即可恢復 Producer 的狀態。Broker 的重啟或分區遷移并不會影響冪等性。

5、事務消息流程

我們首先從 Demo 開始,來看一下如何使用 Kafka 客戶端完成一個事務:

// 事務初始化
val props = new Properties()
...
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")


val producer = new KafkaProducer(props)
producer.initTransactions()
producer.beginTransaction()


// 消息發送
producer.send(RecordUtils.create(topic1, partition1, "message1"))
producer.send(RecordUtils.create(topic2, partition2, "message2"))


// 事務提交或回滾
producer.commitTransaction()

5.1 事務初始化

Kafka Producer 啟動后我們使用兩個 API 來初始化事務:initTransactions 和 beginTransaction。

回顧一下我們的 Demo,在發送消息時是發送到兩個不同分區中,這兩個分區可能在不同的 Broker 上,所以我們需要一個全局的協調者 TransactionCoordinator 來記錄事務的狀態。

所以,在 initTransactions 中,Producer 首先發送 ApiKeys.FIND_COORDINATOR 請求獲取 TransactionCoordinator。

之后即可向其發送 ApiKeys.INIT_PRODUCER_ID 請求獲取 ProducerId 及  ProducerEpoch(也是上文中用于冪等的字段)。此步驟生成的 id 和 epoch 會寫入內部 Topic __transaction_state 中,并且將事務的狀態置為 Empty。

__transaction_state 是 compaction Topic,其中消息的 key 為客戶端設置的transactional.id(詳見 TransactionStateManager#appendTransactionToLog)。

區別于 ProducerId 是服務端生成的內部屬性;TransactionId 由用戶設置,用于標識業務視角認為的“同一個應用”,啟動具有相同 TransactionId 的新 Producer 會使得未完成的事務被回滾并且來自舊 Producer(具有較小 epoch)的請求被拒絕掉。

后續 beginTransaction 用于開始一個事務,該方法會創建一個 Producer 內部事務狀態,標識這一個事務的開始,并不會有 RPC 產生。

5.2 消息發送

上一節說到 beginTransaction 只是更改 Producer 內部狀態,那么在第一條消息發送時才隱式開啟了事務:

首先,Producer 會發送 ApiKeys.ADD_PARTITIONS_TO_TXN 請求到 TransactionCoordinator。TransactionCoordinator 會將這個分區加入到事務中,并更改事務的狀態為 Ongoing,這些信息被持久化到 __transaction_state 中。

然后 Producer 使用 ApiKeys.PRODUCE 請求正常發送消息到對應的分區中。這條消息的可見性控制在下文消息消費一節中會詳細討論。

5.3 事務提交與回滾

當所有消息發送完成后,Producer 可以選擇提交或回滾事務,此時:

? TransactionCoordinator:具有當前事務所有相關分區的信息

? 其他 Broker:已經將消息持久化到日志文件中

接下來 Producer 調用 commitTransaction 會發送 ApiKeys.END_TXN 請求將事務狀態更改為 PrepareCommit(回滾事務對應狀態 PrepareAbort)并持久化到 __transaction_state 中,此時從 Producer 的視角來看整個事務已經結束了。

TransactionCoordinator 會異步向各個 Broker 發送 ApiKeys.WRITE_TXN_MARKERS 請求,當所有參加事務的 Broker 都返回成功后,TransactionCoordinator 會將事務狀態更改為 CompleteCommit(回滾事務對應狀態 CompleteAbort)并持久化到 __transaction_state 中。

5.4 消息的消費

某個分區的消息可能是事務消息與非事務消息混雜的,如下圖所示:

圖片圖片

在 Broker 處理 ApiKeys.PRODUCE 請求時,完成消息持久化會更新 LSO 到第一條未提交的事務消息的 offset。這樣在消費者消費消息時,可以通過 LSO 來判斷消息是否可見:如果設置了 isolation.level 為 read_committed 則只會消費 LSO 之前的消息。

LSO(log stable offset): 它表示的是已經被成功復制到所有副本(replicas)并且可以被消費者安全消費的消息的最大偏移量。 

但是我們可以發現 LSO 之前存在已回滾的消息(圖中紅色矩形)這些消息應該被過濾掉:在 Broker 處理 ApiKeys.WRITE_TXN_MARKERS 請求時,會將已回滾的消息索引寫入到 .txnindex 文件中(LogSegmentKafka#updateTxnIndex)。

后續 Consumer 消費消息時還會收到對應區間的已取消事務消息列表,上圖區間中的該列表為:

圖片圖片

代表 offset 在 [2,5] 之間且由 id 為 11 的 Producer 發送的消息都已回滾。

上文我們討論了 __transaction_state 的實現確保同一時間,同一 TransactionId 有且只有一個事務在進行中。所以可以使用 ProducerId 和 offset 區間定位回滾的消息不會發生沖突。

6、Kafka 事務提供的 ACID 保證

? 原子性(Atomicity)

Kafka 通過對 __transaction_state Topic 的寫入實現了事務狀態的轉移,保證了事務要么同時提交,要么同時回滾。

? 一致性(Consistency)

在事務進入 PrepareCommit 或 PrepareAbort 階段時, TransactionCoordinator 異步向所有參與事務的 Broker 提交或回滾事務。這使得 Kafka 的事務做不到強一致性,只能通過不斷重試保證最終一致性。

? 隔離性(Isolation)

Kafka 通過 LSO 機制和 .txnindex 文件來避免臟讀,實現讀已提交(Read Committed)的隔離級別。

? 持久性(Durability)

Kafka 通過將事務狀態寫入到 __transaction_state Topic 和消息寫入到日志文件中來保證持久性。

7、Kafka 事務的限制

從功能上看,Kafka 事務并不能支持業務方事務,強限制上游的消費和下游寫入都需要是同一個 Kafka 集群,否則就不具備原子性保障。

從性能上看,Kafka 事務的性能開銷主要體現在生產側:

開啟事務時需要額外的 RPC 請求定位 TransactionCoordinator 并初始化數據

消息發送需要在發送消息前向 TransactionCoordinator 同步請求添加分區,并將事務狀態的變化寫入到 __transaction_state Topic

事務提交或回滾時需要向所有參與事務的 Broker 發送請求

對于涉及分區較少且消息數量較多的事務,事務的開銷可以被均攤;反之,較多的同步 RPC 帶來的開銷會極大影響性能。并且每個生產者只能有一個事務在進行中,這就意味著事務的吞吐量會受到限制。

消費側也有一定的影響:消費者只能看到 LSO 以下的消息,并且需要額外的索引文件來過濾已回滾的消息,這無疑會增加端到端的延遲。

8、總結   

通過本文的深入分析,我們了解到 Kafka 的事務消息功能是如何在流處理場景中提供 Exactly-Once 語義的。Kafka 通過其事務 API 和內部機制,實現了消息發送的原子性、最終一致性、隔離性和持久性,盡管在實際應用中可能存在一些性能和功能上的限制。開發者和架構師應當充分理解這些概念,并在設計系統時考慮如何有效地利用 Kafka 的事務功能,以構建更加健壯和可靠的數據處理流程。

AutoMQ 是構建于對象存儲之上的云原生 Kafka fork,在解決了 Kafka 已有的成本和彈性問題基礎上對 Kafka 100%兼容,因此在 AutoMQ 上也可以使用 Kafka 事務消息。AutoMQ 作為國內 Kafka 生態的忠實擁護者,我們將持續為 Kafka 技術愛好者帶來優質的 Kafka 技術內容分享,歡迎關注我們。

責任編輯:武曉燕 來源: AutoMQ
相關推薦

2025-03-12 07:55:46

2024-11-01 09:28:02

2023-08-29 13:53:00

前端攔截HashMap

2021-03-08 08:48:02

應用場景項目

2020-06-22 18:54:39

消息隊列冪等性

2024-03-13 15:18:00

接口冪等性高并發

2017-04-03 21:23:44

消息總線冪等性消息

2023-12-18 09:46:13

Kafka集群開發

2021-04-14 17:18:27

冪等性數據源MySQL

2022-05-05 07:49:54

業務冪MySQL索引

2025-02-23 08:00:00

冪等性Java開發

2024-11-07 11:17:50

2021-04-09 10:03:12

大數據exactly-onc

2021-01-18 14:34:59

冪等性接口客戶端

2020-10-18 07:25:55

MQ消息冪等架構

2021-02-01 08:41:45

Flink語義數據

2023-09-01 15:27:31

2023-03-07 08:19:16

接口冪等性SpringBoot

2024-06-24 01:00:00

2024-08-29 09:01:39

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 精品久久久久国产免费第一页 | 一区视频在线播放 | 日日干天天干 | 成人在线视频网站 | 成人国产免费视频 | 亚洲国产精品99久久久久久久久 | 九九热精品在线视频 | 91精品久久久久久久久久入口 | 中文字幕国产一区 | 爱操影视| 国产日韩欧美在线观看 | 国产精品一区二区在线 | 亚洲欧美一区二区三区在线 | 成人综合在线视频 | 日韩不卡在线观看 | av黄色免费 | 伊人在线 | 日韩有码一区二区三区 | www.欧美 | 在线免费中文字幕 | 亚洲色图第一页 | 亚洲免费在线 | 日本精品视频在线观看 | 一区二区不卡视频 | 免费黄色在线 | 欧美日韩视频一区二区 | 亚洲va在线va天堂va狼色在线 | 日日干夜夜操 | 久久久久亚洲视频 | 日本精品视频 | av一级久久 | 久久久精品国产 | 欧美黄色免费网站 | 一区二区三区视频免费看 | 亚洲精品1区 | 免费看a | 金莲网| 欧美国产亚洲一区二区 | 国产婷婷精品av在线 | 国产激情在线 | 国产精品久久久久久久久久久久 |