这个问题是针对Spring Kafka的,与高级消费者的Apache Kafka相关:跳过损坏的消息
是否有方法配置Spring Kafka consumer以跳过无法读取/处理(已损坏)的记录?
我看到的情况是,如果不能反序列化,消费者就会被卡在同一条记录上。这就是消费者抛出的错误。
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
您需要errorhandlingdeserializer
:https://docs.spring.io/spring-kafka/docs/2.2.0.release/reference/html/_reference.html#error-handling-deserializer
如果您不能移动到2.2
版本,请考虑实现您自己的版本,并为那些不能正确反序列化的记录返回null
。
源代码在这里:https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/errorhandlingdeserializer2.java
问题内容: 有没有一种方法可以配置Spring Kafka使用者以跳过无法读取/处理(已损坏)的记录? 我看到一种情况,如果无法反序列化,则消费者将停留在同一记录上。这是消费者抛出的错误。 使用者轮询该主题,并一直循环循环打印相同的错误,直到程序被杀死为止。 在具有以下消费者工厂配置的@KafkaListener中, 问题答案: 您需要:https : //docs.spring.io/sprin
问题内容: 如何序列化未实现Serializable的对象?我不能将其标记为Serializable,因为该类来自第3方库。 问题答案: 您不能序列化未实现的类,但可以将其包装在可以实现的类中。为此,您应该在包装器类上实现和,以便可以以自定义方式序列化其对象。 首先,使您的非序列化字段。 在中,首先调用流以存储所有非瞬态字段,然后调用其他方法来序列化不可序列化对象的各个属性。 在中,首先调用流以读
我正在使用Spring Kafka consumer和Avro模式构建我的应用程序。 但是,如果消息无法反序列化到我构建的指定Avro特定记录,消费者将不断地反复尝试相同的消息(无限重试)。 在这种情况下,如果我的使用者出现反序列化程序异常,我如何配置使用者应用程序以跳过当前消息并移动到下一个偏移量。 我已经研究了Spring Kafka错误句柄,它只能处理侦听器中的异常,而不是在反序列化阶段。
所以我有 我读过Baeldung的文章和其他东西,但似乎我的选项是自定义类型(这是一个原始类型)或全局类型。如何实现这种自定义序列化?
我们在kafka中使用Ktabke进行聚合,它非常基本的用途,并参考了kafka文件。 使用Kafka的Streams API处理坏消息的KStream参考 我的用例非常简单,对于任何类型的异常,只需移到错误主题并移到不同的消息
问题内容: 我在android / java中对Location的子类进行序列化遇到了麻烦 位置不可序列化。我有一个名为FALocation的第一个子类,它没有任何实例变量。我已经宣布它可序列化。 然后,我有一个名为Waypoint的第二个类,看起来像这样: 序列化工作正常。 反序列化会产生跟随翼异常(腿对象包含一个航路点): 问题答案: 序列化位置绝对必要吗?也许您可以将其标记为瞬态,并在反序列