当前位置: 首页 > 知识库问答 >
问题:

检索无法反序列化的Kafka消息的有效负载和标头

佟云
2023-03-14

我使用spring kafka 2.1.7来使用JSON消息,我想处理无法正确反序列化的消息<为了覆盖在同一条消息上循环的默认行为,我扩展了JsonDeserializer来覆盖反序列化方法。

public class CustomKafkaJsonDeserializer<T> extends JsonDeserializer<T> {

    public CustomKafkaJsonDeserializer(Class<T> targetType) {
        super(targetType);
        this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return super.deserialize(topic, data);
        } catch (Exception e) {
            log.error("Problem deserializing data " + new String(data) + " on topic " + topic, e.getMessage());
            return null;
        }
    }

}

这是我的消费者及其配置:

@Service
public class Consumer {

    @KafkaListener(topics = "${kafka.topic.out}", containerFactory = "kafkaListenerContainerFactory", errorHandler = "customKafkaListenerErrorHandler")
    public void consume(@Payload Lines lines, @Headers MessageHeaders messageHeaders) {
        //treatment
    }

}

@Configuration
public class ConsumerConfig {

    ...

    @Bean
    public ConsumerFactory<String, Lines> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new CustomKafkaJsonDeserializer<>(Lines.class));
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Lines>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Lines> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", this.bootstrapServers);
        props.put("group.id", this.appName);
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", CustomKafkaJsonDeserializer.class);
        props.put("security.protocol", this.securityProtocol);
        props.put("sasl.mechanism", this.saslMechanism);
        props.put("sasl.jaas.config", this.saslJaasConfig);
        return props;
    }
}

最后,我实现了自己的错误处理程序,以便将错误数据发送到其他主题。

@Component
public class CustomKafkaListenerErrorHandler implements KafkaListenerErrorHandler {

    @Autowired
    private KafkaErrorService kafkaErrorService;

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception {
        log.error("error handler for message: {} [{}], exception: {}", message.getPayload(), message.getHeaders(), exception.getMessage());
        kafkaErrorService.sendErrorToKafka(message.getPayload().toString(), exception.getMessage());
        throw new RuntimeException(exception);
    }

}

这是当我使用错误消息时发生的情况:

  • CustomKafkaJsonDeserializer尝试反序列化消息并捕获异常。
  • 可以在catch块中检索有效负载,但不能检索标头。返回null以推进偏移量。
  • 它到达错误处理程序的handleError方法。message.getHeaders()返回正确的标头,但message.getPayload()返回一个KafkaNull对象。因此,我无法在这一步同时发送负载和标头。

对如何实现这一目标有何建议?

共有1个答案

葛昱
2023-03-14

返回一个包含数据和头的富对象,而不是返回null。

 类似资料:
  • 我试图将Kafka中的Avro消息反序列化为从Avro模式生成的POJO。我正在使用Kafkaavroderializer进行此转换。 我可以在

  • 我试图阅读和打印从Kafka使用Apache Flink的原型消息。 我遵循官方文件,但没有成功:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/ Flink消费者代码是: 反序列化器代码是:

  • 我正在尝试从同一个Kafka主题反序列化不同的JSON有效负载。这里问的其他问题引导我进行了第一次尝试,但我无法让它运行。 正如Gary所提到的(这里),有一些提示(JsonSerializer.ADD\u TYPE\u INFO\u HEADERS),但当我发送和接收这两条消息时,我会收到一个异常。 ... LoggingErrorHandler在ConsumerRecord中已经提到了一个(正

  • 问题内容: Eclipse JDT编译器似乎存在一个问题,在某些情况下,Java 8 lamda不能正确反序列化,而是抛出。我正在使用最新发布的维护版本,如下所示: 现有的Bug / SO条目报告了已(至少部分地)已解决的类似问题。我已经亲自验证以下问题已解决。 访问实例字段和方法的Java 8Lambda不能反序列化 https://bugs.eclipse.org/bugs/show_bug.

  • Eclipse JDT编译器似乎有一个问题,在某些情况下,Java8个lamda没有正确反序列化,而是抛出一个。我正在使用最近的分布式维护版本,如下所示: 现有的bug/SO条目报告了在中已经(至少部分)解决的类似问题。我亲自验证了以下问题已经得到解决。 无法反序列化访问实例字段和方法的Java 8 lambda https://bugs.eclipse.org/bugs/show_bug.cgi

  • 有没有办法在Kafka消息有效载荷中添加时间戳标头?我想检查消息是何时在消费者端创建的,并基于此应用自定义逻辑。 编辑: 我试图找到一种方法,将一些自定义值(基本上是时间戳)附加到生产者发布的消息上,这样我就可以在特定的时间段内消费消息。现在Kafka只确保消息将按照它们被放入队列的顺序传递。但是在我的例子中,先前生成的记录可能在某个延迟之后到达(因此在时间T1生成的消息可能比在稍后时间T2生成的