Spring boot集成Kafka之spring-kafka深入探秘
前言
kafka是一個消息隊列產(chǎn)品,基于Topic partitions的設(shè)計,能達到非常高的消息發(fā)送處理性能。Spring創(chuàng)建了一個項目Spring-kafka,封裝了Apache 的Kafka-client,用于在Spring項目里快速集成kafka。除了簡單的收發(fā)消息外,Spring-kafka還提供了很多高級功能,下面我們就來一一探秘這些用法。
項目地址:https://github.com/spring-projects/spring-kafka
簡單集成
引入依賴
- <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.6.RELEASE</version></dependency>
添加配置
- spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
測試發(fā)送和接收
- /**
- * @author: kl @kailing.pub
- * @date: 2019/5/30
- */
- @SpringBootApplication
- @RestController
- public class Application {
- private final Logger logger = LoggerFactory.getLogger(Application.class);
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
- @Autowired
- private KafkaTemplate<Object, Object> template;
- @GetMapping("/send/{input}")
- public void sendFoo(@PathVariable String input) {
- this.template.send("topic_input", input);
- }
- @KafkaListener(id = "webGroup", topics = "topic_input")
- public void listen(String input) {
- logger.info("input value: {}" , input);
- }
- }
啟動應(yīng)用后,在瀏覽器中輸入:http://localhost:8080/send/kl。就可以在控制臺看到有日志輸出了:input value: "kl"。基礎(chǔ)的使用就這么簡單。發(fā)送消息時注入一個KafkaTemplate,接收消息時添加一個@KafkaListener注解即可。
Spring-kafka-test嵌入式Kafka Server
不過上面的代碼能夠啟動成功,前提是你已經(jīng)有了Kafka Server的服務(wù)環(huán)境,我們知道Kafka是由Scala + Zookeeper構(gòu)建的,可以從官網(wǎng)下載部署包在本地部署。但是,我想告訴你,為了簡化開發(fā)環(huán)節(jié)驗證Kafka相關(guān)功能,Spring-Kafka-Test已經(jīng)封裝了Kafka-test提供了注解式的一鍵開啟Kafka Server的功能,使用起來也是超級簡單。本文后面的所有測試用例的Kafka都是使用這種嵌入式服務(wù)提供的。
引入依賴
- <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><version>2.2.6.RELEASE</version><scope>test</scope></dependency>
啟動服務(wù)
下面使用Junit測試用例,直接啟動一個Kafka Server服務(wù),包含四個Broker節(jié)點。
- @RunWith(SpringRunner.class)@SpringBootTest(classes = ApplicationTests.class)@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})public class ApplicationTests {@Testpublic void contextLoads()throws IOException { System.in.read(); }}
如上:只需要一個注解@EmbeddedKafka即可,就可以啟動一個功能完整的Kafka服務(wù),是不是很酷。默認(rèn)只寫注解不加參數(shù)的情況下,是創(chuàng)建一個隨機端口的Broker,在啟動的日志中會輸出具體的端口以及默認(rèn)的一些配置項。不過這些我們在Kafka安裝包配置文件中的配置項,在注解參數(shù)中都可以配置,下面詳解下@EmbeddedKafka注解中的可設(shè)置參數(shù) :
- value:broker節(jié)點數(shù)量
- count:同value作用一樣,也是配置的broker的節(jié)點數(shù)量
- controlledShutdown:控制關(guān)閉開關(guān),主要用來在Broker意外關(guān)閉時減少此Broker上Partition的不可用時間
Kafka是多Broker架構(gòu)的高可用服務(wù),一個Topic對應(yīng)多個partition,一個Partition可以有多個副本Replication,這些Replication副本保存在多個Broker,用于高可用。但是,雖然存在多個分區(qū)副本集,當(dāng)前工作副本集卻只有一個,默認(rèn)就是首次分配的副本集【首選副本】為Leader,負(fù)責(zé)寫入和讀取數(shù)據(jù)。當(dāng)我們升級Broker或者更新Broker配置時需要重啟服務(wù),這個時候需要將partition轉(zhuǎn)移到可用的Broker。下面涉及到三種情況
- 直接關(guān)閉Broker:當(dāng)Broker關(guān)閉時,Broker集群會重新進行選主操作,選出一個新的Broker來作為Partition Leader,選舉時此Broker上的Partition會短時不可用
- 開啟controlledShutdown:當(dāng)Broker關(guān)閉時,Broker本身會先嘗試將Leader角色轉(zhuǎn)移到其他可用的Broker上
- 使用命令行工具:使用bin/kafka-preferred-replica-election.sh,手動觸發(fā)PartitionLeader角色轉(zhuǎn)移
- ports:端口列表,是一個數(shù)組。對應(yīng)了count參數(shù),有幾個Broker,就要對應(yīng)幾個端口號
- brokerProperties:Broker參數(shù)設(shè)置,是一個數(shù)組結(jié)構(gòu),支持如下方式進行Broker參數(shù)設(shè)置:
- @EmbeddedKafka(brokerProperties = {"log.index.interval.bytes = 4096","num.io.threads = 8"})
- kerPropertiesLocation:Broker參數(shù)文件設(shè)置
功能同上面的brokerProperties,只是Kafka Broker的可設(shè)置參數(shù)達182個之多,都像上面這樣配置肯定不是最優(yōu)方案,所以提供了加載本地配置文件的功能,如:
- @EmbeddedKafka(brokerPropertiesLocation = "classpath:application.properties")
默認(rèn)情況下,如果在使用KafkaTemplate發(fā)送消息時,Topic不存在,會創(chuàng)建一個新的Topic,默認(rèn)的分區(qū)數(shù)和副本數(shù)為如下Broker參數(shù)來設(shè)定
創(chuàng)建新的Topic
- num.partitions = 1 #默認(rèn)Topic分區(qū)數(shù)
- num.replica.fetchers = 1 #默認(rèn)副本數(shù)
程序啟動時創(chuàng)建Topic
- /**
- * @author: kl @kailing.pub
- * @date: 2019/5/31
- */
- @Configuration
- public class KafkaConfig {
- @Bean
- public KafkaAdmin admin(KafkaProperties properties){
- KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
- admin.setFatalIfBrokerNotAvailable(true);
- return admin;
- }
- @Bean
- public NewTopic topic2() {
- return new NewTopic("topic-kl", 1, (short) 1);
- }
- }
如果Kafka Broker支持(1.0.0或更高版本),則如果發(fā)現(xiàn)現(xiàn)有Topic的Partition 數(shù)少于設(shè)置的Partition 數(shù),則會新增新的Partition分區(qū)。關(guān)于KafkaAdmin有幾個常用的用法如下:
setFatalIfBrokerNotAvailable(true):默認(rèn)這個值是False的,在Broker不可用時,不影響Spring 上下文的初始化。如果你覺得Broker不可用影響正常業(yè)務(wù)需要顯示的將這個值設(shè)置為True
setAutoCreate(false) : 默認(rèn)值為True,也就是Kafka實例化后會自動創(chuàng)建已經(jīng)實例化的NewTopic對象
initialize():當(dāng)setAutoCreate為false時,需要我們程序顯示的調(diào)用admin的initialize()方法來初始化NewTopic對象
代碼邏輯中創(chuàng)建
有時候我們在程序啟動時并不知道某個Topic需要多少Partition數(shù)合適,但是又不能一股腦的直接使用Broker的默認(rèn)設(shè)置,這個時候就需要使用Kafka-Client自帶的AdminClient來進行處理。上面的Spring封裝的KafkaAdmin也是使用的AdminClient來處理的。如:
- @Autowired
- private KafkaProperties properties;
- @Test
- public void testCreateToipc(){
- AdminClient client = AdminClient.create(properties.buildAdminProperties());
- if(client !=null){
- try {
- Collection<NewTopic> newnewTopics = new ArrayList<>(1);
- newTopics.add(new NewTopic("topic-kl",1,(short) 1));
- client.createTopics(newTopics);
- }catch (Throwable e){
- e.printStackTrace();
- }finally {
- client.close();
- }
- }
- }
ps:其他的方式創(chuàng)建Topic
上面的這些創(chuàng)建Topic方式前提是你的spring boot版本到2.x以上了,因為spring-kafka2.x版本只支持spring boot2.x的版本。在1.x的版本中還沒有這些api。下面補充一種在程序中通過Kafka_2.10創(chuàng)建Topic的方式
引入依賴
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.2.2</version>
- </dependency>
api方式創(chuàng)建
- @Test
- public void testCreateTopic()throws Exception{
- ZkClient zkClient =new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$)
- String topicName = "topic-kl";
- int partitions = 1;
- int replication = 1;
- AdminUtils.createTopic(zkClient,topicName,partitions,replication,new Properties());
- }
注意下ZkClient最后一個構(gòu)造入?yún)ⅲ且粋€序列化反序列化的接口實現(xiàn),博主測試如果不填的話,創(chuàng)建的Topic在ZK上的數(shù)據(jù)是有問題的,默認(rèn)的Kafka實現(xiàn)也很簡單,就是做了字符串UTF-8編碼處理。ZKStringSerializer$是Kafka中已經(jīng)實現(xiàn)好的一個接口實例,是一個Scala的伴生對象,在Java中直接調(diào)用點MODULE$就可以得到一個實例
命令方式創(chuàng)建
- @Test
- public void testCreateTopic(){
- String [] options= new String[]{
- "--create",
- "--zookeeper","127.0.0.1:2181",
- "--replication-factor", "3",
- "--partitions", "3",
- "--topic", "topic-kl"
- };
- TopicCommand.main(options);
- }
消息發(fā)送之KafkaTemplate探秘
獲取發(fā)送結(jié)果
異步獲取
- template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
- @Override
- public void onFailure(Throwable throwable) {
- ......
- }
- @Override
- public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
- ....
- }
- });
同步獲取
- ListenableFuture<SendResult<Object,Object>> future = template.send("topic-kl","kl");
- try {
- SendResult<Object,Object> result = future.get();
- }catch (Throwable e){
- e.printStackTrace();
- }
kafka事務(wù)消息
默認(rèn)情況下,Spring-kafka自動生成的KafkaTemplate實例,是不具有事務(wù)消息發(fā)送能力的。需要使用如下配置激活事務(wù)特性。事務(wù)激活后,所有的消息發(fā)送只能在發(fā)生事務(wù)的方法內(nèi)執(zhí)行了,不然就會拋一個沒有事務(wù)交易的異常
- spring.kafka.producer.transaction-id-prefix=kafka_tx.
當(dāng)發(fā)送消息有事務(wù)要求時,比如,當(dāng)所有消息發(fā)送成功才算成功,如下面的例子:假設(shè)第一條消費發(fā)送后,在發(fā)第二條消息前出現(xiàn)了異常,那么第一條已經(jīng)發(fā)送的消息也會回滾。而且正常情況下,假設(shè)在消息一發(fā)送后休眠一段時間,在發(fā)送第二條消息,消費端也只有在事務(wù)方法執(zhí)行完成后才會接收到消息
- @GetMapping("/send/{input}")
- public void sendFoo(@PathVariable String input) {
- template.executeInTransaction(t ->{
- t.send("topic_input","kl");
- if("error".equals(input)){
- throw new RuntimeException("failed");
- }
- t.send("topic_input","ckl");
- return true;
- });
- }
當(dāng)事務(wù)特性激活時,同樣,在方法上面加@Transactional注解也會生效
- @GetMapping("/send/{input}")
- @Transactional(rollbackFor = RuntimeException.class)
- public void sendFoo(@PathVariable String input) {
- template.send("topic_input", "kl");
- if ("error".equals(input)) {
- throw new RuntimeException("failed");
- }
- template.send("topic_input", "ckl");
- }
Spring-Kafka的事務(wù)消息是基于Kafka提供的事務(wù)消息功能的。而Kafka Broker默認(rèn)的配置針對的三個或以上Broker高可用服務(wù)而設(shè)置的。這邊在測試的時候為了簡單方便,使用了嵌入式服務(wù)新建了一個單Broker的Kafka服務(wù),出現(xiàn)了一些問題:如
1、事務(wù)日志副本集大于Broker數(shù)量,會拋如下異常:
- Number of alive brokers '1' does not meet the required replication factor '3'
- for the transactions state topic (configured via 'transaction.state.log.replication.factor').
- This error can be ignored if the cluster is starting up and not all brokers are up yet.
默認(rèn)Broker的配置transaction.state.log.replication.factor=3,單節(jié)點只能調(diào)整為1
2、副本數(shù)小于副本同步隊列數(shù)目,會拋如下異常
- Number of insync replicas for partition __transaction_state-13 is [1], below required minimum [2]
默認(rèn)Broker的配置transaction.state.log.min.isr=2,單節(jié)點只能調(diào)整為1
ReplyingKafkaTemplate獲得消息回復(fù)
ReplyingKafkaTemplate是KafkaTemplate的一個子類,除了繼承父類的方法,新增了一個方法sendAndReceive,實現(xiàn)了消息發(fā)送\回復(fù)語義
- RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
也就是我發(fā)送一條消息,能夠拿到消費者給我返回的結(jié)果。就像傳統(tǒng)的RPC交互那樣。當(dāng)消息的發(fā)送者需要知道消息消費者的具體的消費情況,非常適合這個api。如,一條消息中發(fā)送一批數(shù)據(jù),需要知道消費者成功處理了哪些數(shù)據(jù)。下面代碼演示了怎么集成以及使用ReplyingKafkaTemplate
- /**
- * @author: kl @kailing.pub
- * @date: 2019/5/30
- */
- @SpringBootApplication
- @RestController
- public class Application {
- private final Logger logger = LoggerFactory.getLogger(Application.class);
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
- @Bean
- public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
- ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("replies");
- repliesContainer.getContainerProperties().setGroupId("repliesGroup");
- repliesContainer.setAutoStartup(false);
- return repliesContainer;
- }
- @Bean
- public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> pf, ConcurrentMessageListenerContainer<String, String> repliesContainer) {
- return new ReplyingKafkaTemplate(pf, repliesContainer);
- }
- @Bean
- public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> pf) {
- return new KafkaTemplate(pf);
- }
- @Autowired
- private ReplyingKafkaTemplate template;
- @GetMapping("/send/{input}")
- @Transactional(rollbackFor = RuntimeException.class)
- public void sendFoo(@PathVariable String input) throws Exception {
- ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
- RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
- ConsumerRecord<String, String> consumerRecord = replyFuture.get();
- System.err.println("Return value: " + consumerRecord.value());
- }
- @KafkaListener(id = "webGroup", topics = "topic-kl")
- @SendTo
- public String listen(String input) {
- logger.info("input value: {}", input);
- return "successful";
- }
- }
Spring-kafka消息消費用法探秘
@KafkaListener的使用
前面在簡單集成中已經(jīng)演示過了@KafkaListener接收消息的能力,但是@KafkaListener的功能不止如此,其他的比較常見的,使用場景比較多的功能點如下:
- 顯示的指定消費哪些Topic和分區(qū)的消息,
- 設(shè)置每個Topic以及分區(qū)初始化的偏移量,
- 設(shè)置消費線程并發(fā)度
- 設(shè)置消息異常處理器
- @KafkaListener(id = "webGroup", topicPartitions = {
- @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
- @TopicPartition(topic = "topic2", partitions = "0",
- partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
- },concurrency = "6",errorHandler = "myErrorHandler")
- public String listen(String input) {
- logger.info("input value: {}", input);
- return "successful";
- }
其他的注解參數(shù)都很好理解,errorHandler需要說明下,設(shè)置這個參數(shù)需要實現(xiàn)一個接口KafkaListenerErrorHandler。而且注解里的配置,是你自定義實現(xiàn)實例在spring上下文中的Name。比如,上面配置為errorHandler = "myErrorHandler"。則在spring上線中應(yīng)該存在這樣一個實例:
- /**
- * @author: kl @kailing.pub
- * @date: 2019/5/31
- */
- @Service("myErrorHandler")
- public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
- Logger logger =LoggerFactory.getLogger(getClass());
- @Override
- public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
- logger.info(message.getPayload().toString());
- return null;
- }
- @Override
- public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
- logger.info(message.getPayload().toString());
- return null;
- }
- }
手動Ack模式
手動ACK模式,由業(yè)務(wù)邏輯控制提交偏移量。比如程序在消費時,有這種語義,特別異常情況下不確認(rèn)ack,也就是不提交偏移量,那么你只能使用手動Ack模式來做了。開啟手動首先需要關(guān)閉自動提交,然后設(shè)置下consumer的消費模式
- spring.kafka.consumer.enable-auto-commit=false
- spring.kafka.listener.ack-mode=manual
上面的設(shè)置好后,在消費時,只需要在@KafkaListener監(jiān)聽方法的入?yún)⒓尤階cknowledgment 即可,執(zhí)行到ack.acknowledge()代表提交了偏移量
- @KafkaListener(id = "webGroup", topics = "topic-kl")
- public String listen(String input, Acknowledgment ack) {
- logger.info("input value: {}", input);
- if ("kl".equals(input)) {
- ack.acknowledge();
- }
- return "successful";
- }
@KafkaListener注解監(jiān)聽器生命周期
@KafkaListener注解的監(jiān)聽器的生命周期是可以控制的,默認(rèn)情況下,@KafkaListener的參數(shù)autoStartup = "true"。也就是自動啟動消費,但是也可以同過KafkaListenerEndpointRegistry來干預(yù)他的生命周期。KafkaListenerEndpointRegistry有三個動作方法分別如:start(),pause(),resume()/啟動,停止,繼續(xù)。如下代碼詳細演示了這種功能。
- /**
- * @author: kl @kailing.pub
- * @date: 2019/5/30
- */
- @SpringBootApplication
- @RestController
- public class Application {
- private final Logger logger = LoggerFactory.getLogger(Application.class);
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
- @Autowired
- private KafkaTemplate template;
- @GetMapping("/send/{input}")
- @Transactional(rollbackFor = RuntimeException.class)
- public void sendFoo(@PathVariable String input) throws Exception {
- ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
- template.send(record);
- }
- @Autowired
- private KafkaListenerEndpointRegistry registry;
- @GetMapping("/stop/{listenerID}")
- public void stop(@PathVariable String listenerID){
- registry.getListenerContainer(listenerID).pause();
- }
- @GetMapping("/resume/{listenerID}")
- public void resume(@PathVariable String listenerID){
- registry.getListenerContainer(listenerID).resume();
- }
- @GetMapping("/start/{listenerID}")
- public void start(@PathVariable String listenerID){
- registry.getListenerContainer(listenerID).start();
- }
- @KafkaListener(id = "webGroup", topics = "topic-kl",autoStartup = "false")
- public String listen(String input) {
- logger.info("input value: {}", input);
- return "successful";
- }
- }
在上面的代碼中,listenerID就是@KafkaListener中的id值“webGroup”。項目啟動好后,分別執(zhí)行如下url,就可以看到效果了。
先發(fā)送一條消息:http://localhost:8081/send/ckl。因為autoStartup = "false",所以并不會看到有消息進入監(jiān)聽器。
接著啟動監(jiān)聽器:http://localhost:8081/start/webGroup。可以看到有一條消息進來了。
暫停和繼續(xù)消費的效果使用類似方法就可以測試出來了。
SendTo消息轉(zhuǎn)發(fā)
前面的消息發(fā)送響應(yīng)應(yīng)用里面已經(jīng)見過@SendTo,其實除了做發(fā)送響應(yīng)語義外,@SendTo注解還可以帶一個參數(shù),指定轉(zhuǎn)發(fā)的Topic隊列。常見的場景如,一個消息需要做多重加工,不同的加工耗費的cup等資源不一致,那么就可以通過跨不同Topic和部署在不同主機上的consumer來解決了。如:
- @KafkaListener(id = "webGroup", topics = "topic-kl")
- @SendTo("topic-ckl")
- public String listen(String input) {
- logger.info("input value: {}", input);
- return input + "hello!";
- }
- @KafkaListener(id = "webGroup1", topics = "topic-ckl")
- public void listen2(String input) {
- logger.info("input value: {}", input);
- }
消息重試和死信隊列的應(yīng)用
除了上面談到的通過手動Ack模式來控制消息偏移量外,其實Spring-kafka內(nèi)部還封裝了可重試消費消息的語義,也就是可以設(shè)置為當(dāng)消費數(shù)據(jù)出現(xiàn)異常時,重試這個消息。而且可以設(shè)置重試達到多少次后,讓消息進入預(yù)定好的Topic。也就是死信隊列里。下面代碼演示了這種效果:
- @Autowired
- private KafkaTemplate template;
- @Bean
- public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
- ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
- ConsumerFactory<Object, Object> kafkaConsumerFactory,
- KafkaTemplate<Object, Object> template) {
- ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
- configurer.configure(factory, kafkaConsumerFactory);
- //最大重試三次
- factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));
- return factory;
- }
- @GetMapping("/send/{input}")
- public void sendFoo(@PathVariable String input) {
- template.send("topic-kl", input);
- }
- @KafkaListener(id = "webGroup", topics = "topic-kl")
- public String listen(String input) {
- logger.info("input value: {}", input);
- throw new RuntimeException("dlt");
- }
- @KafkaListener(id = "dltGroup", topics = "topic-kl.DLT")
- public void dltListen(String input) {
- logger.info("Received from DLT: " + input);
- }
上面應(yīng)用,在topic-kl監(jiān)聽到消息會,會觸發(fā)運行時異常,然后監(jiān)聽器會嘗試三次調(diào)用,當(dāng)?shù)竭_最大的重試次數(shù)后。消息就會被丟掉重試死信隊列里面去。死信隊列的Topic的規(guī)則是,業(yè)務(wù)Topic名字+“.DLT”。如上面業(yè)務(wù)Topic的name為“topic-kl”,那么對應(yīng)的死信隊列的Topic就是“topic-kl.DLT”
文末結(jié)語
最近業(yè)務(wù)上使用了kafka用到了Spring-kafka,所以系統(tǒng)性的探索了下Spring-kafka的各種用法,發(fā)現(xiàn)了很多好玩很酷的特性,比如,一個注解開啟嵌入式的Kafka服務(wù)、像RPC調(diào)用一樣的發(fā)送\響應(yīng)語義調(diào)用、事務(wù)消息等功能。希望此博文能夠幫助那些正在使用Spring-kafka或即將使用的人少走一些彎路少踩一點坑。