当前位置: 首页 > 知识库问答 >
问题:

Kafka流联接表太大

沃念
2023-03-14

我想使用Kafka Streams API对带有KTable的KStream执行左联接,将表的一些字段添加到流中。

使用包含所有相关条目的较小版本的表(大约1300个条目),一切工作都很好。

使用整个表(大约200,000个条目)后,在获得KTable的Avro消息(GenericRecord)的相关字段的行中会得到一个NullPointerException

当我在KSQL中执行相同的左联接操作时,从表中添加的字段为NULL。相关的联接键存在于表中,但在KSQL中查询它们需要大约。20秒后他们出现。

表是否太大,无法执行左联接?如果是这样的话,我可以做些什么来使连接使用整个表工作吗?

堆栈跟踪:

[myclass-68507371-7b8e-4bdc-8715-73d0307c9058-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - stream-thread [myclass-68507371-7b8e-4bdc-8715-73d0307c9058-StreamThread-1] Failed to process stream task 3_0 due to the following error:
java.lang.NullPointerException
  at MyClass.lambda$main$6(MyClass.java:184)
  at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:73)
  at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
  at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
  at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:364)
  at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
  at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)

该字段在Avro架构中是可为空的。

共有1个答案

海翔宇
2023-03-14

正如堆栈跟踪所指出的(错误源自MyClass.lambda$main$6(MyClass.java:184)),您自己的代码中的异常,必须是ValueJoiner实现。当您执行leftjoin()时,传入的“ktable”值可能是null。我假设您的代码没有正确处理null。如果您想要完全避免获得null并在没有相应的KTable记录时删除KStream记录,则可以使用join()而不是leftjoin()

 类似资料:
  • 我正在导入一个DB,其中包含一些表示多对多和一对多关系的链接表。 1-到目前为止,根据我对Kafka流的理解,我似乎需要为每个链接表提供一个流,以便执行聚合。KTable将不可用,因为记录是按键更新的。但是,聚合的结果可能是Ktable中的一个。 2-然后是外键上的连接问题。似乎唯一的方法是通过GlobalKtable。link-table-topic->link-table-stream->li

  • 我有两个Kafka主题-和。第一个主题包含由唯一Id(称为)键入的recommendations对象。每个产品都有一个用户可以单击的URL。 主题获取通过单击推荐给用户的产品URL生成的消息。它是如此设置的,这些单击消息也由键控。 请注意 > 每个单击对象都会有一个相应的推荐对象。 click对象的时间戳将晚于Recommensions对象。 建议和相应的点击之间的间隔可能是几秒钟到几天(最多7天

  • 我需要帮助理解在Kafka2.2中使用max.task.idle.ms时的Kafka流行为。 我有一个KStream-KTable联接,其中KStream已被重新键入: 所有主题都有10个分区,为了测试,我将max.task.idle.ms设置为2分钟。myTimeExtractor只有在消息被标记为“快照”时才更新消息的事件时间:stream1中的每个快照消息都将其事件时间设置为某个常数T,st

  • 我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?

  • 我正在尝试加入两个Kafka主题的两个数据流。 每个主题都有一个key值对,其中key是整数数据类型,value包含字符串格式的json。来自这两个源的数据类似于下面的示例(key、value): 现在我正尝试基于ProductID左联接这两个流,因此所有这些记录的键都设置为ProductID。但不幸的是,我在连接的正确流值中不断得到空值。甚至连一条记录都没有正确连接。下面是我加入这两个记录的代码

  • 我开始阅读Kafka Stream应用程序,在每个教程/示例中,通过比较KStream和GlobalkTable中的键来丰富数据。在我的情况下,我需要将KStream记录的值中的一个项与GlobalKTable中的一个键进行比较。如何实现这一点的任何想法或例子。