当前位置: 首页 > 知识库问答 >
问题:

Kafka将消息持久化,直到被所有组使用

郑狐若
2023-03-14

我有一个Kafka主题,它有多个消费者群体。我需要主题上的消息在其持续时间到期时不被删除,如果它们尚未被所有消费者组读取
是否可以在持续时间之外设置其他持久性规则?我需要这些信息始终停留在一个主题上,如果它们从未被使用过
如果邮件未被使用且其持续时间已过期,是否可以“刷新”该邮件的超时?

共有1个答案

司徒杜吟
2023-03-14

这在Kafka是不可能的。Kafka与许多传统的消息代理不同,它不跟踪哪些消息已经被使用或没有被使用。这是消费者的责任。由于代理不跟踪此,因此无法基于此进行主题清理。

在某些情况下,您可以使用压缩主题,这些主题将保留每个键的最后一条消息。多亏了这一点,即使是连接较晚的消费者也可能能够恢复状态。但这仅适用于特定的数据类型,例如状态更改等。

 类似资料:
  • 我试着把我的头缠绕在Kafka的溪流和一些根本的问题,我似乎无法解决,我自己。我理解和Kafka状态存储的概念,但我很难决定如何实现它。我还在使用Spring Cloud Streams,这在此基础上增加了另一个层次的复杂性。 我的用例: 一些有状态规则如下所示: 我当前的实现将所有这些信息存储在内存中,以便能够执行分析。由于显而易见的原因,它不容易扩展。所以我想我会坚持到Kafka的州立商店。

  • 我见过(旧?),我一直在考虑使用压缩键,但是对于kafka来说,是否有一个简单的选项可以永远删除消息? 或者最好的选择是给保留期一个可笑的高值?

  • 我正在使用网络逻辑10.3。我正在尝试配置一个持久订阅,其中包含由 jdbc 存储(在 Oracle DB 中)支持的持久消息。我有一个主题,MDB 正在作为持久订阅者侦听该主题。在场景-1下:如果我发送消息,它会命中MDB。 在场景2中:我挂起了MDB,希望发送到主题的消息只要不被MDB(它是唯一注册的持久订阅者)使用,就会一直存在。但是当我向主题发送消息时,它短暂地出现在那里,然后就消失了(我

  • 消息应答 ack >[danger] noAck: false 手动接收消息模式 async consume() { const ch = await this.app.amqplib.createChannel(); await ch.assertQueue(queueName, { durable: false }); const msg = await new Pro

  • 我有一个名为“test-topic”的主题,有3个分区。 当我启动一个将group-id设置为“test-group”的使用者(consumer-1)时,它连接并读取主题上的所有分区。到目前为止还好。 当我在同一个组中启动另一个消费者(consumer-2)时,问题就出现了。我希望在两个消费者之间划分分区时能够重新平衡,例如,消费者-1得到分区0和2,消费者-2得到分区1。这种情况不会发生,当然我

  • 我们有一个camel路由,在这里我们从输入队列读取消息,处理它,设置一些JMS头(使用exchange.getin().setheader(...)),然后将消息路由到某个输出队列。在MQ故障转移方案期间,将重新传递消息。但是,当重新传递消息时,我前面放置的JMS头丢失了。是否有任何方法可以在重新交付后保留JMS头?