当前位置: 首页 > 知识库问答 >
问题:

如何在Spring Cloud Stream Kafka Streams Binder中使用多路复用输入主题解决无效主题异常?

朱俊雅
2023-03-14

我编写了一个Spring Cloud Streams Kafka Streams Binder应用程序,它将多个Kafka输入主题多路复用到一个流中:

spring:
  cloud:
    stream:
      bindings:
        process-in-0:
          destination: test.topic-a,test.topic-b

(来源:https://spring . io/blog/2019/12/03/stream-processing-with-spring-cloud-stream-and-Apache-Kafka-streams-part-2-programming-model-continued)

但是每当我在输入目的地(用逗号分隔)设置多个主题时,就会出现以下错误:

2022-06-17 14:07:07.648  INFO --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549-StreamThread-1-consumer, groupId=test-processor] Subscribed to topic(s): test-processor-KTABLE-AGGREGATE-STATE-STORE-0000000005-repartition, test.topic-a,test.topic-b  
2022-06-17 14:07:07.660  WARN --- [-StreamThread-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549-StreamThread-1-consumer, groupId=test-processor] Error while fetching metadata with correlation id 2 : {test-processor-KTABLE-AGGREGATE-STATE-STORE-0000000005-repartition=UNKNOWN_TOPIC_OR_PARTITION, test.topic-a,test.topic-b=INVALID_TOPIC_EXCEPTION}  
2022-06-17 14:07:07.660 ERROR --- [-StreamThread-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549-StreamThread-1-consumer, groupId=test-processor] Metadata response reported invalid topics [test.topic-a,test.topic-b]  
2022-06-17 14:07:07.660  INFO --- [-StreamThread-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549-StreamThread-1-consumer, groupId=test-processor] Cluster ID: XYZ 
2022-06-17 14:07:07.663 ERROR --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.   

org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [test.topic-a,test.topic-b]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627) ~[kafka-streams-3.2.0.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551) ~[kafka-streams-3.2.0.jar:na]
Caused by: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [test.topic-a,test.topic-b]

我尝试使用以下依赖项:

implementation 'org.apache.kafka:kafka-clients:3.2.0'
implementation 'org.apache.kafka:kafka-streams:3.2.0'
implementation "org.springframework.cloud:spring-cloud-stream"
implementation "org.springframework.cloud:spring-cloud-stream-binder-kafka"
implementation "org.springframework.cloud:spring-cloud-stream-binder-kafka-streams"
implementation "org.springframework.kafka:spring-kafka"

当我只设置一个输入主题时,一切正常。

我无法确定是什么导致了InvalidTopicException,因为我只在主题名称中使用了允许的字符,而且逗号分隔符似乎也是正确的(否则会出现不同的异常)。

共有1个答案

韦寒
2023-03-14

实际上,在发布问题后,我自己找到了解决方案。因此,这里是未来的帮助:

显然,当我的处理器拓扑需要一个KTable作为输入类型时,我不能复用输入主题。当我将处理器签名更改为KStream时,它突然起作用:

不工作:

@Bean
public Function<KTable<String, Object>, KStream<String, Object>> process() {
  return stringObjectKTable ->
    stringObjectKTable
      .mapValues(...

工作:

@Bean
public Function<KStream<String, Object>, KStream<String, Object>> process() {
  return stringObjectKStream ->
    stringObjectKStream 
      .toTable()
      .mapValues(...

我不确定这是否是预期的行为,或者是否有其他问题,所以如果有更多的潜在问题,我希望得到任何提示。

 类似资料:
  • 我的眼球因为亮度而流血。有没有办法在JetBrains(IntelliJ IDEA、PhpStorm、WebStorm、PyCharm、Android Studio)中获得一个看起来不坏或需要数小时配置的黑暗主题? 我下载了一个主题,但当我试图将其导入JetBrains时,我收到一个错误消息: “导入失败:主题它不是一个有效的方案” 当前主题 提前谢谢。

  • 我正在将一个应用程序迁移到SpringCloudStream的新的基于功能的编程模型中,但阻止了事件路由。 我必须路由来自两个不同kafka主题的事件,我不知道如何将functionRouter-in-0绑定到两个不同的目的地。 路由可以通过添加

  • 我正在使用Spring Cloud Stream Kafka Binder。我有以下Kafka活页夹函数。 在yml中,我有: 如果我想从同一个功能向两个不同的主题发送数据,我需要做什么?

  • 我在启动应用程序时遇到以下错误: 我认为这与有关,但想不通。

  • 但当我运行:Caused by:java.lang.IllegalStateException时:您需要将一个theme.AppCompat主题(或后代)与此活动一起使用。 我不明白,谢谢^^

  • 我已经读到,主题建模(从文本中提取可能的主题)最常用的技术是潜在的Dirichlet分配(LDA)。但最近我了解到另一款lda2vec。然而,我感兴趣的是尝试Word2Vec输出作为LDA的输入是否是一个好主意。 你认为为了一些研究而采用这种方法有意义吗?因为我正在做主题建模,所以需要一些新颖的方法。