当前位置: 首页 > 知识库问答 >
问题:

在Kafka主题中使用了所有可用的消息后,如何返回包含消息列表的未来?

龚运乾
2023-03-14

我编写的代码是

val sink = Sink.fold[List[KafkaMessage], KafkaMessage](List[KafkaMessage]()) { (list, kafkaMessage) =>
list :+ kafkaMessage
}

def consume(topic: String) =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
  .map { message =>
    logger.info(s"Consuming ${message.record.value}")
    KafkaMessage(Some(message.record.key()), Some(message.record.value()))
  }
  .buffer(bufferSize, overflowStrategy)
  .runWith(sink)

然而,未来永远不会回来,它消耗必要的消息,然后继续反复轮询主题。有没有办法把未来还回去,然后把消费者关起来?

共有1个答案

汲丰茂
2023-03-14

正如Kafka用于流数据一样,没有所谓的“所有消息”,因为可以在任何时候将新数据附加到主题中。

我想,你可以做两件事:

  1. 检查上次轮询和终止或
  2. 返回的记录数
  3. 您需要通过endoffsets获取“日志的当前结束”,并将其与每个分区的最新记录的偏移量进行比较。如果两者匹配,则可以返回。
 类似资料:
  • 我看过与此相关的类似问题,但并没有找到正确的答案。我只想从 Kafka 主题中删除消息,而不是更改保留超时。我已经安装了kafka_2.11-0.8.2.1,并使用蝙蝠文件在Windows上运行它。我想知道我是否可以删除主题中发布的所有消息,而不删除整个主题。

  • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到

  • 我有一个批处理作业,它将一天触发一次。要求是 使用该时间点上关于Kafka主题的所有可用消息 处理消息 如果进程已成功完成,则提交偏移量。 当前,我poll()while循环中的消息,直到ConsumerRecords.isEmpty()为true。当ConsumerRecords.isEmpty()为true时,我假设Topic在该时间点的所有可用记录都已被使用。应用程序维护偏移量并关闭kafk

  • 我需要一个Kafka主题存储的消息数量。这与任何消费者是否消费了消息无关。 以上是否等于Kafka主题中当前存储的消息数?

  • 我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。