当前位置: 首页 > 知识库问答 >
问题:

通过Spring Kafka列出Kafka主题

柳均
2023-03-14

我们想通过spring-kafka列出所有Kafka主题,以获得类似于kafka命令的结果:

bin/kafka-topics.sh --list --zookeeper localhost:2181

在下面的服务中运行 getTopics() 方法时,我们会得到

配置:

@EnableKafka
@Configuration
public class KafkaConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
}

服务:

@Service
public class TopicServiceKafkaImpl implements TopicService {
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @Override
    public Set<String> getTopics() {
        try (Consumer<String, String> consumer = 
            consumerFactory.createConsumer()) {
            Map<String, List<PartitionInfo>> map = consumer.listTopics();
            return map.keySet();
    }
}

Kafka已经启动并运行,我们可以成功地从应用程序向主题发送消息。

共有3个答案

许法
2023-03-14

kafka-tops--list是一个外壳脚本,它只是kafka.admin.TopicCommand类的包装器,您可以在其中找到您正在寻找的方法

或者,您也可以使用AdminClient#listTopics方法

左丘烨烁
2023-03-14

您正在连接到动物园管理员 (2181) 而不是 Kafka(默认为 9092)。

爪哇Kafka客户端不再直接与 ZK 通信。

栾和风
2023-03-14

您可以使用管理客户端列出此类主题

    Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    AdminClient adminClient = AdminClient.create(properties);

    ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
    listTopicsOptions.listInternal(true);

    System.out.println("topics:" + adminClient.listTopics(listTopicsOptions).names().get());
 类似资料:
  • 我正在运行ansible在一台服务器上安装Kafka。(在Dev env上,独立设置)但我的剧本在列出Kafka主题的任务列表时失败了。下面是日志。 Journal alctl-fu动物园管理员 /opt/kafka/kafka\u 2.12-2.2.2/config/zookeeper。属性 动物园管理员。服务

  • 在我们的docker-swarm中运行kafka connect,使用以下撰写文件: kafka connect节点成功启动,我可以设置任务并查看这些任务的状态······ 我是否在撰写文件或任务配置中缺少某些配置?

  • 我已经在集群中配置了3个kafka,我正在尝试与sping-kafka一起使用。 但是在我杀死kafka领导者后,我无法发送其他消息到队列。 我将Spring.kafka.bootstrap-servers属性设置为:“kafka-1:9092;kafka-2:9093,kafka-3:9094”以及我的主机文件中的所有名称。 Kafka0.10版 有人知道如何正确配置? 编辑 我测试过一个东西,

  • 我正在尽可能地简化我的消费者。问题是,当我看到Kafka监听器中的记录时: <代码>列表 我在使用时注意到: SpringKafka。消费者值反序列化器=io。汇合的。Kafka。序列化程序。KafkaavroderializerSpring。Kafka。消费者键反序列化器=组织。阿帕奇。Kafka。常见的序列化。字符串反序列化器 我得到以下错误: 2020-12-02 17:04:42.745调

  • 问题内容: 我想通过Java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题。如果我在命令提示符下创建一个主题,并且如果我通过java api推送消息,它也可以正常工作。但是我想通过java api创建一个主题。经过长时间的搜索,我发现了以下代码, 我尝试了上面的代码,它表明创建了主题,但是无法在该主题中推送消息。我的代码有什么问题吗?还是通过其他方式实现以上目标? 问题答案:

  • 我想问你们一些关于阿帕奇·Kafka和压缩主题的问题。我们想提供一些Kafka压缩主题的PII数据。我们想通过墓碑删除这个主题的数据。目前有多个问题需要验证我们的假设: 有没有其他公司像KIP-354那样通过压缩主题和墓碑生成来满足Kafka的gdpr要求(忘记的权利)https://cwiki.apache.org/confluence/display/KAFKA/KIP-354:添加最大日志压