当前位置: 首页 > 知识库问答 >
问题:

如何在spring kafka Exception handler中获取原始记录

刁远
2023-03-14

我正在为Spring kafka项目实施Exceptive处理,我有自己的DeadLetterPublishingRecoverer,它处理Kafka侦听器中发生的异常,整个流程是完美的,即当我抛出逻辑中的任何异常时,我可以根据框架实现将其记录并发送到DLQ主题。消费者记录出现问题,如果我更改消费者记录值中的任何内容,相同的消费者记录将发布到DLQ主题,更改实际上是错误的,我想记录原始消息。

...

       @KafkaListener(topics = "${message.topic.name}",containerFactory = "kafkaListenerContainerFactory")  
public void listenGroupFoo(ConsumerRecord<String, MyEvent> consumerRecord) throws InterruptedException, ExecutionException {         

    System.out.println("Received Message" + consumerRecord.value());

    MyEvent consumedEvent=consumerRecord.value();
    consumedEvent.setQuantity(1000);
    
    throw new InvalidProductCodeException();

}

...

我发送到主题的实际消息只包含10个数量,我正在更改一些内容,如将数量更改为1000,并引发一些异常(如处理时发生的任何异常)。。。

  Received MessageOrderObject CreatedDate=Mon Sep 14 19:38:15 IST 2020, productCode=TES, price=10, quantity=10, customerId=0002

...

在我抛出错误后,我的记录将是

...

Received MessageOrderObject [createdDate=Tue Sep 15 13:20:16 IST 2020, productCode=TES, price=10, quantity=10, customerId=0002]
Error  Record  OrderObject [createdDate=Tue Sep 15 13:20:16 IST 2020, productCode=TES, price=10, quantity=1000, customerId=0002]

这是我的DLQ子类

...

 @Component
 public class DeadLetterSubclass extends DeadLetterPublishingRecoverer {

@Value(value="${error.topic.name}")
static
String errorTopicName;


BusinessUtils utils=new BusinessUtils();
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> 
 MY_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition("stock-order-error-topic", cr.partition());

public DeadLetterSubclass(KafkaOperations<? extends Object, ? extends Object> template) {
    super(template,MY_DESTINATION_RESOLVER);        
    this.setHeadersFunction((consumerRecord,exception)->{
        System.out.println("Error  Record  "+consumerRecord.value());           
        return consumerRecord.headers();
    });
        
}

...

我想记录发生异常时要发布的原始事件对象(ConsumerRecord)。在我的例子中,我的实际订单数量为10,但记录的订单数量为1000,这不是实际订单。

共有1个答案

沈子昂
2023-03-14

由于您对原始消费者记录有硬引用,因此如果您需要它在错误处理程序中保持不变,则不应更改它。

该框架无法预料您可能会更改侦听器中的值。制作副本“以防万一”的开销太大。

您不应该改变原始值-而是克隆它,然后改变克隆。

 类似资料:
  • 问题内容: 有没有办法从获取原始/原始JSON值? 问题: 现在是,但是我需要原始字符串。 有没有办法获得这个原始价值?另外,我无法更改创建方式(例如更改设置),因为它作为参数传递给我的课程… (参考:原始的NJsonSchema问题) 问题答案: 您无法获取原始字符串,无法识别日期字符串并将其转换为自身内部的结构。如果您这样做,则可以看到以下内容: 但是,您可以通过执行以下操作以ISO 8601

  • 下面的API接受来自客户端的json字符串,并将其映射到电子邮件对象中。如何将请求正文()作为原始字符串获取?(我需要原始字符串和键入版本的参数) PS:这个问题不是重复的:如何在SpringREST控制器中访问普通json主体?

  • 问题内容: 我试图从原始值获取枚举类型: 但是似乎对于带空格的字符串不起作用: 任何建议如何获得它? 问题答案: 太复杂了,只需将原始值直接分配给案例 如果案例名称与原始值匹配,您甚至可以忽略它 在Swift 3+中,所有枚举都是

  • 问题内容: 我正在尝试从文件夹中包含在项目中的原始文件获取。但是无论如何,我都会得到一个。 该文件是一个文件,也尝试过使用,也不起作用。使用DOES 播放两个文件都可以。 的回报: 我的代码: 错误: 问题答案: 尝试这种方法,用作您的inputStream。沿着这个地方: 返回一个InputStream 编辑:如果您使用上述方法,请删除这些代码 希望这有帮助,祝你好运!^^

  • 问题内容: 我在Nginx或Apache中对此地址进行了重写: 像这样的脚本 如何在PHP中访问此重写的URL?因为,如果我当然使用,我会得到: 但我只想要: 这可能吗?感谢您的帮助。 更新nginx cnf 问题答案: 在conf中,我们需要添加用户标头: 并阅读: 更新 由于某些原因,nginx不喜欢标题名称中的符号“ _”,之前不知道它的工作方式,也许在nginx更新后有所更改。现在我正在使

  • 如何获得原始json输出。如果可能的话,我不想实现用户数据类和解析器。有什么办法吗? 标记重复的帖子(获得原始HTTP响应与改造)不是为Kotlin和我需要Kotlin版本。