当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams 2.5.0需要输入主题

鄂和璧
2023-03-14

从Kafka Streams 2.5.0开始,拓扑似乎必须包含一个输入主题。在Kafka2.4.1(以及更早的版本)中,情况并非如此。

我有一个应用程序,其中的拓扑只是创建一些全局状态存储,从其他应用程序写入的主题中读取数据。

使用Kafka 2.5.0,我得到以下错误:

13:24:27.161 [<redacted>-7cf1b5c9-4a6e-4bf2-9f77-f7f85f2df3bb-StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [<redacted>-7cf1b5c9-4a6e-4bf2-9f77-f7f85f2df3bb-StreamThread-1] Encountered the following error during processing:
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1228)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)

如果添加一个虚拟输入主题(例如,通过streamsbuilder.stream(pattern.compile(“hack”);),应用程序启动良好。

这种行为是意料之中的,还是Kafka Streams 2.5.0中的无意更改?

更多细节:上面的用例可能看起来有点奇怪,我不得不同意。首先,这样做的原因是交互式查询的一个缺点,即在一段时间内,应用程序无法回答查询。我看到这个问题已经通过KIP-535在Kafka Streans 2.5.0中得到了修复,这很棒。我希望以后再调查智商。

共有1个答案

甄霖
2023-03-14

当我们切换到使用集合订阅时,在2.5.0中引入了一个回归。刚刚合并了一个修复程序,所以您应该在发布它们时升级到2.5.1或2.6。

 类似资料:
  • 这是菜单类 import java.util.Scanner; 公共类菜单{private String[]Menu_options; } 这是我使用的菜单驱动程序 公共类 Menutester { } 我第一次输入一些东西时,它根本不会重新生成,当我尝试在eclipse中调试它时,它会给我错误FileNotFoundException(Throwable)。(字符串)行:第一次输入195。 这就

  • 我在我的第一个java程序中扫描用户输入时遇到了一些麻烦。当我编译并运行它时,会立即提示输入(即命令行停止并闪烁)。当我输入任何东西时,第一行被打印出来,要求我输入一个整数。然后打印第二行,并提示我输入另一个值。 这个程序的输出是我输入的前两个值。这很难解释,但它基本上要求3个输入值,只使用两个。

  • 问题内容: 我有一个UNIX本机可执行文件,它要求像这样输入参数 prog.exe <foo.txt。 foo.txt有两行:bar baz 我正在使用java.lang.ProcessBuilder来执行此命令。不幸的是,prog.exe仅能使用文件重定向功能。有什么办法可以模仿Java中的这种行为? 当然, 不起作用。 谢谢! 问题答案: 未经测试,但类似的东西应该起作用。

  • 我遇到了一个循环问题,我必须输入两次温度才能启动循环。我想我知道问题在哪里,只是不知道如何解决。我已经编写了总共三周的代码,所以我在这方面完全是个初学者。 这是我有问题的代码部分: 我想你明白我的意思了。我唯一能让循环工作的方法就是重复int.Parse(Console.ReadLine()),但也许还有另一种修复方法可以修复必须输入两次温度的问题? 真希望有人能帮我解决这个问题。

  • 我正在努力使用 JOLT 转换复杂的 json。 输入JSON: 预期输出: 我无法理解如何在基于“字段名称”的输出中访问和分配“字段值”。请帮我做一下震动测试。 注意:输入JSON中的名称、标题和公司的顺序将是混乱和随机的,这意味着在“data”数组下,第一个对象仅与“Name”相关不是强制性的。

  • 但当我运行:Caused by:java.lang.IllegalStateException时:您需要将一个theme.AppCompat主题(或后代)与此活动一起使用。 我不明白,谢谢^^