成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

成為 Kafka 高手的秘籍:生產者深度實踐總結

新聞 系統運維 Kafka
kafka 是一款已經發布了近10年的分布式消息隊列系統,是一款非常成熟的產品,在各大公司或者產品中或多或少都有他的身影,特別是大數據流處理,log 流處理之類的場景,kafka 更是充當著幾乎必不可少的角色。

 作者簡介

張晉尉,騰訊云消息隊列專項支持團隊成員,kafka,puslar資深開發者,kafka sdk貢獻者,在流式數據處理,消息隊列方向有多年實踐經驗。

Kafka 簡介

kafka 是一款已經發布了近10年的分布式消息隊列系統,是一款非常成熟的產品,在各大公司或者產品中或多或少都有他的身影,特別是大數據流處理,log 流處理之類的場景,kafka 更是充當著幾乎必不可少的角色。

這款消息隊列在官方給出的定義中被稱為“分布式流式處理平臺”,其主要目的是在大數據流處理中承擔著存儲記錄流的一個作用,不過到了現在這個年代,越來越多的業務架構更傾向于將 kafka 當作消息隊列來使用,用來取代比較厚重且性能有限的 RabbitMQ。

kafka 這樣一個系統為了確保其簡潔性和高性能,其實將很多邏輯細節和配置放到了 client 端,所以我們將從客戶端的視角出發,從使用者的角度通過生產者和消費者兩個方面來介紹 kafka 在實踐生產中遇到的一些問題和相應的技術細節。本文是系列文章的第一篇,介紹生產者。

標準 producer API 簡介

這里我們先介紹下最經常使用的生產者 API,相信看本文各位已經是 kafka 使用的熟手了,不過為了后續介紹可能會使用的一些術語,我們還是先復習下 kafka 基礎概念,這里我們只關注于生產這部分,忽略其他的無關細節。

首先我們畫出生產者和 kafka 交互的一張圖,這張圖用于描述生產者消息數據的流向和 kafka server 為了接受消息需要用到什么組件。

圖片如上,現在讓我們分別介紹下圖上所繪內容以及相應的專業術語

  • Client 指的是將會寫入消息的多個不同的客戶端,這里的客戶端是一個抽象化的概念,只要和 kafka server建立了連接,將會寫入消息到 kafka 中,無論是否在同一個服務器或者一個進程中,我們都把它稱為一個 client。
  • Broker 指的是加入到了集群里面的服務器,這是一個物理層面的機器節點。一臺機器上部署了 kafka,并且加入到了 kafka 集群,那么這臺機器就是 kafka 集群的一個節點。一般情況下,一臺機器只會部署一個 kafka 服務。
  • Topic 是一個抽象的概念,主要的作用是將用戶處理的消息分為不同的類別,同時每個 topic 可以具有 topic緯度的一些配置,比如消息最大大小之類的,topic 下會創建不同數量的 partition 去實際的承載消息,這里值得注意的是 kafka 和 RabbitMQ 不一樣的地方,kafka 每創建一個 topic 都會在 broker 上去創建對應數量的 partition,所以 kafka 的 topic 數量是有限的,而且盡量不能太多。
  • 而 RabbitMQ 的 topic 是一個路由概念,創建非常大數量的 topic 并不會實際創建承載的隊列,而只是在訂閱分發的時候執行不同的路由策略。這是很多從 RabiitMQ 切換到 kafka 的用戶比較常見的問題。
  • Partiotion 是 topic 的分區,用來實際的去承載消息,每個 partition 之間是沒有關聯的,他們各自有各自的順序和消息內容,以及記錄的 offset。每個生產者的消息都會寫入到一個 partition 中去,生產者自己會根據自己的算法去選擇 partition 去寫入。
  • Replicas 是 partition 的副本,是物理和概念兩個層面的最小單位,它會將自己綁定到 broker上,每個partition 至少都需要有一個 replicas,它是消息實際寫入的地方。當 partition 有多個 replicas 的時候,控制器會決定哪一個 replicas 會是 leader。消息始終會被寫入到 leader 中,然后 leader 會同步數據到其他為 follower 狀態的 replicas 中,所以如上圖所示,client 的消息在選中寫入到某個 partition 中后,實際上,client 會去連接 replicas leader 所在的 broker,然后把消息寫進去。

現在我們開始講述下關于生產者 API 的使用和一些在生產的時候需要注意的配置,這里的生產者 API 指的是 kafka 提供的幾乎無狀態的 API,非常的輕便,同時也可以提供非常不錯的性能。不過如果使用這個 API 來進行生產,kafka 只保證最少一次和最多一次語義。

  • 最少一次(at-least-once) 也就是說明消息可能重復,如果消息重復,那么消費者需要在消費的時候進行去重。
  • 最多一次(at-most-once) 用的比較少,一般是大量不重要的數據處理的時候,容忍丟失數據的情況下,可以提供比較優秀的性能,生產端生產消息并投遞后就不再關注是否成功了,也不會進行重試。
  • 恰好一次(exactly-once) 這個語義在當前這個生產者api中是不提供的,不過由于大量的流式計算系統都需要保證 exactly once semantics,而且 kafka 也推出了kafka stream 這樣的流式處理框架,所以后續新版本的 kafka 提供了事物消息來確保恰好一次語義,我們將在本文后續章節討論這個問題。

接下來我們通過一個代碼片段的實例來看下如何使用生產者 api,同時看一下一些重要的配置,首先讓我們創建一個有兩個 partition,每個 partition 都有兩個 replicas 的 topic

  1. bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 2 --topic test 

然后讓我們寫一段 java 生產者片段,代碼非常簡單,只是配置一些生產者 client 的相關配置,然后調用 producer的 send 方法將需要發送的消息提交到 kafka 的 client 庫

  1. Properties props = new Properties(); 
  2.  
  3. props.put("bootstrap.servers""localhost:9092"); 
  4.  
  5. props.put("acks""all"); 
  6.  
  7. props.put("retries",3); 
  8.  
  9. props.put("retry.backoff.ms",2000); 
  10.  
  11. props.put("compression.type","lz4"); 
  12.  
  13. props.put("batch.size"16384); 
  14.  
  15. props.put("linger.ms",200); 
  16.  
  17. props.put("max.request.size",1048576); 
  18.  
  19. props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer"); 
  20.  
  21. props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer"); 
  22.  
  23. props.put("request.timeout.ms"10000); 
  24.  
  25. props.put("max.block.ms"30000); 
  26.  
  27. Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 1000; i++) { 
  28.  
  29. Future<RecordMetadata> future = producer.send(new ProducerRecord<>("test", UUID.randomUUID().toString())); 
  30.  
  31. System.out.println("produce offset:" + future.get().offset()); 
  32.  
  33.  
  34. producer.close(); 

通過以上代碼就可以完成消息的生產了,kafka 給我們提供的這個 API 的功能確實非常簡單易用,當然這里面實際上包含上比較多的細節,不過被 client 封裝了進去,這里我們繼續往深處挖掘下,看看隱藏在這段代碼里面的可能存在的坑。

首先我們來簡單分析下在這段代碼里面 client 會做什么,(注:這里我們更傾向于給出一個通用工作流程,所以可能會忽略部分 java客戶端獨有特性 )。

1. client 通過代碼中給出的 bootstrap.servers 去連接 broker,這里如果第一個broker 連接失敗,那么 client 會從左往右重試去連接,直到全部連接失敗或者某一個地址成功連接。

2. 當連接成功后,kafka client 會發起 ApiVersions request去kafka server 查詢server 端支持各個 api 以及每個 api 最大的支持版本。從而達到 kafka 一個向下兼容的目的。當然由于 ApiVersions 是一個大約在 0.10 版本加入的 api,所以新版client 如果訪問 0.9 版本的 kafka server 會引起ArrayIndexOutOfBoundsException 的報錯,這個錯誤 kafka 官方在 0.10 的時候修復了。

3. 接下來 client 會查詢將要發送消息的 topic 的元數據信息,向已經連接的 broker 發送 Metadata request,通過這個 api,kafka client 將會拿到集群 broker 的各種信息,包括 ip 和 port,以及 broker 對應的唯一 id,同時 client 也將獲取到 topic的相關信息,parition 的 id 和 partition 選擇出來的 leader replicas 所在 broker 的id,然后 client 將會建立 leader replicas 所在 broker 的連接,作為實際發送消息的數據鏈路。

在這里我們有三個細節需要注意

  • 第一點,如果 kafka 的 server 配置了 auto.create.topics.enable 為 true,那么如果 client 查詢了一個不存在的 topic 元數據,這個 topic 隨后會被 kafka server 自動創建。
  • 第二點,一般來說,kafka client 處理 Metadata 是一個定期刷新的動作,假如 Metadata 每過 30s 刷新一次,那么在這 30s 中,用戶修改了 topic 配置增加了一個 partition,client 是無法感知的,需要等待到client 更新了 Metadata,生產端才會知道這個 topic 多出了一個 partition,才能往新的 partition 寫入數據。
  • 那么如果在 Metadata 刷新時,由于 client 生產流量持續超過 kafka 配額限制,導致 kafka 限流,使得獲取 Metadata 數據一直重試和超時,這種極端情況下,client 可能會非常長一段時間無法感知到 partition 的新增。
  • 這種情況在生產實踐中也是發生過的,如果大家使用過程中發現了這種生產端遲遲不寫入任何消息到新建的 partition 的情況,那么多半可以從這個方向入手。
  • 第三點,從 kafka 的建立鏈接的邏輯來看,kafka 實際上是會建立一條更多的鏈接的,同時也會直接鏈接到集群中不同的 broker 上,所以這里如果要申請防火墻策略,那么一定要為每個 broker 都申請好策略,否則可能會出現,能夠拿到 Metadata,但無法生產消息的情況。

4.client 開始根據 message 中的 key 來計算 hash,確定這個 message 會被投遞到哪個 partition 中去,然后 client 投遞消息到本地的一個隊列中,實際連接到partition 的投遞者類,將從隊列中取出消息,然后 client 會做兩個檢查之后調用Produce request 去投遞消息。

  • 如果消息大于 max.request.size 則直接返回 RecordTooLargeException
  • 如果消息小于 batch.size 則等待后續消息,直到到消息大小總和大于 batch.size或者超過 linger.ms 規定的時間
  • client 將會啟動一個異步過程或者同步過程等待 Produce request 的返回,然后將依據配置的重試策略來執行重試或者返回發送失敗的錯誤到業務邏輯中,讓業務邏輯進行錯誤處理。

5. 在這一步中主要涉及到以下幾個配置

  • acks 為 -1 或者 all,代表所有處于 isr(in-sync) 列表中的 replicas 都寫入消息成功后才會返回成功給客戶端,同時在 topic 級別也提供了一個min.insync.replicas 配置,如果 isr 中的 replicas 少于這個配置的值,那么寫入同樣會失敗。這是 kafka 所能提供的最強約束了。
  • acks 為 0,代表消息只要投遞到 client 的 tcp socket 緩沖區后就認為已經發送出去了,client 不再關注是否 kafka 集群是否收到或者寫入成功。在這種模式下kafka 只提供 at-most-once 語義,在容忍數據丟失的情況下,是性能最好的模式。
  • acks 為 1 代表發送消息到 replicas leader 寫入成功就返回成功,不關注其余follower 是否寫入成功,如果投遞消息后,leader 馬上掛掉了,消息是會丟失的。
  • 這個模式在大多數時候可以確保消息不丟失,是一個性能和安全性權衡的模式。
  • server 將根據 client 提供的 acks 配置值來確定服務端的寫入會在什么情況下返回給客戶端
  • client 如果接收到 produce 寫入失敗,那么將會重試 retries 配置的次數,每次重試之間間隔 retry.backoff.ms 所定義的時間。重試次數耗盡之后才會返回失敗到業務邏輯。

以上就是整個 producer api 在使用過程中的一些細節了,明白了這些細節,在生產時遇到kafka的一些奇怪報錯就會有一些思路去定位和處理。當然從代碼上來看,代碼里面還有一些配置在上面的文章中沒有覆蓋到,這里我在一起介紹一下

  • compression.type 用于配置壓縮,kafka 提供不同的壓縮模式,包括 none(不壓縮),gzip,snappy,lz4,以及 zstd(需要2.1.0以上版本的kafka)。
  • 一般來說我們比較推薦 lz4 格式的壓縮,在比較輕的 cpu 負載下,可以提供不錯的壓縮比,和非常高的吞吐量,整體的性能和性價比會優于其他幾個壓縮方式,所以一般沒有強烈的壓縮比需求的話,使用 lz4 是比較好的選擇。
  • key.serializer 和 value.serializer 是序列化器,這個只是在 java 的客戶端中特有的,用于決定如何把 key 和 value 的值序列化,這里就不細說了。
  • request.timeout.ms 這個配置定義了網絡請求超時時間,任何一個 kafka client對于 server 的請求,如果在本參數規定時間內沒有收到答復,那么就都會取消請求并認為請求失敗,并將邏輯轉移到失敗處理邏輯,這個約束是比較強的約束。
  • max.block.ms 這個配置項定義了 client 內部的一個阻塞時間,比如如果內部的異步隊列滿了,kafka client 調用 send 會等待這樣一個時間,直到超時返回,這個參數需要注意的一點是用戶自定義配置的序列化器和分區器中花費的時間不會計入這個參數超時中。

冪等生產者(Idempotent Producer)簡介

冪等生產者提供了生產者在單一分區上的恰好一次語義,但是他不能覆蓋到生產者對于復數 partition 操作的一致性,這種一致性需要通過后續的事務消息來解決。

現在讓我們先看下冪等生產者如何使用,以及一些涉及到的細節。

為什么我們需要使用到冪等生產者,其主要的原因是生產者發送消息到服務端后,如果遇到了網絡問題導致連接斷開,生產者是無法感知到消息到底是寫入成功還是失敗,對于 kafka 一般的生產者 api 來說我們會設置 retries 參數,始終去進行重試,這也就是我們所謂至少一次語義,因為我們無法感知是否寫入成功,如果寫入成功,但是我們沒有接收到成功的回復,我們進行重試動作,就會導致消息的重復寫入,如果消息消費依賴于消息順序,這種重試甚至會導致順序的錯亂。

現在通過冪等生產者,kafka 可以在我們進行這樣的重試的時候丟棄掉這種重復寫入的消息。

現在讓我們看看如何使用冪等生產者。(這里讓我們來看下代碼,代碼中讓我們忽略掉一些不重要的配置)。

  1. Properties props = new Properties(); 
  2.  
  3. props.put("bootstrap.servers""localhost:9092"); 
  4.  
  5. props.put("enable.idempotence"true); 
  6.  
  7. Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 1000; i++) { 
  8.  
  9. Future<RecordMetadata> future = producer.send(new ProducerRecord<>("test", UUID.randomUUID().toString())); 
  10.  
  11. System.out.println("produce offset:" + future.get().offset()); 
  12.  
  13.  
  14. producer.close(); 

在用戶的使用上,要啟動冪等生產者只需要添加設置 enable.idempotence 為 true 就好,讓我們繼續關注下細節,看看啟用冪等生產者后 kafka client 會做什么。首先 client會強制設置一些生產者的配置值。

 

  1. acks 會被強制設置為all,如果客戶本來使用的是0,1級別的 acks 那么用戶需要考慮下被設置為 all 的時候對于自己業務性能的影響,如果用戶本來就是設置為all的情況,那么使用冪等生產者是幾乎不會有額外代價的。
  2. retries 必須設置為大于1的數字,一般 librdkafka 和 java kafka client 會把 retries設置為一個非常大的數比如 Integer.MAX_VALUE,基本靠近于無限重試。確保消息一定會成功發送。
  3. max.inflight.requests.per.connection 必須小于5,其中 java kafka client 如果版本小于1.0.0,會把 max.inflight.requests.per.connection 設置為 1,確保一條數據鏈路上一次只有一個請求,這會導致一定情況下 tps 有所下降。
  4. 發送的消息格式必須是 v2 格式。不支持低版本的消息格式。

完成生產者配置之后,client 開始執行生產消息的發送,這里我們省略在上文提到過的生產 api 的邏輯,只關注于啟用冪等后多出來的邏輯和步驟

  1. 生產者 client 向 broker 發起 InitProducerId request 請求一個 PID,后續發送的消息,都會帶上這一個 PID 用于標明生產者的身份。
  2. 每個消息會帶上一個單調遞增的 Sequence ID。kafka server會記錄下同一個PID最后一次提交消息的 Sequence ID,如果當前發送的消息 Sequence ID 小于等于最后一次提交的 ID,那么 server 會認為當前消息已經過期了,并拒絕接受消息。client 收到這樣的拒絕請求后就可以感知到之前的消息一定是投遞成功了,并停止重試發送,丟棄掉消息。

通過以上的這些步驟,kafka 確保了每個消費者對于單 partition 操作的一個冪等性,這是一個非常實用的功能,特別是在使用消費者 api 的時候本來就已經設置了acks 為 all 的業務,啟用生產者冪等幾乎沒有額外消耗,這也是一個 kafka 推出了比較久的功能了(從 kafka 0.11 開始支持),但是目前看起使用本功能的用戶還是比較少。

事務消息(Transactional Messaging)簡介

事務消息是目前 kafka 為了確保恰好一次語義所提供的最強約束,他確保了一個生產者如果生產多個相互關聯的消息到不同的 partition 上時要么最后同時成功,要么同時失敗。同時啟用事物消息的前提必須啟用冪等生產者,所以單 partition 上的恰好一次語義就由冪等的特性來保證。

不過一般很少有業務會直接使用 kafka 的事物消息,會涉及使用事物消息的業務其實基本上都是通過 kafka stream 進行流處理,而 kafka stream 依賴于事務消息并且對于業務隱藏掉了事務細節,所以這里我們來看看如何直接使用事務消息并繼續嘗試分析下client 在這期間做了什么,先讓我們放出一份代碼片段。

  1. Properties props = new Properties(); 
  2.  
  3. props.put("bootstrap.servers""localhost:9092"); 
  4.  
  5. props.put("enable.idempotence""true"); 
  6.  
  7. props.put("transactional.id""testtrans-1"); 
  8.  
  9. KafkaProducer<String, String> producer = new KafkaProducer(props); 
  10.  
  11. producer.initTransactions(); try
  12.  
  13. producer.beginTransaction(); 
  14.  
  15. producer.send(record0); 
  16.  
  17. producer.send(record1); 
  18.  
  19. producer.sendOffsetsToTxn(…); 
  20.  
  21. producer.commitTransaction(); 
  22.  
  23. catch( ProducerFencedException e) { 
  24.  
  25. producer.close(); 
  26.  
  27. catch( KafkaException e ) { 
  28.  
  29. producer.abortTransaction(); 
  30.  

首先代碼里面執行 initTransactions 作為第一步,在這個邏輯中 client 將會請求 InitProducerId 并傳遞事務 id,用來建立一個事務 id 和 PID 一對一的關系。如果有多個生產者加入到同一個事務 id 中,前面加入的生產者都會被后面加入的替代。前面生產者的請求都會被拒絕。

值得注意的是如果 client 短線重新連接,它會在請求 InitProducerId 的時候提交之前使用的 PID 以及 epoch,如果成功隨后 server 會返回 epoch+1,同時會拒絕所有 epoch小于當前 epoch 的生產者消息,這是為了解決分布式系統中所謂的僵死問題。

然后接下來的代碼調用就和很多事務代碼一樣了,啟動一個事務,寫入所有需要寫入的信息,最后再 commit,如果失敗則回滾,如果成功就會一起提交所有寫入,然后做接下來的業務邏輯。一般大部分事務的實現都是一個狀態機,這里我們就放上一張圖不繼續分析下去了。

在看完了事務代碼后,我們似乎沒有提到 sendOffsetsToTxn 這個函數,這個函數實際上是用于當前事務消息是一個從一個 topic 消費,然后寫入到事務消息的時候使用的,消費的 offset 可以通過這個函數提交到協調者,后續在事物提交的時候再一并提交消費者消費掉的 offset。防止事務失敗的時候用戶還需要手動管理消費者 offset。是一個非常有用的幫助函數。

總結

到此為止,我們從客戶端視角出發簡單的去分析了 kafka 生產者的一些用法和相應需要注意的坑,由于作者的本篇文章是從工作中遇到的一些問題出發的,所以相應的如果某些地方用得多有人咨詢的多,那么可能會寫的稍微詳細一些,有的地方咨詢的人少,遇到的問題也相應比較少,可能就會簡略一些。

希望本文能夠讓各位讀者有所收獲,能夠對 kafka 生產者這部分有更好的了解。感謝各位的閱讀,讓我們下一篇文章 kafka 消費者再見。

 

 

責任編輯:張燕妮 來源: 高效運維
相關推薦

2021-07-05 06:26:08

生產者kafka架構

2021-09-09 06:55:43

kafka冪等生產者

2022-05-10 10:06:03

Kafka

2015-08-26 09:39:30

java消費者

2022-05-23 08:20:29

Kafka生產者元數據管理

2013-07-29 18:09:45

3D打印Autodesk

2021-12-28 12:01:59

Kafka 消費者機制

2020-04-17 14:49:34

Kafka分區數據

2009-08-13 13:14:31

C#生產者和消費者

2021-12-22 11:00:05

模型Golang語言

2024-03-14 11:58:43

2012-02-14 12:31:27

Java

2021-08-31 10:26:24

存儲

2024-10-11 09:27:52

2017-05-16 12:30:21

Python多線程生產者消費者模式

2025-04-29 01:10:00

Kafka高并發系統

2015-10-08 10:04:39

Python高手

2016-10-21 16:30:18

Linux操作系統

2016-10-21 20:27:03

Linux

2009-06-10 18:11:58

Java高手
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 天天躁日日躁aaaa视频 | 永久av | 精品在线看 | 中文字幕成人在线 | 99久久国产综合精品麻豆 | 国产午夜精品视频 | 日韩视频 中文字幕 | av中文天堂| 日韩一区av | 日韩欧美在线免费观看 | 亚洲午夜视频在线观看 | 中文字字幕一区二区三区四区五区 | 亚洲一区导航 | 99精品观看 | 亚洲福利| av中文字幕在线 | 久久在视频| 欧美黑人一区 | 91xxx在线观看 | 国产精品久久久久永久免费观看 | 91色网站 | 国产在线精品区 | 久草福利 | 成人欧美一区二区三区黑人孕妇 | 97久久精品 | 亚洲精品在线观看视频 | 亚洲综合99| 在线看免费的a | 中文字幕第一页在线 | 午夜天堂精品久久久久 | 涩涩鲁亚洲精品一区二区 | 精品国产色| 蜜臀久久 | 91精品国产综合久久婷婷香蕉 | 欧美网站一区 | 国产网站在线播放 | 欧美日韩精品在线一区 | 欧美1区2区| 综合久久av | 国产激情偷乱视频一区二区三区 | 亚洲国产成人精品久久久国产成人一区 |