当前位置: 首页 > 面试题库 >

如何在Spring Kafka Consumer中跳过损坏的(不可序列化的)消息?

令狐高洁
2023-03-14
问题内容

有没有一种方法可以配置Spring Kafka使用者以跳过无法读取/处理(已损坏)的记录?

我看到一种情况,如果无法反序列化,则消费者将停留在同一记录上。这是消费者抛出的错误。

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value

使用者轮询该主题,并一直循环循环打印相同的错误,直到程序被杀死为止。

在具有以下消费者工厂配置的@KafkaListener中,

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

问题答案:

您需要ErrorHandlingDeserializer:https : //docs.spring.io/spring-
kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#error-handling-
deserializer

如果无法升级到该2.2版本,请考虑实施自己的版本并返回null无法正确反序列化的那些记录。

源代码在这里:[https]( https://github.com/spring-projects/spring-
kafka/blob/master/spring-
kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java)
//github.com/spring-projects/spring-kafka/blob/master/spring-
kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java


 类似资料:
  • 这个问题是针对Spring Kafka的,与高级消费者的Apache Kafka相关:跳过损坏的消息 是否有方法配置Spring Kafka consumer以跳过无法读取/处理(已损坏)的记录? 我看到的情况是,如果不能反序列化,消费者就会被卡在同一条记录上。这就是消费者抛出的错误。

  • 问题内容: 如何序列化未实现Serializable的对象?我不能将其标记为Serializable,因为该类来自第3方库。 问题答案: 您不能序列化未实现的类,但可以将其包装在可以实现的类中。为此,您应该在包装器类上实现和,以便可以以自定义方式序列化其对象。 首先,使您的非序列化字段。 在中,首先调用流以存储所有非瞬态字段,然后调用其他方法来序列化不可序列化对象的各个属性。 在中,首先调用流以读

  • 我正在使用Spring Kafka consumer和Avro模式构建我的应用程序。 但是,如果消息无法反序列化到我构建的指定Avro特定记录,消费者将不断地反复尝试相同的消息(无限重试)。 在这种情况下,如果我的使用者出现反序列化程序异常,我如何配置使用者应用程序以跳过当前消息并移动到下一个偏移量。 我已经研究了Spring Kafka错误句柄,它只能处理侦听器中的异常,而不是在反序列化阶段。

  • 问题内容: 一些实现IEnumerable的自定义类型不一定具有后备集合。它们可以动态生成,例如使用“ yield”或LINQ。这是一个例子: 我发现Json.NET的默认序列化是枚举每个值并将这些值存储在JavaScript数组中(我不想要)。然后,默认的反序列化器将无法反序列化集合,因为它无法填充。在这些情况下,我希望Json.NET跳过默认的JavaScript数组序列化,而只存储类的成员。

  • 所以我有 我读过Baeldung的文章和其他东西,但似乎我的选项是自定义类型(这是一个原始类型)或全局类型。如何实现这种自定义序列化?

  • 如果一封邮件被发送到我的收件箱,我会收到一条消息,并将内容插入数据库。我有一个组织。springframework。整合。果心信息如下: 现在,如果出现故障,我希望有故障安全恢复机制,我想的是将消息对象序列化到一个文件中,然后反序列化并更新到DB。 问题1。在这种情况下,如何序列化消息对象?2。除了序列化,还可以使用其他机制吗? 编辑我以前没有做过序列化,我听说类应该实现Serializable,