当前位置: 首页 > 知识库问答 >
问题:

带有Avro消息的Spring Cloud Stream JSON死信队列

宋涵忍
2023-03-14

我正在使用带有Avro和汇流模式注册表的Spring云流。我正在为所有服务使用一个单独的DLQ主题,因此具有不同模式的消息可能会落在这个主题中。我已禁用动态架构注册,以确保不传递错误消息(schemaspring.cloud.stream.schema.avro.DynamicSchemaGenerationEnabled=false)。

然而,问题是由于dlq上缺少模式,我可能会在进入这个主题时丢失一条消息。因此,我希望能够以JSON格式向dlq生成消息,并在管道的其余部分使用Avro。如果有人能帮助我如何做到这一点,或者能为我指出这件事的一个例子,我将不胜感激。

共有1个答案

叶国兴
2023-03-14

如果您使用的是Stream2.1或更高版本,请禁用绑定器中的DLQ处理,并使用ListenerContainerCustomizerbean向侦听器容器添加自定义的ErrorHandler;您可以将SeekToCurrEnterrorHandler与自定义恢复器一起使用-您可以将DeadLetterPublishingRecoverer作为起点-重写此方法...

/**
 * Subclasses can override this method to customize the producer record to send to the
 * DLQ. The default implementation simply copies the key and value from the consumer
 * record and adds the headers. The timestamp is not set (the original timestamp is in
 * one of the headers). IMPORTANT: if the partition in the {@link TopicPartition} is
 * less than 0, it must be set to null in the {@link ProducerRecord}.
 * @param record the failed record
 * @param topicPartition the {@link TopicPartition} returned by the destination
 * resolver.
 * @param headers the headers - original record headers plus DLT headers.
 * @param data the value to use instead of the consumer record value.
 * @param isKey true if key deserialization failed.
 * @return the producer record to send.
 * @see KafkaHeaders
 */
protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
        TopicPartition topicPartition, RecordHeaders headers, @Nullable byte[] data, boolean isKey) {
 类似资料:
  • 因此,我们计划使用Avro在融合的Kafka生态系统上进行交流。我目前对Avro的理解是,每条消息都有自己的模式。如果是这样的话,我们需要模式注册表来解决版本更新吗? 我问,因为在每条消息中携带模式可以防止需要像模式注册表这样的东西来将消息ID映射到模式。还是我在这里错过了什么?

  • 就像“休斯顿我们这里有一个问题”,在第一次尝试处理事件失败后,我需要安排/延迟消息5分钟。我已经在这个场景中实现了死信交换。 失败的消息会路由到DLX-->Retry队列,并在TTL 5分钟后返回工作队列进行另一次尝试。 下面是我正在使用的配置: producer.java: consumer.java:

  • 这种需求类似于通过公开的REST服务API(Spring Boot)处理来自死信队列的消息。以便一旦调用REST服务,就会从DL队列中消耗一条消息,并将再次发布到主队列中进行处理。@RabbitListener(queues=“queue_name”)立即使用消息,这在场景中是不需要的。该消息只需由REST服务API使用。有什么建议或解决办法吗?

  • 我已经设置了Apache camel,在其中我使用来自一个队列的消息并对其进行某种操作,然后将其传输到其他队列。 现在,如果异常来了,我希望它应该回滚,然后在6次尝试后,它发送到死信队列,目前回滚发生5-6次,但我的消息没有转移到死信队列。 这里会发生什么-->Queue1->>(消耗)-->Operation(引发异常)-->Rollback-->Queue1->>(消耗)-->Operatio

  • 今天,我使用Spring Cloud Streams和RabbitMQ,根据本文档编写了以下代码: 我的接口: 和我的属性文件: