当前位置: 首页 > 知识库问答 >
问题:

如何通过属性设置Spring Cloud Stream Kafka的偏移提交间隔

蒙洛华
2023-03-14

我想通过属性为Spring Cloud Stream ;在Kafka consumer上设置一个自动偏移提交间隔。

正如我从度量中看到的,默认情况下,Spring Cloud Stream ;Kafka对每个消耗的消息提交偏移量。对于高负载的主题(例如,如果流量为每秒10K消息),它会变得非常明显,并增加Kafka broker的负载。

我们以以下方式声明消费者:

@Bean
public Consumer<TestEvent> testEvents() {
    …
}

我尝试了几个选择,但都没有帮助我。

spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: xxx
          consumer-properties:
            auto.offset.reset: latest
            auto.commit.interval.ms: 2000
            enable.auto.commit: true
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2
{
  "status": "DOWN",
  "components": {
    "binders": {
      "status": "DOWN",
      "components": {
        "kafka": {
          "status": "DOWN",
          "details": {
            "error": "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer"
          }
        }
      }
    }, …

据我所知,enable.auto.commit:true不建议用于Spring Cloud Stream。

备选方案2

spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: xxx
          configuration:
            auto.offset.reset: latest
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2
            ack-mode: TIME
            ack-time: 2000

但是,这样的配置属性ack-mode:time并没有帮助,因为我们看到每个消耗的消息都有偏移量提交。

我使用maven dependencyspring-cloud-starter-stream-kafkaversion3.0.12.release

共有1个答案

董庆
2023-03-14

首先,ack-mode是一个特定于Kafka的使用者绑定属性,您在公共属性中有它。

spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          configuration:
            auto.offset.reset: latest
        bindings:
          testEvents-in-0:
            consumer:
              ack-mode: TIME
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2

其次,没有ACK-TIME属性,您必须通过容器定制器在容器上设置它:

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> cust() {
    return (container, dest, group) -> {
        container.getContainerProperties().setAckTime(2000L);
        container.getContainerProperties().setLogContainerConfig(true);
    };
}
 类似资料:
  • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端

  • 问题内容: 使用javascript,我知道我的用户时区为UTC +3。 现在,我想用此知识创建DateTime对象: 我得到的回应是: 我究竟做错了什么?我该如何解决? 问题答案: 这个怎么样…

  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。

  • 我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?

  • 问题内容: 我想运行一组通用的食谱,这些食谱将在我们的实验室中供用户选择我们产品的版本,并希望使用jenkins来完成。Jenkins作业在节点引导期间传递此信息的最佳方法是什么?是否通过- j选项,例如-j’{“ load_version”:“ $ LOAD_VERSION”}’,其中$ LOAD_VERSION是Jenins作业的参数,有几种选择? 问题答案: 是的,可以让您为食谱的第一个Ch