顾名思义,有没有一种方法可以在java中获取特定主题的消费者列表?直到现在我才能得到这样的主题列表
final ListTopicsResult listTopicsResult = adminClient.listTopics();
KafkaFuture<Set<String>> kafkaFuture = listTopicsResult.names();
Set<String> map = kafkaFuture.get();
但我还没有找到一种方法来获得每个主题的消费者名单
我知道这个话题有点老了,但我目前正在开发一个仪表板,需要列出相关消费者群体的话题。Katya的回答是可以的,但它仍然不会为没有活跃成员的消费者群体列出TopicPartitions。我偶然发现了这个解决方案,希望有所帮助:
Properties props = ...//here you put your properties
AdminClient kafkaClient = AdminClient.create(props);
Map<TopicPartition, OffsetAndMetadata> offsets = kafkaClient
.listConsumerGroupOffsets("consumer_group_name")
.partitionsToOffsetAndMetadata()
.get();
主题偏移量映射中的分区可以针对特定主题进行解析。此列表可以重复整个组Id列表,从Katya的答案中。
消费者不受主题的约束。他们与消费者群体紧密相连
要获取某个主题的所有使用者,您必须首先列出所有组,然后在每个组中筛选出您的主题
它将以< code > admin client . listconsumergroups 开头,后跟< code > admin client . describeconsumergroups
这将为您提供一个包含“成员”的描述列表,您可以在其中找到主题
https://Kafka . Apache . org/20/javadoc/org/Apache/Kafka/clients/admin/consumergroupdescription . html
注意:有一些外部工具可以使这一过程更加简单,如Hortonworks SMMhttps://docs.hortonworks.com/HDPDocuments/SMM/SMM-1.2.0/monitoring-kafka-clusters/content/smm-monitoring-consumers.html
我最近正在为我的kafka客户端工具解决同样的问题。这并不容易,但我从代码中发现的唯一方法如下:
Properties props = ...//here you put your properties
AdminClient kafkaClient = AdminClient.create(props);
//Here you get all the consumer groups
List<String> groupIds = kafkaClient.listConsumerGroups().all().get().
stream().map(s -> s.groupId()).collect(Collectors.toList());
//Here you get all the descriptions for the groups
Map<String, ConsumerGroupDescription> groups = kafkaClient.
describeConsumerGroups(groupIds).all().get();
for (final String groupId : groupIds) {
ConsumerGroupDescription descr = groups.get(groupId);
//find if any description is connected to the topic with topicName
Optional<TopicPartition> tp = descr.members().stream().
map(s -> s.assignment().topicPartitions()).
flatMap(coll -> coll.stream()).
filter(s -> s.topic().equals(topicName)).findAny();
if (tp.isPresent()) {
//you found the consumer, so collect the group id somewhere
}
}
这个API从2.0版开始提供。可能有更好的方法,但我找不到。你也可以在我的bitbucket上找到代码
我做了一个poc,其中我使用spark流从Kafka读取数据。但我们的组织要么使用ApacheFlink,要么使用Kafka消费者从ApacheKafka读取数据,作为标准流程。所以我需要用Kafka消费者或ApacheFlink替换Kafka流媒体。在我的应用程序用例中,我需要从kafka读取数据,过滤json数据并将字段放入cassandra中,因此建议使用kafka consumer而不是f
我对骆驼生产商有很好的了解,但我不能对各种骆驼消费者保持清醒的头脑。特别是事件驱动消费者和轮询消费者,camel如何知道为这些消费者调用回调? 消费者的一般流量是多少?
我知道什么是生产者和消费者。但官方文件显示 < li >它是流媒体平台。 < li >它是企业消息系统。 < li>Kafka具有从数据库和其他系统导入和导出数据的连接器。 这是什么意思? 我知道生产者是向Kafka Broker发送数据的客户端应用程序,消费者也是从Kafka Broker读取数据的客户端应用程序。 但我的问题是,消费者可以将数据推送到Kafka Broker吗? 据我所知,我认
我已经和ApacheCamel合作了一段时间,做了一些基本的工作,但现在我正在尝试创建一个路由,在该路由中,我可以让多个“消费者”访问同一条路由,或者在路由中添加一个消费者,然后处理消息。 我的想法是拥有一个由事件触发的事件驱动消费者,然后例如从ftp读取文件。我正计划做这样的事情: 所以这个想法是我有一个事件(例如直接或来自消息队列),它具有“fileName”属性,然后使用该属性从ftp下载/
但是,consumer只从主题中第一个未提交的消息开始轮询。我希望总是从偏移量0开始,不管提交的消息是什么。使用Alpakka消费者,如何手动指定偏移量?
以下是《行动中的骆驼》中关于生产者和消费者的定义。 使用者可以从外部服务接收消息,在某些系统上轮询消息,甚至创建消息本身。然后,该消息流经一个处理组件,该组件可以是企业集成模式(EIP)、处理器、拦截器或其他一些自定义创建。消息最终被发送到一个目标endpoint,该endpoint是生产者的角色。路由可能有许多修改消息或将其发送到另一个位置的处理组件,也可能没有,在这种情况下,它将是一个简单的管