当前位置: 首页 > 知识库问答 >
问题:

SpringKafka搜索到当前错误处理程序最大故障在并发级别小于分区数时不起作用

尉迟景福
2023-03-14

我使用的是spring kafka 2.2.7,我的消费者配置代码如下:

@Slf4j
@Configuration
@EnableKafka
public class KafkaConfiguration {

  @Bean
  ConcurrentKafkaListenerContainerFactory<String, Customer> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Customer> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // less than number of partition, will do infinite retry
    factory.setConcurrency(1);
    SeekToCurrentErrorHandler errorHandler =
        new SeekToCurrentErrorHandler((record, exception) -> {
          LOGGER.info("***in error handler data, {}", record);
        }, 1);
    factory.setErrorHandler(errorHandler);
    return factory;
  }

  @Bean
  public ConsumerFactory<String, Customer> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);

    props.put("schema.registry.url", "http://127.0.0.1:8081");
    props.put("specific.avro.reader", "true");

    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    return props;
  }
}


@Component
@Slf4j
public class KafkaConsumerService {

  @KafkaListener(id = "demo-consumer-stream-group", topics = "kafka-demo-avro")
  public void process(ConsumerRecord<String, Customer> record) {
    LOGGER.info("Customer key: {} and value: {}", record.key(), record.value());
    LOGGER.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
    throw new RuntimeException("force to retry");
  }
}

因此,如果我的侦听器中发生了异常,如果我指定的并发级别小于我的主题的分区计数,即使我在配置中配置了maxFailures,消费者也会永远重试失败的消息。

只有当我至少每隔一秒发送一条消息时,它才会起作用。如果我以批处理的方式发送消息,这种行为就不会起作用。除非我重新启动消费者,它将正常工作。

重现步骤:1.创建一个包含1个以上分区的主题,例如3或6 2。在Spring kafka 配置中,将并发级别指定为 1 3。对于“寻道到当前错误处理程序”,将最大失败指定为正值,例如 3 4。向主题发送十几条消息

您将看到每个失败的消息将进行无限重试,而不是我指定的< code>maxFailure。此外,我可以看到许多信息落后于消费者的滞后。

但是,如果您停止监听器并再次启动监听器,它将正确地跳过失败的消息。

共有2个答案

刘和玉
2023-03-14

我正在使用2.2.8,我仍然可以重现该错误,但配置更加复杂。我需要同时具有“寻转电流”和重试指数随机回退策略”的模板。因此,仍然存在遇到它的风险。在这里报告。

勾炜
2023-03-14

这是Spring Kafka2.2.7.RELEASE的一个bug,但已在2.2.8.RELEASE中修复。

 类似资料:
  • 问题内容: 我有一个用于捕获任何分段错误或ctrl- c的应用程序。使用下面的代码,我能够捕获分段错误,但是该处理程序一次又一次地被调用。我该如何阻止他们。供您参考,我不想退出我的申请。我只是可以小心释放所有损坏的缓冲区。 可能吗? 处理程序就是这样。 在这里,对于Segmentation故障信号,处理程序被多次调用,并且很明显MyfreeBuffers()给我释放已释放的内存的错误。我只想释放一

  • 我有一个应用程序,我用它来捕捉任何分割错误或ctrl-c。使用下面的代码,我能够捕获分段错误,但是处理程序被一次又一次地调用。我怎样才能阻止他们。告诉你,我不想退出我的申请。我只是可以小心释放所有损坏的缓冲区。 可能吗? handler是这样的。 这里的分段故障信号,处理程序被多次调用,因为明显的MyFreeBuffers()给我释放已经释放的内存的错误。我只想免费一次,但仍然不想退出应用程序。

  • 问题内容: 我正在尝试使用JavaScript中的两个字符串进行不区分大小写的搜索。 通常情况如下: 该标志将不区分大小写。 但是我需要搜索第二个字符串。没有标志,它可以完美地工作: 如果我在上面的示例中添加标志,它将搜索searchstring而不是变量“ searchstring”中的内容(下一个示例不起作用): 我该如何实现? 问题答案: 是的,使用而不是。调用的结果将返回匹配自身的实际字符

  • 我有一本区分大小写的字典, 所以我可以在这本字典里找到区分大小写的键。 例如,我可以有下面的键值对, {test,10} {测试,20} {test1,30} {test2,40} ... 当有人传递密钥时,我想检索该值。检索应该部分不区分大小写,这意味着,如果匹配准确的大小写,则返回区分大小写的结果,如果区分大小写的键不存在,则检索区分大小写的键值。 例如,在字典中插入上述值 如果用户通过“测试

  • 问题内容: 您好,我注意到,这个简单的代码无法正常工作…… 该测试已在localhost上运行,我的意思是:我加载页面,关闭本地Web服务器,然后触发请求(通过一个简单的按钮,其中onclick指向此功能)。错误永远不会被调用,我得到的是调用成功处理程序,它具有textStatus =“ success”和data = null。我什至注意到请求在10秒之前就超时了。在Firefox(最新版本),

  • 问题内容: 我有一个Lucene索引,该索引当前区分大小写。我想添加的 选项 有不区分大小写作为后备的。这意味着与案例匹配的结果将获得更大的权重,并且将首先出现。例如,如果结果数限制为10,并且有10个匹配项符合我的情况,那就足够了。如果仅找到7个结果,则可以从不区分大小写的搜索中再添加3个结果。 我的案子实际上更复杂,因为我有不同重量的物品。理想情况下,匹配“错误”的表壳会增加一些重量。不用说,