当前位置: 首页 > 知识库问答 >
问题:

当我使用ConcurrentMessageListenerContainer时,如何跳过Kafka中有错误的消息?

江正德
2023-03-14

我用的是spring-kafka-2.1.10。RELEASE尝试跳过错误数据,但失败,

它落在一个无限消耗的循环中

我有什么要说的吗?

如果你能帮忙,我将不胜感激

简单代码:

=========================

@Service
public class testConsumerFactoryService {
    @Autowired
    private PlatformTransactionManager tx;
    @Autowired
    private TestRepository testRepository;  // table only have 1 column , String
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @PostConstruct
    public void consumerFactoryTest() {
      ContainerProperties containerProps = new ContainerProperties("test_1");
        containerProperties.setTransactionManager(this.tx);
        containerProperties
                .setMessageListener(new ConsumerAwareMessageListener<String, String>() {
                    @Override
                    public void onMessage(final ConsumerRecord<String, String> record,
                            final Consumer<?, ?> consumer) {
                        final String rec = record.value();
                        //add the 'rec' string to TestRepository's entity , aka: tests
                        try {
                            this.testRepository.save(tests);  //  >> 1. try to save in DB, but failed
                        } catch (final Throwable e) {
                            System.out.println("error !!" + e);
                            System.out.println("##records:" + record);
                            consumer.commitSync();  // >> 2. try to commit offset, but seems not working ?
                            System.out.println("##after:");
                        }
                    }
                }
            );
        final ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
                this.consumerFactory, containerProperties);
        container.start();

    }
}

====================================================配置:===============================

@Configuration
@EnableKafka
public class Config {
@Bean
ConcurrentKafkaListenerContainerFactory<String, String>
                    kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    //use AckMode.RECORD
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, 'Test1');
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return props;
}

共有1个答案

司空胤
2023-03-14

在您的ConcurrentKafkaListenerContainerFactory中,只需关联一个SeekTocurInterrorHandler:

factory.setErrorHandler(
        new SeekToCurrentErrorHandler(
            (rec, ex) -> {
              log.error(
                  "An exception happened during a message processing: {}", ex);
              // good to put something like a dispatch to a DLQ etc
            }));

有了这个,你将继续到下一个偏移量,你的应用程序不会因为坏消息等而被阻止。

 类似资料:
  • 我正在为Kafka和SparkStreaming编写一些代码,当我将它们放在Yarn-Cluster上时,它报告了。 但它在我的电脑上运行良好(独立模式) 那它有什么问题呢? //这是代码 这里例外----------------------------------- 19/07/26 18:21:56警告Scheduler.TaskSetManager:在stage 0.0中丢失任务0.0(TI

  • 我正在解析一个Apache日志文件,并将其保存到熊猫数据帧中,以供进一步调查。 但在日志文件中,我有一些错误行,因此发生以下错误: ValueError:第4320行中应包含11个字段,saw 27 为了克服这个问题,我在读取文件时加入了。这没有帮助,因为我遇到以下错误: ValueError:“python”引擎不支持“error\u bad\u line”选项 注意:我显式地使用了,因为我将分

  • (异常出现在生产者中,我在这个应用程序中没有消费者。) 我该怎么做才能摆脱这个例外呢?

  • 我是Checkstyle的新手,并且希望对现有代码实现Checkstyle标准检查,我希望跳过现有的错误/警告,并排除那些有错误/警告的相应文件进行关注检查。因此,如果任何开发人员留下任何新的错误/警告,都应该报告。有没有人可以建议我,我如何实现文件级别和检查特定的排除在checkstyle,一个简单的例子将使grate对我有帮助。

  • 我有一个Kafka集群正在运行,当重新启动应用程序(消费者)时,它会跳过一些在应用程序关闭时推送到主题的消息。 当应用程序启动时,我可以看到它读取带有偏移量的消息,然后将偏移量推送到。然后当应用程序关闭时,带有偏移量的消息被推送到主题。重启应用程序后,它读取并将其偏移量设置为,因此跳过。 这是我的配置: