@StreamListener("countries")
@SendTo("aggregated-statistic")
public KStream<?, AggregatedCountry> process(KStream<Object, Country> input) {
return input
.groupBy((key, value) -> value.getCountryCode())
.aggregate(this::initialize,
this::aggregateAmount,
materializedAsPersistentStore("countries", Serdes.String(),
Serdes.serdeFrom(new JsonSerializer<>(),
new JsonDeserializer<>(AggregatedCountry.class))))
.toStream()
.map((key, value) -> new KeyValue<>(null, value));
}
@StreamListener("countries")
@SendTo("daily-statistic")
public KStream<?, List<DailyStatistics>> daily(KStream<Object, Country> input) {
return input
.groupBy((key, value) -> value.getCountryCode())
.aggregate(this::initializeDailyStatistics,
this::dailyStatistics,
materializedAsPersistentStore("daily", Serdes.String(),
Serdes.serdeFrom(new JsonSerializer<>(),
new JsonDeserializer<>(List.class))))
.toStream()
.map((key, value) -> new KeyValue<>(null, value));
}
Exception in thread "kafka-stream-f4f8166b-cbeb-42ca-b461-2b3a23885a5d-StreamThread-1" java.lang.IllegalStateException: Consumer was assigned partitions [kafka-stream-daily-repartition-0] which didn't correspond to subscription request [kafka-stream-countries-repartition, countries]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.handleAssignmentMismatch(ConsumerCoordinator.java:218)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
我想我需要为每个StreamListener方法单独的应用程序id,但是如果我正在监听相同的主题,我如何在application.yml文件中配置它呢?
您需要提供两个单独的输入绑定(并且这两个绑定都可以指向同一个主题)。不能在多个StreamListener
上使用相同的绑定名称。然后,您可以在输入绑定上为多个基于StreamListener
的处理器设置application.id
。对于例如。
spring.cloud.stream.kafka.streams.bindings.countries1.consumer.applicationId
而且
spring.cloud.stream.kafka.streams.bindings.countries2.consumer.applicationId
参见参考文档中的本节。
在一个使用Spring Cloud Stream连接到Kafka的Spring Boot应用程序中,我试图设置两个单独的Stream侦听器方法: null 我认为重要的部分是:“为非订阅的主题正则表达式模式分配分区t1-0;订阅模式是T3”。这是两个不相关的主题,所以据我所知,任何与t3相关的东西都不应该订阅任何与T1相关的东西。引起问题的确切主题也会断断续续地发生变化:有时被提及的是自动生成的主
我在kafka-连接中创建了两个kafka连接器,它们使用相同的连接器类,但它们听的主题不同。 当我在节点上启动进程时,两个连接器最终都会在此进程上创建任务。但是,我希望一个节点只处理一个连接器/主题。如何将主题/连接器限制为单个节点?我在connect-distributed.properties中没有看到任何配置,其中进程可以指定使用哪个连接器。 谢谢。
分布式模式下Kafka Connect集群的偏移管理行为是什么,即运行多个连接器并监听同一组主题(或一个主题)? 因此,在分布式模式下,Kafka Connect 会将偏移量信息存储在 Kafka 中,此偏移量将由集群中的工作线程读取和提交。如果我在该 Kafka Connect 集群中运行多个连接器侦听同一主题,会发生什么情况?分区的偏移量是否与所有连接器相同,或者每个连接器在分区上的偏移量是否
如何以可伸缩的方式编写连接多个Kafka主题的使用者? 我有一个主题用一个键发布事件,第二个主题用相同的键发布与第一个主题的子集相关的其他事件。我想编写一个订阅这两个主题的使用者,并为出现在这两个主题中的子集执行一些额外的操作。 理想情况下,我需要将主题绑定在一起,以便以相同的方式对它们进行分区,并同步地将分区分配给使用者。我怎么能这么做? 我知道Kafka Streams将主题连接在一起,这样键
我见过,但对于我的(简单的)用例来说,它似乎有些过头了。 我也知道,但我不想仅仅为此编写和维护代码。 我的问题是:有没有一种方法可以用kafka原生工具实现这个主题调度,而不用自己写一个Kafka-Consumer/Producer?
我们有一个Kafka主题,有源源不断的数据。为了处理它,我们有一个无状态的Flink管道,它使用该主题并写入另一个主题。 我们是不是漏掉了什么?我们误会什么了吗?有没有更好的解决办法? 谢了!