当前位置: 首页 > 知识库问答 >
问题:

连接到Kafka的Spring Cloud Stream的多个StreamListeners

唐声
2023-03-14

在一个使用Spring Cloud Stream连接到Kafka的Spring Boot应用程序中,我试图设置两个单独的Stream侦听器方法:

    null
Exception in thread "test-d44cb424-7575-4f5f-b148-afad034c93f4-StreamThread-2" java.lang.IllegalArgumentException: Assigned partition t1-0 for non-subscribed topic regex pattern; subscription pattern is t3
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:195)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:225)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:848)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:771)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:741)

我认为重要的部分是:“为非订阅的主题正则表达式模式分配分区t1-0;订阅模式是T3”。这是两个不相关的主题,所以据我所知,任何与t3相关的东西都不应该订阅任何与T1相关的东西。引起问题的确切主题也会断断续续地发生变化:有时被提及的是自动生成的主题之一,而不是t1本身。

以下是两个流侦听器的设置方式(以Kotlin表示):

@StreamListener
fun listenerForT1AndT2(
        @Input("t1") t1KTable: KTable<String, T1Obj>,
        @Input("t2") t2KTable: KTable<String, T2Obj>) {

    t2KTable
        .groupBy(...)
        .aggregate(
                { ... },
                { ... },
                { ... },
                Materialized.with(Serdes.String(), JsonSerde(SomeObj::class.java)))
        .join(t1KTable,
                { ... },
                Materialized.`as`<String, SomeObj, KeyValueStore<Bytes, ByteArray>>("test")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(JsonSerde(SomeObj::class.java)))
}

@StreamListener
fun listenerForT3(@Input("t3") t3KStream: KStream<String, T3Obj>) {
    events.map { ... }
}

但是,当我只使用@streamlistener注释一个方法来设置代码,并为所有三个主题取参数时,一切工作都很好,例如。

@StreamListener
fun compositeListener(
        @Input("t1") t1KTable: KTable<String, T1Obj>,
        @Input("t2") t2KTable: KTable<String, T2Obj>,
        @Input("t3") t3KStream: KStream<String, T3Obj>) {
    ...
}

我还尝试将这两个侦听器方法分成两个单独的类,每个类只对它感兴趣的接口使用@enablebinding(即t1和t2的一个接口,t3的一个单独的接口),但这没有帮助。

我发现的所有与这个错误消息相关的东西,例如这里,都是关于拥有多个应用程序实例的,但在我的例子中,只有一个Spring Boot应用程序实例。

共有1个答案

东门茂实
2023-03-14

每个StreamListener方法都需要单独的应用程序id。这里有一个例子:

spring.cloud.stream.kafka.streams.bindings.t1.consumer.application-id=processor1-application-id spring.cloud.stream.kafka.streams.bindings.t2.consumer.application-id=processor1-application-id spring.cloud.stream.kafka.streams.bindings.t3.consumer.application-id=processor2-application-id

您可能希望使用最新的快照(2.1.0)进行测试,因为绑定器处理应用程序id的方式最近发生了一些变化。

更多详情请看这里的更新。下面是多个StreamListener方法的工作示例,这些方法是Kafka流处理器。

 类似资料:
  • 我有一个Spring启动应用程序,它使用来自 Kafka 集群中某个主题(例如 topic1)的消息。这就是我的代码目前的样子。 现在我想从另一个Kafka集群中的不同主题开始消费。一种方法是为此创建另一个bean。但是有更好的方法吗?

  • 我想我需要为每个StreamListener方法单独的应用程序id,但是如果我正在监听相同的主题,我如何在application.yml文件中配置它呢?

  • 问题内容: 即时通讯使用c#为大学项目创建.Net应用程序,需要一些帮助! 我在网络驱动器上托管有一个mdf文件,并且需要多个客户端才能从其应用程序访问此数据库。每个客户端计算机将运行它们自己的sql express实例。 我可以从我的应用程序访问文件,但是当另一个客户端尝试访问它们时,他们会收到以下错误消息 无法打开用户默认数据库。登录失败。用户“ EEEC \ 40023753”的登录失败。

  • 如何以可伸缩的方式编写连接多个Kafka主题的使用者? 我有一个主题用一个键发布事件,第二个主题用相同的键发布与第一个主题的子集相关的其他事件。我想编写一个订阅这两个主题的使用者,并为出现在这两个主题中的子集执行一些额外的操作。 理想情况下,我需要将主题绑定在一起,以便以相同的方式对它们进行分区,并同步地将分区分配给使用者。我怎么能这么做? 我知道Kafka Streams将主题连接在一起,这样键

  • 我是Kafka连线的新手。我有一个如下的用例: > 有一个共享主题,我在其中收到不同实体的消息,例如员工、部门(实际表名称不同) 员工和部门的模式在模式注册表中注册 使用Kafka接收器连接器,是否可以根据架构分离每个实体的数据并写入相应的表示例,进入主题的员工数据应转到员工表,部门数据应转到部门表 如果没有,还有其他更好的方法吗?

  • 我按照以下说明设置了一个多节点kafka集群。现在,如何连接到动物园管理员?在JAVA中,只连接一个来自生产者/消费者端的动物园管理员可以吗?或者有办法连接所有的动物园管理员节点吗? 设置多节点阿帕奇动物园守护者集群 在集群的每个节点上,将以下行添加到文件kafka/config/zookeeper.properties中 在群集的每个节点上,在由 dataDir 属性表示的文件夹中创建一个名为