我正在使用Spring Kafka consumer和Avro模式构建我的应用程序。
但是,如果消息无法反序列化到我构建的指定Avro特定记录,消费者将不断地反复尝试相同的消息(无限重试)。
在这种情况下,如果我的使用者出现反序列化程序异常,我如何配置使用者应用程序以跳过当前消息并移动到下一个偏移量。
我已经研究了Spring Kafka错误句柄,它只能处理侦听器中的异常,而不是在反序列化阶段。
我的消费者应用程序非常简单:
@KafkaListener(id = "demo-consumer-stream-group", topics = "customer-output-")
public void process(ConsumerRecord<String, Customer> record) {
LOGGER.info("Customer key: {} and value: {}", record.key(), record.value());
LOGGER.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
}
基于此代码,有时接收到的消息可能无法反序列化到正确的Customer
对象。
此外,我看到最近的一个解决方案是使用Spring Kafka的ErrorHandlingDeserializer2
来处理这个问题,但既然我使用的是Kafkavrodeserializer
我该如何计算这些配置呢?我目前的配置是:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
留档里有解释。
通过custom Spring属性将反序列化程序设置为错误处理反序列化程序及其委托。
您可以使用DefaultKafkaConsumerFactory构造函数,该构造函数接受键和值反序列化器对象,并连接到已配置了适当委托的相应ErrorHandlingDeserializer2实例中。或者,您可以使用使用者配置属性(ErrorHandlingDeserializer使用这些属性)来实例化委托。属性名为ErrorHandlingDeserializer2。键_反序列化器_类和错误处理反序列化器2。值\反序列化程序\类。属性值可以是类或类名。以下示例显示了如何设置这些属性:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
您需要将当前值键反序列化器设置为ErrorHandlingDeserializer.class并将当前值键反序列化器设置为ErrorHandlingDeserializer Key/Value反序列化器属性
这将看起来类似于这样的东西:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费
我创建了一个简单的生产者-消费者应用程序,使用自定义序列化器和反序列化器。 在我生成的Message类中添加了一个新方法之后,使用者开始在反序列化时被堆栈。我的生产者使用的是新类(有新方法),消费者使用的是旧类(没有方法)。 JSONSerializer/Deserializer可以处理这些类型的修复吗?如果我要使用JSONSerialzier,它应该只关心类的模式,对吗?
有没有一种方法让我忽略这些异常并在消费者处移动偏移量?我想,因为我使用手动偏移提交,我有这个问题。有人知道如何配置kafka-avro-serializer-6.0.0.jar来完成我想要的任务吗? 多谢了。
我正在尽可能地简化我的消费者。问题是,当我看到Kafka监听器中的记录时: <代码>列表 我在使用时注意到: SpringKafka。消费者值反序列化器=io。汇合的。Kafka。序列化程序。KafkaavroderializerSpring。Kafka。消费者键反序列化器=组织。阿帕奇。Kafka。常见的序列化。字符串反序列化器 我得到以下错误: 2020-12-02 17:04:42.745调
我们正在考虑在我们的for消息传递中使用Kafka,我们的应用程序是使用Spring开发的。所以,我们已经计划用Spring-Kafka。 生产者将消息作为HashMap对象放入队列。我们有JSON序列化器,并且假设映射将被序列化并放入队列。这是生产者配置。 我们看到的文章很少,建议是这样做: 我们不想为创建反序列化程序编写一些代码。有没有我们缺少的样板?任何帮助都将不胜感激!!
我正在使用kafka从源接收数据,我正在使用用< code>Node.js编写的消费者应用程序,并使用< code>kafka-node连接到kafka服务器。另一方面,生产者是用< code>Java编写的,他们使用一些kafka流库来产生带有模式的avro消息。我可以接收消息,但它们是avro序列化的,下面是我接收的序列化消息格式- 我正在尝试反序列化它,但无法使用 npm模块,因为avsc只