当前位置: 首页 > 知识库问答 >
问题:

Kafka流的超时

丁经略
2023-03-14

我期待关于Kakfa主题的n消息,一旦我收到n消息,我就会发出关于新主题的消息。我使用streams API来实现这一点,它很简单。但是,由于系统不可靠,我可能永远不会接收到n,但如果已经接收到n消息的x%(例如95%),并且在y秒内没有记录新消息,我仍然希望发出消息。这在Kafka streams中是可能的吗?还是我需要为它写一个consumer?

如果Kafka Streams有类似于Rx(http://reactivex.io/documentation/operators/timeout.html)的超时概念,我认为这是可能的,但我还没有在streaming API中找到。

共有1个答案

段渊
2023-03-14

没有超时概念,但可以使用标点符号来做你想做的事情。您将需要使用Kafka1.0.0,它添加了挂钟时间标点符号,还允许取消标点符号计划。

因此,每次收到一条记录并点击x%标记时,您都可以注册一个具有所需超时的计划。如果在超时前收到下一条消息,则可以取消计划并注册一条新的。此外,如果标点符号触发器,您可以发出并取消当前计划。

关于这个atm的文档并不多,因为汇流开源4.0还没有发布(它在内部使用了Kafka 1.0.0)。但您可以查看设计方案中的一些细节:https://cwiki.apache.org/confluence/display/kafka/kip-138%3a+change+标点+semantics

注意:您不需要升级您的代理,只要您愿意,就可以将您的流库升级到1.0.0。参见https://docs.confluent.io/current/streams/upgrade-guide.html#兼容性(1.0.0与0.11.0.x一样,对旧代理具有相同的向后兼容性)

 类似资料:
  • 在Spring Boot应用程序中,我试图配置Kafka流。用简单的Kafka主题,一切都很好,但我无法得到工作SpringKafka流。 这是我的配置: 我想创建一个基于主题的流。应用一个简单的转换并将此流中的消息发送到test主题。 我向发送以下消息,其中是我自己的复杂类型,但是我现在不知道如何将它转换为中的,以便能够在中使用它。 请建议如何使其工作。

  • 本文向大家介绍Kafka流的特点?相关面试题,主要包含被问及Kafka流的特点?时的应答技巧和注意事项,需要的朋友参考一下 答:Kafka流的一些最佳功能是 Kafka Streams具有高度可扩展性和容错性。 Kafka部署到容器,VM,裸机,云。 我们可以说,Kafka流对于小型,中型和大型用例同样可行。 此外,它完全与Kafka安全集成。 编写标准Java应用程序。 完全一次处理语义。 而且

  • 我需要从KafkaAvroDeserializer而不是标准的kafka反序列化器消耗的主题创建一个流。这是因为它将被发送到汇流JDBC接收器连接器(不支持标准序列化器/反序列化器)中使用的主题。在创建主题时,我对key和value都使用了KafkaAvroSerializer。 我的原始代码(在更改为密钥使用Kafka Avro序列化器之前)是: 注意:上面的Serdes.String不会正确反

  • 我正在尝试用《Spring的云流》和《Kafka》。下面是示例代码。但它似乎没有任何作用。它总是创建一个名为“输出”的主题。但这些价值观尚未公布。 应用亚马尔 我的目标就是创造价值。 依赖性-2.2.6。释放

  • 我有一个关于kafka流应用程序中的控制流的基本问题。如果有两个源主题 我做了一个非常初步的测试,当记录被消费时,我偷看了一下,然后用一个简单的速溶软件打印了它们被处理的瞬间。现在 这些是主题中记录的开始和结束时间戳 主题B记录在主题A之前提取。Sysout显示主题B中的所有记录。有人能帮助理解这一点吗?我希望在编写具有多个输入源的流式应用程序时使用这种理解。 提前感谢

  • 我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?