0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

Spring Kafka的各種用法

科技綠洲 ? 來(lái)源:Java技術(shù)指北 ? 作者:Java技術(shù)指北 ? 2023-09-25 17:04 ? 次閱讀

最近業(yè)務(wù)上用到了Spring Kafka,所以系統(tǒng)性的探索了下Spring Kafka的各種用法,發(fā)現(xiàn)了很多實(shí)用的特性,下面介紹下Spring Kafka的消息重試機(jī)制。

0. 前言

原生 Kafka 是不支持消息重試的。但是 Spring Kafka 2.7+ 封裝了 Retry Topic 這個(gè)功能。

1. @RetryableTopic

使用注解的方式啟用 Retry Topic,在 @KafkaListener 方法上添加 @RetryableTopic 即可:

@Slf4j
@Component
public class DemoConsumer {
    @RetryableTopic
    @KafkaListener(topics = "topic1", groupId = "group1")
    public void onMsg(ConsumerRecord< String, String > record) {
        log.info("topic: {}", record.topic());
        throw new RuntimeException("kafka exception");
    }

}

這樣就開啟了 Spring Kafka 的消息重試機(jī)制:默認(rèn)重試 3 次,間隔為 1 秒。

我們?cè)诜椒ɡ?a href="http://srfitnesspt.com/analog/" target="_blank">模擬了拋出異常,運(yùn)行后可以發(fā)現(xiàn)打印了 3 條日志,間隔時(shí)間大約為 1 秒,重試的topic為原topic加上后綴“-retry”

2022-11-12 12:14:10.230  INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1
2022-11-12 12:14:11.315  INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1-retry-0  
2022-11-12 12:14:12.310  INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1-retry-1

2. DLT死信隊(duì)列

如果 3 次重試后依舊失敗,會(huì)將消息發(fā)送到 DLT,

默認(rèn)情況,消息被發(fā)送到死信隊(duì)列后,會(huì)輸出一條日志。

2022-11-12 12:14:13.324  INFO 1023 --- [ner#3-dlt-0-C-1] o.s.k.retrytopic.RetryTopicConfigurer    : Received message in dlt listener: topic1-dlt@233

DLT的topic為原topic加上后綴“-dlt”

我們可以使用@DltHandler注解來(lái)定義進(jìn)入死信隊(duì)列后的操作:

@DltHandler
public void dltHandler(ConsumerRecord< String, String > record) {
    log.info("topic:{}, key:{}, value:{}", record.topic(), record.key(), record.value());
}

3. 自定義@RetryableTopic

可以自定義重試次數(shù)、延遲時(shí)間、topic命名策略等等,支持使用 Spring EL 表達(dá)式讀取配置。

@Slf4j
@Component
public class DemoConsumer {
    @RetryableTopic(
            attempts = "4",
            backoff = @Backoff(delay = "5000", multiplier = "2"),
            fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC
    )
    @KafkaListener(topics = "topic2", groupId = "group1")
    public void onMsg2(ConsumerRecord< String, String > record) {
        log.info("topic: {}", record.topic());
        throw new RuntimeException("kafka exception");
    }

}

注解屬性說(shuō)明:

attempts :重試次數(shù),默認(rèn)為3。

@Backoff delay :消費(fèi)延遲時(shí)間,單位為毫秒。

@Backoff multiplier :延遲時(shí)間系數(shù),此例中 attempts = 4, delay = 5000, multiplier = 2 ,則間隔時(shí)間依次為5s、10s、20s、40s,最大延遲時(shí)間受 maxDelay 限制。

fixedDelayTopicStrategy :可選策略包括:SINGLE_TOPIC 、MULTIPLE_TOPICS

4. 配置類

以上介紹的是注解的方式,只對(duì)注解下的方法有效。如果想讓多個(gè)方法都用相同的消息重試配置,那么可以使用配置類方式:

@Bean
public RetryTopicConfiguration retryTopic(KafkaTemplate< String, String > template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .maxAttempts(4)
            .fixedBackOff(5000)
            .includeTopic("topic1")
            .create(template);
}

小結(jié)

以上就是Spring Kafka消息重試機(jī)制的簡(jiǎn)單應(yīng)用~希望能夠幫助那些正在使用Spring Kafka或即將使用的人少走一些彎路、少踩一點(diǎn)坑。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • spring
    +關(guān)注

    關(guān)注

    0

    文章

    335

    瀏覽量

    14277
  • 日志
    +關(guān)注

    關(guān)注

    0

    文章

    131

    瀏覽量

    10615
  • 機(jī)制
    +關(guān)注

    關(guān)注

    0

    文章

    24

    瀏覽量

    9763
  • DLT
    DLT
    +關(guān)注

    關(guān)注

    0

    文章

    16

    瀏覽量

    5285
收藏 人收藏

    評(píng)論

    相關(guān)推薦

    Spring Boot Starter需要些什么

    pulsar-spring-boot-starter是非常有必要的,在此之前,我們先看看一個(gè)starter需要些什么。 Spring Boot Starter spring-boot的強(qiáng)大之處在于其提供的大量
    的頭像 發(fā)表于 09-25 11:35 ?687次閱讀
    <b class='flag-5'>Spring</b> Boot Starter需要些什么

    java spring教程

    java spring教程理解Spring 實(shí)現(xiàn)原理掌握Spring IOC,AOP掌握Spring的基礎(chǔ)配置和用法熟練使用SSH開發(fā)項(xiàng)目
    發(fā)表于 09-11 11:09

    什么是java spring

    、并且更易于測(cè)試的代碼。它們也為Spring中的各種模塊提供了基礎(chǔ)支持。Spring帶給我們什么◆方便解耦,簡(jiǎn)化開發(fā)      
    發(fā)表于 09-11 11:16

    基于發(fā)布與訂閱的消息系統(tǒng)Kafka

    Kafka權(quán)威指南》——初識(shí) Kafka
    發(fā)表于 03-05 13:46

    Kafka基礎(chǔ)入門文檔

    kafka系統(tǒng)入門教程(原理、配置、集群搭建、Java應(yīng)用、Kafka-manager)
    發(fā)表于 03-12 07:22

    Spring筆記分享

    Spring實(shí)現(xiàn)了使用簡(jiǎn)單的組件配置組合成一個(gè)復(fù)雜的應(yīng)用。在 Spring 中可以使用XML和Java注解組合這些對(duì)象。6) 一站式:在IOC和AOP的基礎(chǔ)上可以整合各種企業(yè)應(yīng)用的開源框架和優(yōu)秀的第三方類
    發(fā)表于 11-04 07:51

    Kafka集群環(huán)境的搭建

    1、環(huán)境版本版本:kafka2.11,zookeeper3.4注意:這里zookeeper3.4也是基于集群模式部署。2、解壓重命名tar -zxvf
    發(fā)表于 01-05 17:55

    spring定時(shí)器用法詳解

    Spring是于2003年興起的一個(gè)輕量級(jí)的Java開發(fā)框架,由RodJohnson創(chuàng)建。簡(jiǎn)單來(lái)說(shuō),Spring是一個(gè)分層的JavaSE/EEfull-stack(一站式)輕量級(jí)開源框架。下文為大家介紹spring定時(shí)器
    發(fā)表于 01-28 10:16 ?5661次閱讀
    <b class='flag-5'>spring</b>定時(shí)器<b class='flag-5'>用法</b>詳解

    Kafka的概念及Kafka的宕機(jī)

    問題要從一次Kafka的宕機(jī)開始說(shuō)起。 筆者所在的是一家金融科技公司,但公司內(nèi)部并沒有采用在金融支付領(lǐng)域更為流行的 RabbitMQ ,而是采用了設(shè)計(jì)之初就為日志處理而生的 Kafka ,所以我一直
    的頭像 發(fā)表于 08-27 11:21 ?2012次閱讀
    <b class='flag-5'>Kafka</b>的概念及<b class='flag-5'>Kafka</b>的宕機(jī)

    Spring Boot實(shí)現(xiàn)各種參數(shù)校驗(yàn)

    之前也寫過一篇關(guān)于Spring Validation使用的文章,不過自我感覺還是浮于表面,本次打算徹底搞懂Spring Validation。本文會(huì)詳細(xì)介紹Spring Validation
    的頭像 發(fā)表于 08-14 15:54 ?906次閱讀

    Spring Validation的使用

    之前也寫過一篇關(guān)于Spring Validation使用的文章,不過自我感覺還是浮于表面,本次打算徹底搞懂Spring Validation。本文會(huì)詳細(xì)介紹Spring Validation
    的頭像 發(fā)表于 09-08 10:31 ?849次閱讀

    Kafka 的簡(jiǎn)介

    ? 1 kafka簡(jiǎn)介 2 為什么要用消息系統(tǒng) 3 kafka基礎(chǔ)知識(shí) 4 kafka集群架構(gòu) 5 總結(jié) ? 1 kafka簡(jiǎn)介 其主要設(shè)計(jì)目標(biāo)如下: 以時(shí)間復(fù)雜度為O(1)的方式提供
    的頭像 發(fā)表于 07-03 11:10 ?552次閱讀
    <b class='flag-5'>Kafka</b> 的簡(jiǎn)介

    監(jiān)控Kafka集群的常用的方法和工具介紹

    Control等工具連接到Kafka Broker的JMX端口,并監(jiān)控各種關(guān)鍵指標(biāo),如吞吐量、延遲、磁盤使用率、網(wǎng)絡(luò)連接數(shù)等。
    發(fā)表于 08-30 10:05 ?1948次閱讀
    監(jiān)控<b class='flag-5'>Kafka</b>集群的常用的方法和工具介紹

    kafka client在 spring如何實(shí)現(xiàn)

    認(rèn)識(shí)了 spring-boot-starter ,今天不妨來(lái)看下如何寫一個(gè) pulsar-spring-boot-starter 模塊。 目標(biāo) 寫一個(gè)完整的類似 kafka-spring-boot-st
    的頭像 發(fā)表于 09-25 11:21 ?432次閱讀
    <b class='flag-5'>kafka</b> client在 <b class='flag-5'>spring</b>如何實(shí)現(xiàn)

    Kafka架構(gòu)技術(shù):Kafka的架構(gòu)和客戶端API設(shè)計(jì)

    Kafka 給自己的定位是事件流平臺(tái)(event stream platform)。因此在消息隊(duì)列中經(jīng)常使用的 "消息"一詞,在 Kafka 中被稱為 "事件"。
    的頭像 發(fā)表于 10-10 15:41 ?2236次閱讀
    <b class='flag-5'>Kafka</b>架構(gòu)技術(shù):<b class='flag-5'>Kafka</b>的架構(gòu)和客戶端API設(shè)計(jì)