我编写了一个Spring Cloud Streams Kafka Streams Binder应用程序,它将多个Kafka输入主题多路复用到一个流中:
spring:
cloud:
stream:
bindings:
process-in-0:
destination: test.topic-a,test.topic-b
(来源:https://spring . io/blog/2019/12/03/stream-processing-with-spring-cloud-stream-and-Apache-Kafka-streams-part-2-programming-model-continued)
但是每当我在输入目的地(用逗号分隔)设置多个主题时,就会出现以下错误:
2022-06-17 14:07:07.648 INFO --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549-StreamThread-1-consumer, groupId=test-processor] Subscribed to topic(s): test-processor-KTABLE-AGGREGATE-STATE-STORE-0000000005-repartition, test.topic-a,test.topic-b
2022-06-17 14:07:07.660 WARN --- [-StreamThread-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549-StreamThread-1-consumer, groupId=test-processor] Error while fetching metadata with correlation id 2 : {test-processor-KTABLE-AGGREGATE-STATE-STORE-0000000005-repartition=UNKNOWN_TOPIC_OR_PARTITION, test.topic-a,test.topic-b=INVALID_TOPIC_EXCEPTION}
2022-06-17 14:07:07.660 ERROR --- [-StreamThread-1] org.apache.kafka.clients.Metadata : [Consumer clientId=test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549-StreamThread-1-consumer, groupId=test-processor] Metadata response reported invalid topics [test.topic-a,test.topic-b]
2022-06-17 14:07:07.660 INFO --- [-StreamThread-1] org.apache.kafka.clients.Metadata : [Consumer clientId=test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549-StreamThread-1-consumer, groupId=test-processor] Cluster ID: XYZ
2022-06-17 14:07:07.663 ERROR --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [test.topic-a,test.topic-b]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627) ~[kafka-streams-3.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551) ~[kafka-streams-3.2.0.jar:na]
Caused by: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [test.topic-a,test.topic-b]
我尝试使用以下依赖项:
implementation 'org.apache.kafka:kafka-clients:3.2.0'
implementation 'org.apache.kafka:kafka-streams:3.2.0'
implementation "org.springframework.cloud:spring-cloud-stream"
implementation "org.springframework.cloud:spring-cloud-stream-binder-kafka"
implementation "org.springframework.cloud:spring-cloud-stream-binder-kafka-streams"
implementation "org.springframework.kafka:spring-kafka"
当我只设置一个输入主题时,一切正常。
我无法确定是什么导致了InvalidTopicException,因为我只在主题名称中使用了允许的字符,而且逗号分隔符似乎也是正确的(否则会出现不同的异常)。
实际上,在发布问题后,我自己找到了解决方案。因此,这里是未来的帮助:
显然,当我的处理器拓扑需要一个KTable
作为输入类型时,我不能复用输入主题。当我将处理器签名更改为KStream
时,它突然起作用:
不工作:
@Bean
public Function<KTable<String, Object>, KStream<String, Object>> process() {
return stringObjectKTable ->
stringObjectKTable
.mapValues(...
工作:
@Bean
public Function<KStream<String, Object>, KStream<String, Object>> process() {
return stringObjectKStream ->
stringObjectKStream
.toTable()
.mapValues(...
我不确定这是否是预期的行为,或者是否有其他问题,所以如果有更多的潜在问题,我希望得到任何提示。
我的眼球因为亮度而流血。有没有办法在JetBrains(IntelliJ IDEA、PhpStorm、WebStorm、PyCharm、Android Studio)中获得一个看起来不坏或需要数小时配置的黑暗主题? 我下载了一个主题,但当我试图将其导入JetBrains时,我收到一个错误消息: “导入失败:主题它不是一个有效的方案” 当前主题 提前谢谢。
我正在将一个应用程序迁移到SpringCloudStream的新的基于功能的编程模型中,但阻止了事件路由。 我必须路由来自两个不同kafka主题的事件,我不知道如何将functionRouter-in-0绑定到两个不同的目的地。 路由可以通过添加
我正在使用Spring Cloud Stream Kafka Binder。我有以下Kafka活页夹函数。 在yml中,我有: 如果我想从同一个功能向两个不同的主题发送数据,我需要做什么?
我在启动应用程序时遇到以下错误: 我认为这与有关,但想不通。
但当我运行:Caused by:java.lang.IllegalStateException时:您需要将一个theme.AppCompat主题(或后代)与此活动一起使用。 我不明白,谢谢^^
我已经读到,主题建模(从文本中提取可能的主题)最常用的技术是潜在的Dirichlet分配(LDA)。但最近我了解到另一款lda2vec。然而,我感兴趣的是尝试Word2Vec输出作为LDA的输入是否是一个好主意。 你认为为了一些研究而采用这种方法有意义吗?因为我正在做主题建模,所以需要一些新颖的方法。