当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream Kafka批次-手动提交整个批次

万浩淼
2023-03-14

我们正在使用Spring云流霍克斯顿。SR4使用来自Kafka主题的消息。我们启用了spring.cloud.stream.bindings.。consumer.batch-Mode=true,每次轮询获取2000条记录。我想知道是否有一种方法可以手动确认/提交整个批次。

共有2个答案

东门奕
2023-03-14

有没有一种方法可以在批处理消费者中检索包含标题的消息列表,类似的(更新了上面的示例)

@Bean
Consumer<List<Message<Foo>>> consume() {
    return list -> {
        list.forEach(msg -> {
            System.out.println(msg.getPayload());
        });
    };
}

虽然我的问题是关于批处理消耗与头(如KafkaHeaders。MESSAGE_KEY)作为我的生产者发送所需的数据部分在关键和有效载荷Rest。我没有找到离我要找的最近的这条线。

羊舌赞
2023-03-14

SR4非常古老;当前霍克斯顿版本是SR9,当前Spring云流版本是3.0.10。释放(Hoxton.SR9拉入3.0.9)。

您需要使用Message并从标头获取确认。

@SpringBootApplication
public class So652289261Application {

    public static void main(String[] args) {
        SpringApplication.run(So652289261Application.class, args);
    }

    @Bean
    Consumer<Message<List<Foo>>> consume() {
        return msg -> {
            System.out.println(msg.getPayload());
            msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge();
        };
    }

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
        return (container, dest, group) -> container.getContainerProperties()
                .setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("consume-in-0", "{\"bar\":\"baz\"}".getBytes());
            template.send("consume-in-0", "{\"bar\":\"qux\"}".getBytes());
        };
    }

    public static class Foo {

        private String bar;

        public Foo() {
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

启动2.3.6和Cloud Hoxton的属性。SR9

spring.cloud.stream.bindings.consume-in-0.group=so65228926
spring.cloud.stream.bindings.consume-in-0.consumer.batch-mode=true
spring.cloud.stream.kafka.bindings.consume-in-0.consumer.auto-commit-offset=false

spring.kafka.producer.properties.linger.ms=50

Boot 2.4.0和Cloud 2020.0.0-M6的属性

spring.cloud.stream.bindings.consume-in-0.group=so65228926
spring.cloud.stream.bindings.consume-in-0.consumer.batch-mode=true
spring.cloud.stream.kafka.bindings.consume-in-0.consumer.ack-mode=MANUAL

spring.kafka.producer.properties.linger.ms=50
[Foo [bar=baz], Foo [bar=qux]]
... Committing: {consume-in-0-0=OffsetAndMetadata{offset=14, leaderEpoch=null, metadata=''}}
 类似资料:
  • 我有一个kafkalistener,可以一次监听一批消息,如下所示 我的问题是,有没有一种方法可以监听多批消息并只提交一次。例如,如果我在Kafka主题中有1000条消息,我希望以10批的形式一次听100条消息,并在处理10批消息后提交偏移量。

  • 我正在实现spring kafka批处理侦听器,它读取来自kafka主题的消息列表,并将数据发布到REST服务。我想了解在REST服务停止的情况下的偏移管理,不应该提交批处理的偏移,应该为下一次轮询处理消息。我已经阅读了spring kafka文档,但在理解侦听器错误处理程序和批量查找当前容器错误处理程序之间的区别时存在困惑。我使用的是spring-boot-2.0.0。M7及以下版本是我的代码。

  • 我们正在从Oracle DB迁移到Azure SQL Server,用于我们的Spring批处理应用程序。 我断断续续地得到以下错误 错误:01.03.2022:1458(40.269)[]main]命令行JobRunner:作业因错误而终止:创建名为“dateStoreList”的bean时出错:设置bean属性“jobRepository”时无法解析对bean“jobRepository”的引

  • 1)读取器读取的任何记录都应通过处理器的处理传递给写入器 2)我的阅读器通过SQL查询读取记录,所以如果阅读器读取了100条记录,那么所有记录都应该一次传递给writer 3)如果读取1000条记录,则应同时通过所有1000条记录 4)所以从本质上说,提交间隔在这里是动态的,而不是固定的。 5)我们有什么办法可以做到这一点吗? 编辑: 现在我们想要的是动态提交间隔。读者正在阅读的任何内容,都将立即

  • 列表数据是前端push的数据,接口没有,然后选择复选框打钩,点击弹框确定按钮把列表id给提交的给接口ids,状态选择了工艺分析,最后把工艺分析给列表的样品流程里,大佬们,列表没有调后台的接口情况下,点击提交按钮后怎么让页面也变成工艺分析,这怎么实现呢 目前的提交代码:

  • 我计划使用SpringKafka批处理侦听器进行批处理。我正在寻找这两种方案的几个示例。 我们如何通过批处理实现过滤记录策略?更新 :来自文档 - “此外,还提供了过滤批处理消息警报器适配器,用于使用批处理消息侦听器时。我没有看到任何容器工厂方法来设置此筛选器批处理消息驱动程序对象或筛选器实现。 以下是我的批处理侦听器过滤策略代码: 现在我想到的另一个问题是,上述方案如何与单个消费者和多个消费者一