我想使用Kafka Streams API对带有KTable的KStream执行左联接,将表的一些字段添加到流中。
使用包含所有相关条目的较小版本的表(大约1300个条目),一切工作都很好。
使用整个表(大约200,000个条目)后,在获得KTable的Avro消息(GenericRecord)的相关字段的行中会得到一个NullPointerException
。
当我在KSQL中执行相同的左联接操作时,从表中添加的字段为NULL。相关的联接键存在于表中,但在KSQL中查询它们需要大约。20秒后他们出现。
表是否太大,无法执行左联接?如果是这样的话,我可以做些什么来使连接使用整个表工作吗?
堆栈跟踪:
[myclass-68507371-7b8e-4bdc-8715-73d0307c9058-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - stream-thread [myclass-68507371-7b8e-4bdc-8715-73d0307c9058-StreamThread-1] Failed to process stream task 3_0 due to the following error:
java.lang.NullPointerException
at MyClass.lambda$main$6(MyClass.java:184)
at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:73)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:364)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
该字段在Avro架构中是可为空的。
正如堆栈跟踪所指出的(错误源自MyClass.lambda$main$6(MyClass.java:184)
),您自己的代码中的异常,必须是ValueJoiner
实现。当您执行leftjoin()
时,传入的“ktable”值可能是null
。我假设您的代码没有正确处理null
。如果您想要完全避免获得null
并在没有相应的KTable记录时删除KStream记录,则可以使用join()
而不是leftjoin()
。
我正在导入一个DB,其中包含一些表示多对多和一对多关系的链接表。 1-到目前为止,根据我对Kafka流的理解,我似乎需要为每个链接表提供一个流,以便执行聚合。KTable将不可用,因为记录是按键更新的。但是,聚合的结果可能是Ktable中的一个。 2-然后是外键上的连接问题。似乎唯一的方法是通过GlobalKtable。link-table-topic->link-table-stream->li
我有两个Kafka主题-和。第一个主题包含由唯一Id(称为)键入的recommendations对象。每个产品都有一个用户可以单击的URL。 主题获取通过单击推荐给用户的产品URL生成的消息。它是如此设置的,这些单击消息也由键控。 请注意 > 每个单击对象都会有一个相应的推荐对象。 click对象的时间戳将晚于Recommensions对象。 建议和相应的点击之间的间隔可能是几秒钟到几天(最多7天
我需要帮助理解在Kafka2.2中使用max.task.idle.ms时的Kafka流行为。 我有一个KStream-KTable联接,其中KStream已被重新键入: 所有主题都有10个分区,为了测试,我将max.task.idle.ms设置为2分钟。myTimeExtractor只有在消息被标记为“快照”时才更新消息的事件时间:stream1中的每个快照消息都将其事件时间设置为某个常数T,st
我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?
我正在尝试加入两个Kafka主题的两个数据流。 每个主题都有一个key值对,其中key是整数数据类型,value包含字符串格式的json。来自这两个源的数据类似于下面的示例(key、value): 现在我正尝试基于ProductID左联接这两个流,因此所有这些记录的键都设置为ProductID。但不幸的是,我在连接的正确流值中不断得到空值。甚至连一条记录都没有正确连接。下面是我加入这两个记录的代码
我开始阅读Kafka Stream应用程序,在每个教程/示例中,通过比较KStream和GlobalkTable中的键来丰富数据。在我的情况下,我需要将KStream记录的值中的一个项与GlobalKTable中的一个键进行比较。如何实现这一点的任何想法或例子。