当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream Kafka批次-手动提交整个批次

万浩淼
2023-03-14

我们正在使用Spring云流霍克斯顿。SR4使用来自Kafka主题的消息。我们启用了spring.cloud.stream.bindings.。consumer.batch-Mode=true,每次轮询获取2000条记录。我想知道是否有一种方法可以手动确认/提交整个批次。

共有2个答案

东门奕
2023-03-14

有没有一种方法可以在批处理消费者中检索包含标题的消息列表,类似的(更新了上面的示例)

@Bean
Consumer<List<Message<Foo>>> consume() {
    return list -> {
        list.forEach(msg -> {
            System.out.println(msg.getPayload());
        });
    };
}

虽然我的问题是关于批处理消耗与头(如KafkaHeaders。MESSAGE_KEY)作为我的生产者发送所需的数据部分在关键和有效载荷Rest。我没有找到离我要找的最近的这条线。

羊舌赞
2023-03-14

SR4非常古老;当前霍克斯顿版本是SR9,当前Spring云流版本是3.0.10。释放(Hoxton.SR9拉入3.0.9)。

您需要使用Message并从标头获取确认。

@SpringBootApplication
public class So652289261Application {

    public static void main(String[] args) {
        SpringApplication.run(So652289261Application.class, args);
    }

    @Bean
    Consumer<Message<List<Foo>>> consume() {
        return msg -> {
            System.out.println(msg.getPayload());
            msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge();
        };
    }

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
        return (container, dest, group) -> container.getContainerProperties()
                .setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("consume-in-0", "{\"bar\":\"baz\"}".getBytes());
            template.send("consume-in-0", "{\"bar\":\"qux\"}".getBytes());
        };
    }

    public static class Foo {

        private String bar;

        public Foo() {
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

启动2.3.6和Cloud Hoxton的属性。SR9

spring.cloud.stream.bindings.consume-in-0.group=so65228926
spring.cloud.stream.bindings.consume-in-0.consumer.batch-mode=true
spring.cloud.stream.kafka.bindings.consume-in-0.consumer.auto-commit-offset=false

spring.kafka.producer.properties.linger.ms=50

Boot 2.4.0和Cloud 2020.0.0-M6的属性

spring.cloud.stream.bindings.consume-in-0.group=so65228926
spring.cloud.stream.bindings.consume-in-0.consumer.batch-mode=true
spring.cloud.stream.kafka.bindings.consume-in-0.consumer.ack-mode=MANUAL

spring.kafka.producer.properties.linger.ms=50
[Foo [bar=baz], Foo [bar=qux]]
... Committing: {consume-in-0-0=OffsetAndMetadata{offset=14, leaderEpoch=null, metadata=''}}
 类似资料:
  • 我有一个kafkalistener,可以一次监听一批消息,如下所示 我的问题是,有没有一种方法可以监听多批消息并只提交一次。例如,如果我在Kafka主题中有1000条消息,我希望以10批的形式一次听100条消息,并在处理10批消息后提交偏移量。

  • 我正在实现spring kafka批处理侦听器,它读取来自kafka主题的消息列表,并将数据发布到REST服务。我想了解在REST服务停止的情况下的偏移管理,不应该提交批处理的偏移,应该为下一次轮询处理消息。我已经阅读了spring kafka文档,但在理解侦听器错误处理程序和批量查找当前容器错误处理程序之间的区别时存在困惑。我使用的是spring-boot-2.0.0。M7及以下版本是我的代码。

  • 提前批 非star 岗位类型:推荐方向llm算法 一面: 自我介绍。 聊项目。 无八股。 手撕是一个滑动窗口 有些小细节没写对 但最后还是给过了。 二面: 自我介绍 然后紧接伪代码手撕多头注意力,撕完面试官说rms和残差链接呢?我说这个一般不在多头注意力这个类里面写,又给他写了一下transformersblock里面forward的伪代码。 紧急八股,rms和一般layer norm的区别。 l

  • 我们正在从Oracle DB迁移到Azure SQL Server,用于我们的Spring批处理应用程序。 我断断续续地得到以下错误 错误:01.03.2022:1458(40.269)[]main]命令行JobRunner:作业因错误而终止:创建名为“dateStoreList”的bean时出错:设置bean属性“jobRepository”时无法解析对bean“jobRepository”的引

  • 1)读取器读取的任何记录都应通过处理器的处理传递给写入器 2)我的阅读器通过SQL查询读取记录,所以如果阅读器读取了100条记录,那么所有记录都应该一次传递给writer 3)如果读取1000条记录,则应同时通过所有1000条记录 4)所以从本质上说,提交间隔在这里是动态的,而不是固定的。 5)我们有什么办法可以做到这一点吗? 编辑: 现在我们想要的是动态提交间隔。读者正在阅读的任何内容,都将立即

  • 电商技术部 自我介绍 项目(简单介绍了一下,他听不太懂,我说共享屏幕结合博客给他讲,他说直接看看代码,我就打开vscode简单讲了讲代码) 问我c++/go哪个更熟(我说都差不多,选了go,go八股少)。应该是从题库抽题,他问我答,他也不说对错。 1. go字符串拼接有哪些,性能怎么样(之前背过但不记得了,说只用过+,其它不太了解) 2. interface底层 3. interface可以比较嘛