大家好,我有一个关于提取器和Kafka流的问题。。。。
在我们的html" target="_blank">应用程序中,有可能接收到无序事件,因此我喜欢根据负载中的业务日期来排序事件,而不是根据它们放置在主题中的时间点。
为此,我编程了一个定制的时间戳提取器,以便能够从有效负载中提取时间戳。我在这里所说的一切都非常有效,但当我构建这个主题的KTable时,我发现我收到的无序事件(从业务角度来看,它不是最后一个事件,而是在最后收到的)显示为对象的最后状态,而ConsumerRecord具有有效负载的时间戳。
我不知道,可能是我错了,以为Kafka流会用TimestampExtractor解决这个问题。
然后在调试过程中,我看到如果TimestampExtractor返回-1,结果Kafka Streams忽略了消息,TimestampExtractor也提供了最后接受事件的时间戳,所以我构建了一个逻辑来实现以下检查(payloadTimestamp
我是否可以处理这样的逻辑或存在哪些其他方法来处理Kafka流中的乱序事件......
谢谢你的回答...
目前(Kafka 2.0),KTable
s在更新时不考虑时间戳,因为假设输入主题中没有无序数据。这种假设的原因是“单编写器原则”——假设对于压缩的KTable输入主题,每个键只有一个生产者,因此,不会有任何关于单键的无序数据。
这是一个众所周知的问题:https://issues.apache.org/jira/browse/KAFKA-6521
对于您的修复:执行此“黑客”不是100%正确或安全的:
要正确处理此问题,您需要在应用程序逻辑中“手动”过滤,而不是无状态和与键无关的
TimestampExtractor
。您可以将其作为流读取,并应用. groupByKey(). duce()
来构建KTable
。在您的Reduer
逻辑中,您比较新旧记录的时间戳,并返回具有较大时间戳的记录。
我正在尝试使用Kafka流来处理Kafka主题中的一些数据。数据来自Kafka0.11.0编写的Kafka主题,该主题没有嵌入时间戳。在网上读了一些书之后,我明白了我可以通过在自定义类中扩展类并将其传递到中来解决这个问题。 我是这样做的- 我基于github上的这段代码 但是,当我运行
这个服务我已经测试了它,使用不同版本的Kafka(更高或等于0.10),它工作良好。 以下是我的配置: Spring:cloud:stream:kafka:streams:binder:brokers:${KAFKA_BROKERS}applicationid:email-MESSAGES-stream configuration:default.key.serde:org.apache.kafk
我有一个关于kafka流应用程序中的控制流的基本问题。如果有两个源主题 我做了一个非常初步的测试,当记录被消费时,我偷看了一下,然后用一个简单的速溶软件打印了它们被处理的瞬间。现在 这些是主题中记录的开始和结束时间戳 主题B记录在主题A之前提取。Sysout显示主题B中的所有记录。有人能帮助理解这一点吗?我希望在编写具有多个输入源的流式应用程序时使用这种理解。 提前感谢
我从教程中创建了示例Kafka Streams应用程序: 不幸的是,这个应用程序不读取输入流。我有一个来自PostgreSQL的JDBC源连接器,它正在处理来自一个数据库的精细流数据(我可以在本主题中的Kafka Connect UI数据上看到)。 我的问题是,即使我在BOOTSTRAP\u SERVERS\u CONFIG的Properties IP is localhost中更改了IP,我也不
我在java中有一个函数,在这个函数中我试图获取未读的消息。例如,如果我在broker中有偏移量为0、1、2的消息,这些消息已经被使用者读取,并且如果我关闭我的使用者一个小时。那时我产生的信息偏移量为3,4,5。之后,当我的消费者启动时,它应该从偏移量3读取消息,而不是从0读取消息。但是,它要么读取所有的消息,要么读取启动Kafka Consumer后产生的消息。我想读那些未读或未提交的消息 我尝
我的场景是我使用make很多共享前缀(例如house.door,house.room)的Kafka主题,并使用Kafka stream regex主题模式API消费所有主题。一切看起来都很好,我得到了数据的密钥和信息。 为了处理数据,我需要主题名,这样我就可以根据主题名进行连接,但我不知道如何在Kafka stream DSL中获得主题名。