当前位置: 首页 > 知识库问答 >
问题:

使用过滤策略和手动提交的spring-kafka批处理示例

常鸿朗
2023-03-14

我计划使用SpringKafka批处理侦听器进行批处理。我正在寻找这两种方案的几个示例。

  1. 我们如何通过批处理实现过滤记录策略?更新 :来自文档 - “此外,还提供了过滤批处理消息警报器适配器,用于使用批处理消息侦听器时。我没有看到任何容器工厂方法来设置此筛选器批处理消息驱动程序对象或筛选器实现。

以下是我的批处理侦听器过滤策略代码:

@Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setBatchListener(true);
        factory.setAckDiscarded(true);
        factory.setRecordFilterStrategy(new RecordFilterStrategy<Object, Object>() {
            @Override
            public boolean filter(ConsumerRecord<Object, Object> consumerRecords) {

                //log.info("Retrieved the record {} from the partition {} with offset {}", consumerRecord.value(), consumerRecord.partition(), consumerRecord.offset());

                return true;
            }
        });
        
        return factory;
    }

现在我想到的另一个问题是,上述方案如何与单个消费者和多个消费者一起工作。

假设案例1:单一消费者

假设我们有一个具有5个分区的主题。当我们订阅该主题时,我们假设我们从该主题中获得了100条消息,其中每个分区有20条消息。如果我们想提交这些消息偏移量,确认对象是否保存了最后一条消息的每个分区和最后一个偏移量?

案例2:多个消费者

对于案例1中提到的相同输入,如果我们启用分区计数的相等的消费者数量,ack对象是否保存分区和最后一条消息的偏移量?

你能帮我解决这个问题吗?

共有1个答案

金皓君
2023-03-14

>

  • 请参见过滤BatchMessageListenerAdapterhttps://docs.spring.io/spring-kafka/docs/current/reference/html/#filtering-消息

    处理批处理异常的最简单方法是将RecoveringBatchErrorHandlerDeadLetterPublishingRecoverer一起使用。抛出一个<code>BatchListenerFailedException</code>以指示批处理中的哪个记录失败;成功记录的偏移量将被提交,剩余记录(包括失败的记录)将被重新提交,直到重试次数(如果配置)耗尽,此时失败的记录将转到死信主题,其余记录将被重新发送。

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#recovering-批量eh

    是的,当批处理被确认时,将提交批处理中每个分区的最新偏移量(1)。

    如果您有多个消费者,分区将分布在这些消费者之间。

  •  类似资料:
    • 我有以下步骤在批处理工作。 当某个异常抛出时,我在parseStepSkipListener中捕捉他并登录数据库。 我期待以下行为: 作业已启动 执行前面的步骤 开始执行解析步骤 阅读项目 进程项 写 哦,异常。 捕获异常,登录数据库,转到下一个块(读取、处理、写入)。 作业已启动 执行前面的步骤 开始执行解析步骤 阅读项目 进程项 写 哦,异常。 进程项 写入项 哦,异常。 捕获异常,登录数据库

    • 我正在实现spring kafka批处理侦听器,它读取来自kafka主题的消息列表,并将数据发布到REST服务。我想了解在REST服务停止的情况下的偏移管理,不应该提交批处理的偏移,应该为下一次轮询处理消息。我已经阅读了spring kafka文档,但在理解侦听器错误处理程序和批量查找当前容器错误处理程序之间的区别时存在困惑。我使用的是spring-boot-2.0.0。M7及以下版本是我的代码。

    • 我们正在使用Spring云流霍克斯顿。SR4使用来自Kafka主题的消息。我们启用了spring.cloud.stream.bindings.。consumer.batch-Mode=true,每次轮询获取2000条记录。我想知道是否有一种方法可以手动确认/提交整个批次。

    • 为了辅助批处理系统的设计和实现、应该通过结构示意图和代码实例的形式为设计师和程序员提供基础的批处理程序构建模块和以及处理模式. 在设计批处理Job时,应该将业务逻辑分解成一系列的步骤,使每个步骤都可以利用以下的标准构建模块来实现: 转换程序(Conversion Applications): 由外部系统提供或需要写入到外部系统的各种类型的文件,我们都需要为其创建一个转换程序, 用来将所提供的事务记

    • 此场景的解决方案-提交间隔为10,跳过限制为10,总输入记录为20,前9个记录有效,其余无效。 当已读取9条记录时,第10条无效。块大小是10,跳过限制是10,那么Spring批处理会在输出文件中写入那9条记录吗?如果不是,则继续读取剩余的记录,当读取第20条记录时,错误记录的计数为11,定义的跳过限制为10。因此该过程将立即停止。首先读取的有效记录的命运如何。 前9条记录是否写入输出文件。 请让

    • 我正在使用方法级别的@KafkaHandler(类级别的@KafkaListener)消费Kafka事件。 我见过很多例子,其中有一个“acknowledge”参数可用,在这个参数上可以调用“acknowledge()”方法来提交事件的消耗,但是,当我将它作为参数包含到我的方法中时,我无法填充acknowledge对象。使用KafkaHandler时如何手动提交?有可能吗? 代码示例: 使用Spr