我使用的是spring kafka 2.2.7,我的消费者配置代码如下:
@Slf4j
@Configuration
@EnableKafka
public class KafkaConfiguration {
@Bean
ConcurrentKafkaListenerContainerFactory<String, Customer> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Customer> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// less than number of partition, will do infinite retry
factory.setConcurrency(1);
SeekToCurrentErrorHandler errorHandler =
new SeekToCurrentErrorHandler((record, exception) -> {
LOGGER.info("***in error handler data, {}", record);
}, 1);
factory.setErrorHandler(errorHandler);
return factory;
}
@Bean
public ConsumerFactory<String, Customer> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
props.put("schema.registry.url", "http://127.0.0.1:8081");
props.put("specific.avro.reader", "true");
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return props;
}
}
@Component
@Slf4j
public class KafkaConsumerService {
@KafkaListener(id = "demo-consumer-stream-group", topics = "kafka-demo-avro")
public void process(ConsumerRecord<String, Customer> record) {
LOGGER.info("Customer key: {} and value: {}", record.key(), record.value());
LOGGER.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
throw new RuntimeException("force to retry");
}
}
因此,如果我的侦听器中发生了异常,如果我指定的并发级别小于我的主题的分区计数,即使我在配置中配置了maxFailures,消费者也会永远重试失败的消息。
只有当我至少每隔一秒发送一条消息时,它才会起作用。如果我以批处理的方式发送消息,这种行为就不会起作用。除非我重新启动消费者,它将正常工作。
重现步骤:1.创建一个包含1个以上分区的主题,例如3或6 2。在Spring kafka 配置中,将并发级别指定为 1 3。对于“寻道到当前错误处理程序”
,将最大失败指定
为正值,例如 3 4。向主题发送十几条消息
您将看到每个失败的消息将进行无限重试,而不是我指定的< code>maxFailure。此外,我可以看到许多信息落后于消费者的滞后。
但是,如果您停止监听器并再次启动监听器,它将正确地跳过失败的消息。
我正在使用2.2.8,我仍然可以重现该错误,但配置更加复杂。我需要同时具有“寻转电流”和
“重试指数随机回退策略”的
模板
。因此,仍然存在遇到它的风险。在这里报告。
这是Spring Kafka2.2.7.RELEASE
的一个bug,但已在2.2.8.RELEASE
中修复。
问题内容: 我有一个用于捕获任何分段错误或ctrl- c的应用程序。使用下面的代码,我能够捕获分段错误,但是该处理程序一次又一次地被调用。我该如何阻止他们。供您参考,我不想退出我的申请。我只是可以小心释放所有损坏的缓冲区。 可能吗? 处理程序就是这样。 在这里,对于Segmentation故障信号,处理程序被多次调用,并且很明显MyfreeBuffers()给我释放已释放的内存的错误。我只想释放一
我有一个应用程序,我用它来捕捉任何分割错误或ctrl-c。使用下面的代码,我能够捕获分段错误,但是处理程序被一次又一次地调用。我怎样才能阻止他们。告诉你,我不想退出我的申请。我只是可以小心释放所有损坏的缓冲区。 可能吗? handler是这样的。 这里的分段故障信号,处理程序被多次调用,因为明显的MyFreeBuffers()给我释放已经释放的内存的错误。我只想免费一次,但仍然不想退出应用程序。
问题内容: 我正在尝试使用JavaScript中的两个字符串进行不区分大小写的搜索。 通常情况如下: 该标志将不区分大小写。 但是我需要搜索第二个字符串。没有标志,它可以完美地工作: 如果我在上面的示例中添加标志,它将搜索searchstring而不是变量“ searchstring”中的内容(下一个示例不起作用): 我该如何实现? 问题答案: 是的,使用而不是。调用的结果将返回匹配自身的实际字符
我有一本区分大小写的字典, 所以我可以在这本字典里找到区分大小写的键。 例如,我可以有下面的键值对, {test,10} {测试,20} {test1,30} {test2,40} ... 当有人传递密钥时,我想检索该值。检索应该部分不区分大小写,这意味着,如果匹配准确的大小写,则返回区分大小写的结果,如果区分大小写的键不存在,则检索区分大小写的键值。 例如,在字典中插入上述值 如果用户通过“测试
问题内容: 您好,我注意到,这个简单的代码无法正常工作…… 该测试已在localhost上运行,我的意思是:我加载页面,关闭本地Web服务器,然后触发请求(通过一个简单的按钮,其中onclick指向此功能)。错误永远不会被调用,我得到的是调用成功处理程序,它具有textStatus =“ success”和data = null。我什至注意到请求在10秒之前就超时了。在Firefox(最新版本),
问题内容: 我有一个Lucene索引,该索引当前区分大小写。我想添加的 选项 有不区分大小写作为后备的。这意味着与案例匹配的结果将获得更大的权重,并且将首先出现。例如,如果结果数限制为10,并且有10个匹配项符合我的情况,那就足够了。如果仅找到7个结果,则可以从不区分大小写的搜索中再添加3个结果。 我的案子实际上更复杂,因为我有不同重量的物品。理想情况下,匹配“错误”的表壳会增加一些重量。不用说,