当前位置: 首页 > 知识库问答 >
问题:

如何获取阿帕奇Kafka中的所有话题?

顾光明
2023-03-14
    @RequestMapping(value = "/getTopics",method = RequestMethod.GET)
    @ResponseBody
    public Response getAllTopics() {
        ZkClient zkClient = new ZkClient(ZookeeperProps.zookeeperURL, ZookeeperProps.connectionTimeoutMs,
                ZookeeperProps.sessionTimeoutMs, ZKStringSerializer$.MODULE$);
        Seq<String> topics = ZkUtils.getAllTopics(zkClient);
        scala.collection.Iterator<String> topicIterator = topics.iterator();
        String allTopics = "";
        while(topicIterator.hasNext()) {
            allTopics+=topicIterator.next();
            allTopics+="\n";
        }

        Response response = new Response();
        response.setResponseMessage(allTopics);
        return response;

    }

我是apache kafka的新手。现在,有几天想和动物园管理员一起了解Kafka。我想获取与动物园管理员相关的主题。所以我尝试了以下几点
a:)首先我创建了一个动物园管理员客户端,如下所示:

ZkClient(ZookeeperProps.zookeeperURL, ZookeeperProps.connectionTimeoutMs, ZookeeperProps.sessionTimeoutMs, ZKStringSerializer$.MODULE$);
Seq<String> topics = ZkUtils.getAllTopics(zkClient);

但是使用Java代码执行时主题是空白集。我不明白这里有什么问题。我的动物园管理员道具如下:字符串zkConnect="127.0.0.1:2181";动物园管理员运行得非常好。
请帮助伙计们。

共有3个答案

谢锦程
2023-03-14

我更喜欢使用kafka-topics.sh,它是kafka的一个内置shell脚本来获取主题。

宓昂雄
2023-03-14

您可以使用 kafka AdminClient 。下面的代码片段可能会帮助你:

Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

AdminClient adminClient = AdminClient.create(properties);

ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);

System.out.println(adminClient.listTopics(listTopicsOptions).names().get());
澹台文博
2023-03-14

这很简单。(我的例子是用Java写的,但在Scala中几乎是一样的。)

import java.util.List;

import org.apache.zookeeper.ZooKeeper;

public class KafkaTopicListFetcher {

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
        List<String> topics = zk.getChildren("/brokers/topics", false);
        for (String topic : topics) {
            System.out.println(topic);
        }
    }
}

当我有三个主题时的结果:测试、测试2和测试3

test
test2
test3

下图是我为自己的博客帖子画的。当你理解Kafka使用的动物园管理员树的结构时,这将是有帮助的。(这里看起来挺小的。请在新标签中打开图像并放大。)

 类似资料:
  • 我正试图找出这两种设置之间的区别。大小和缓冲区。Kafka制作人的记忆。 据我所知。大小:这是可以发送的批次的最大大小。 文档描述了缓冲区。memory as:生产者可以用来缓冲等待发送的记录的内存字节。 我不明白这两者之间的区别。有人能解释一下吗? 谢啦

  • 我们需要的是直接的API来设置和使用集群消息队列。我们最初的计划是使用Camel在集群JMS或ActiveMQ队列上进行消费/生产。Kafka如何使这项任务变得更容易?在任何一种情况下,应用程序本身都将在WebLogic服务器上运行。 消息传递将是点对点类型,其中有多个相同服务的实例在运行,但根据负载平衡策略,只有一个实例应该处理消息并发出结果。消息队列也是群集的,因此服务实例或队列实例的失败都不

  • Apache Kafka:分布式消息传递系统 Apache Storm:实时消息处理 我们如何在实时数据管道中使用这两种技术来处理事件数据? 在实时数据管道方面,我觉得两者做的工作是一样的。如何在数据管道上同时使用这两种技术?

  • 我正在尝试用我的Storm设置实现最大的性能。我正在通过Kafka发送数以万计的消息,这些消息将被Storm拓扑接收。 当我在Storm UI中查看时,我注意到所有消息都流向一个执行器,而不是在所有执行器之间进行负载平衡。(见附件截图)。

  • 下面是我成功运行两三天后持续获得的异常的详细信息。有人能指导我吗?