当前位置: 首页 > 知识库问答 >
问题:

如何从应用程序配置Kafka consumer retry属性。spring boot中的属性?

孔斌
2023-03-14

在Spring靴中,

application.yml:

kafka:
   bootstrap-servers: localhost:9092
   listener:
    concurrency: 10
    ack-mode: MANUAL
   producer:
    topic: test-record
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    retries: 3
    orn-record:
      timeout: 3
    #acks: 1
   consumer:
    groupId: test-record
    topic: test
    enable-auto-commit: false
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

通过使用上述配置,我们可以避免在spring boot中基于java web(bean)的配置,这是一个很有价值的优势。

Q: 我们可以从应用程序中添加Kafka错误处理程序和Kafka消费者重试次数属性吗。属性/应用。yml?

我找不到任何关于它的参考或留档,因此希望有一些结论,只是因为这个问题,现在我必须在Spring Boot中转到java基于Web的配置并删除属性配置,这再次回到Spring中的旧方式。我相信应该有一些解决方法,我们可以通过属性文件配置来实现这一点。

共有1个答案

许俊晤
2023-03-14

消费者没有重试属性。如果未提交偏移量,下一次轮询将从相同的偏移量重试。

也没有任何可配置的错误处理类像Kafka Streams中那样脱离反序列化带外。

如果你想处理反序列化错误,而不是处理错误,你可以这样设置

spring:
  kafka:
    bootstrap-servers: ...
    consumer:
      # Configures the Spring Kafka ErrorHandlingDeserializer that delegates to the 'real' deserializers
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    properties:
      # Delegate deserializers
      spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
      spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

除此之外,您还可以利用代码中的死信发布恢复器(DeadLetterPublishingRecoverer)和SeekToCurrentErrorHandler,将错误事件生成到新主题,以供检查和进一步处理来源

 类似资料:
  • 概述: 我在我的logback.xml文件中使用哨兵appender,我想通过大量的标签作为参数从application.properties文件到logback配置文件。 向后退。xml文件: application.properties: 注意:application.properties中的spring.profiles.active属性映射到logback配置文件中的springProfi

  • Compile Sdk Version: 指定Android的编译版本. 对应build.gradle文件中的参数是: compileSdkVersion 23 Build Tools Version: 指定构建工具的版本. 对应build.gradle文件中的参数是: buildToolsVersion "23.0.2" SDK编译版本和构建工具的版本都是我们已经下载到本地的,如果本地没有就

  • 我在src/main/resources下创建了2个文件: 应用程序。属性 第一个具有从env变量中获取值的属性,而后者具有固定值。 根据这里的具体情况,我以这样的方式推出了Spring靴: 然而,不会产生任何影响,并且应用程序是局部的。属性似乎被忽略。 有什么提示吗?

  • 我正在将一个非常基本的web应用程序部署到Google应用程序引擎。我使用的是Springboot,我可以在本地很好地运行应用程序,但当我部署到Google时,应用程序引擎不会启动实例。我在启动时配置了一个云SQL数据源。 我有云sql配置属性配置src/main/Resources/application.properties.App Engine似乎找不到这些属性,所以它无法正确设置Cloud

  • 我是springboot的新手,我正在尝试从application.properties文件的位置(src/main/resources)读取属性值。但它总是返回NULL。我也需要帮助。附加类和属性文件。请注意:我试过“https://www.baeldung.com/properties-with-spring”如何访问Spring Boot中application.properties文件中定

  • 我试图在Spring Boot中实现基于配置文件的应用程序,这显然适用于Spring Boot。但是当我试图在弹性搜索APM中实现它时,它不起作用。 根据这里的说法:我们可以用弹性来描述这样的性质。apm前缀: 但它不起作用。当我用elasticapm.properties打电话时,它起作用了。 你有什么建议吗?