当前位置: 首页 > 知识库问答 >
问题:

用Kafka活页夹重播来自Spring Cloud Stream中死信队列的消息

莫飞翮
2023-03-14

我们正在使用带有汇流模式注册表、Avro和Kafka绑定器的Spring云流。我们已经将数据处理管道中的所有服务配置为使用一个共享的DLQ Kafka主题,以简化异常处理的过程,并能够重播失败的消息。然而,由于某种原因,我们似乎无法正确提取有效负载消息,因为具有不同模式的消息被发布到单个DLQ。因此,我们失去了原始消息的模式轨迹。

我想知道是否有任何方法可以在dlq中维护失败消息的原始schema_id以便将其用于无缝重播。

共有1个答案

南宫才艺
2023-03-14

结果表明,通过将主题命名策略更改为RecordNameStrategy,就可以实现这一点,并且无论主题名称如何,记录都在所有主题上保持原始模式。更多细节可以在这里找到。

 类似资料:
  • 什么是最好的方法来实现死信队列(DLQ)的概念在Spring Boot 2.0应用程序中使用sping-kafka 2.1. x有所有的消息被处理失败的@KafkaListener方法的一些bean发送到一些预定义的Kafka DLQ主题并且不丢失一条信息? 因此,Kafka的记录是: 已成功处理, 处理失败,并发送到DLQ主题, 处理失败,未发送到DLQ主题(由于意外问题),因此将再次被侦听器使

  • 这种需求类似于通过公开的REST服务API(Spring Boot)处理来自死信队列的消息。以便一旦调用REST服务,就会从DL队列中消耗一条消息,并将再次发布到主队列中进行处理。@RabbitListener(queues=“queue_name”)立即使用消息,这在场景中是不需要的。该消息只需由REST服务API使用。有什么建议或解决办法吗?

  • 我有一个lambda函数,我想为它创建一个SQS死信队列。我首先在Terraform中创建SQS: 这是来自Terraform的例子。但是,我被redrive_policy卡住了。 我是否正确理解,这为SQS队列设置了一个死信队列? 如果我设置了redrive_policy,这意味着我在一个DLQ上设置了一个DLQ。我觉得可以在DLQ上设置DLQ,在DLQ上设置DLQ,以此类推。 我找不到这方面的

  • 死信队列(Dead Letter Queue)本质上同普通的Queue没有区别,只是它的产生是为了隔离和分析其他Queue(源Queue)未成功处理的消息。 创建死信队列的方法参见createQueue() API,与创建普通队列无异, 死信队列不可调用deadMessage(), deadMessageBatch API,其他操作都与对普通Queue的操作无异。 为了将源Queue的未能成功处理

  • 对于异步的触发器,平台会对函数失败的任务进行最多3次重试。 在新建触发器的时候,为触发器配置一条死信队列,从用户的EMQ队列中选择一条,用于接收函数失败的任务。 在设置死信队列前,请对group: CIf76b0600-24e9-42c4-acf3-d491fbd9fd71​ 授予 FULL_CONTROL 权限,若不授予权限,平台将丢弃失败的任务信息。 消息的内容如下,以后可能增加字段,请用户在

  • 我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重