我有一个Kafka流媒体应用程序,它订阅了许多主题,每个主题都有许多分区。当我创建应用程序拓扑并启动它时,我是否知道哪些主题的哪些分区分配给我的应用程序的当前实例?我想知道这个独立于任何记录是否由这个实例处理。
我知道当我得到一条记录时,我可以做processorContext。分区()
和处理器上下文。topic()
获取正在处理的当前记录的分区/主题信息。但我不是在找那个。
我正在寻找一个等效的KafkaConsumer.assigment
在kafka流侧。
我也尝试了以下代码,但我得到大小s为0。
<Prepare builder and sconfig>
kafkaStream = new KafkaStreams (builder, sconfig);
kafkaStream.start ();
Collection<StreamsMetadata> s = kafkaStream.allMetadata();
System.out.println("StreamsMetadata: size is " + s.size());
for (StreamsMetadata m : s) {
Set<TopicPartition> tp = m.topicPartitions();
System.out.println ("TopicPartition: " + tp.toString());
}
更新答案(2020年11月):
当我创建应用程序拓扑并启动它时,我是否知道哪些主题的分区被分配给我的应用程序的当前实例?
如果我理解正确的话,你可以这样做。在应用程序实例中,使用KafkaStreams#localThreadsMetadata()
获取(该应用程序实例的)所有本地流线程的ThreadMetadata
ThreadMetadata
包含该流线程上所有活动和备用任务的TaskMetadata
TaskMetadata
有一个方法topicPartitions()
来获取输入主题分区。
旧的、过时的答案:据我所知,Kafka流中没有公开这些信息的现有API。可以从Kafka消费者处获取此信息(Kafka流使用此信息),但它不会在Kafka流中公开。
我使用的是Kafka流,具有无状态的简单处理器拓扑结构。 我有一个主题,有100个分区,有2台机器,每台机器有50个线程,运行同一个流媒体应用程序,因此最终我将在它们之间进行1-1映射。 主题中的消息已是键控消息。 我有一个逻辑约束,一旦线程连接到一个或多个分区,它应该继续处理这些分区(当然,直到重新启动发生,它会重新洗牌) 我从日志中看到线程反复(重新)加入消费者组。 我的问题,kafka 流
spring cloud stream如何将多个Kafka分区分配给属于同一消费者组的反应流? 我注意到,如果我使用普通的非反应流侦听器,每个线程将被分配到一个分区,这取决于使用者并发配置。然而,在流(流量输入)的情况下,我没有注意到任何这样的并行行为。似乎只定义了一个流来处理来自所有分区的消息。 我的期望是每个Kafka主题分区都有独立的流,即使在由不同线程备份的同一节点上也是如此。
来自jvisualvm的快照
首先,很抱歉,如果我的术语不准确,我对Kafka很陌生,我已经尽可能多地读过了。我们有一个使用kafkastreams的服务,kafka版本:2.3.1。流应用程序具有一个流拓扑,该流拓扑从“topica”读取,执行转换并发布到另一个主题“topicb”,然后由拓扑的另一个流消费,并使用Ktable(localstore)聚合它。侦听器将ktable更改发布到另一个主题中。 主题有24个分区。我们
本文向大家介绍C#二分查找算法实例分析,包括了C#二分查找算法实例分析的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#二分查找算法。分享给大家供大家参考。具体实现方法如下: 希望本文所述对大家的C#程序设计有所帮助。
本文向大家介绍Kafka分区分配的概念?相关面试题,主要包含被问及Kafka分区分配的概念?时的应答技巧和注意事项,需要的朋友参考一下 一个topic多个分区,一个消费者组多个消费者,故需要将分区分配个消费者(roundrobin、range)