当前位置: 首页 > 知识库问答 >
问题:

Apache Kafka客户端(Java):列出主题并检查主题是否被日志压缩

巫朝明
2023-03-14

出身背景

我们公司有由动物园管理员管理的阿帕奇·Kafka。我们的一个Spring Boot应用程序需要检查所有可用主题的列表,并列出哪些主题启用了日志压缩(cleanup.policy=compact)。

当前代码


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroup);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

...
...

    public List<String> getTopics() {
        Map<String, List<PartitionInfo>> topics = consumerFactory().createConsumer().listTopics();
        List<String> topicList = new ArrayList<>();
        topics.keySet().remove(CONSUMER_OFFSETS);
        topicList.addAll(topics.keySet());
        return topicList;
    }

问题

使用上述代码,应用程序可以获得主题列表。有没有办法也知道单个主题是否被日志压缩?我所寻找的是某种“Java”方式,以获得与从终端运行以下Apache Kafka CLI命令时相同的响应。

kafka-topics --zookeeper localhost:2181 --describe --topic TestTopicCompact

这是一个示例响应

Topic:TestTopicCompact  PartitionCount:1    ReplicationFactor:1 Configs:cleanup.policy=compact
    Topic: TestTopicCompact Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

共有2个答案

卫琛
2023-03-14

首先需要创建一个带有属性文件的kafka管理客户端。

import org.apache.kafka.clients.admin.AdminClient;
import java.util.*;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.config.ConfigResource;

admin = AdminClient.create(properties);

然后需要使用管理员。describeConfigs方法,此方法将ConfigResources的集合作为参数,定义如下。

Collection<ConfigResource> cr =  Collections.singleton( new ConfigResource(ConfigResource.Type.TOPIC, "<Your Topic Name>") 

DescribeConfigsResult ConfigsResult = admin.describeConfigs(cr));

使用ConfigsResult,您需要评估和解析此数据类型,在这里我将其硬转换为配置数据类型。

Config all_configs = (Config)ConfigsResult.all().get().values().toArray()[0];

然后需要遍历包含许多ConfigResource对象的all\u configs数据类型。下面的代码构建了一个迭代器,遍历并查找用于清理的配置策略的配置值,您可以使用相同的逻辑来查找其他字段的配置值。

    Iterator ConfigIterator = all_configs.entries().iterator();

    while (ConfigIterator.hasNext()) 
    {
          ConfigEntry currentConfig = (ConfigEntry) ConfigIterator.next();
          if (currentConfig.name().equals("cleanup.policy")) {
                System.out.println(currentConfig.value());
          }
        }
养昊天
2023-03-14

您应该使用AdminClient API来检索该信息。

  • 首先使用listTopics()检索主题列表
  • 然后使用describeConfigs()获取每个主题的配置
  • 最后,从您将获得的ConfigEntry对象中,您可以过滤具有压缩的主题作为清理。政策

这基本上就是Kafka主题工具所做的,因此您可以查看其源代码Kafka。管理TopicCommand。尽管它是Scala,但概念是相似的。

 类似资料:
  • 我在一个输入主题上构建KTable,并且在两个Kafka Stream应用程序实例上加入KStream。 KTable的输入主题已经是一个日志压缩主题。因此,当我的一个应用程序实例关闭时,通过读取input log compacted主题,另一个实例状态存储似乎会用整个状态刷新。 所以不需要为我的KTable存储启用日志记录(更改日志)? 我的源输入日志压缩主题可能有数百万条记录,所以如果我在KT

  • 我在《掌握Kafka Streams and ksqlDB》一书中遇到了以下两个短语,作者使用了两个术语,它们的真正含义是“压缩主题”和“未压缩主题” 他们对“日志压缩”有什么看法吗? 表可以被认为是对数据库的更新。在日志的这种视图中,只保留每个键的当前状态(给定键的最新记录或某种聚合)。表通常是从压缩的主题构建的。 用数据库的说法,流可以被视为插入。每个不同的记录都保留在此日志视图中。流通常是从

  • 问题内容: 我想知道是否有一种方法可以测试您是否已订阅Android方面的主题。 基本上,我 希望 所有的设备将其安装在订阅一个主题,当第一次由设备获取的令牌。但是,设备总是有可能无法订阅。FCM注册令牌应该在设备上安装很长时间,因此,在不清除数据,卸载/重新安装等情况下,不应再次调用onTokenRefresh()方法。 我的想法是测试以查看设备是否已订阅MainActivity中的主题,如果没

  • 一个与主题压缩有关的问题。在压缩主题中,当日志清理器在清理特定键的以前偏移量(3,4,5)时出现延迟(假设5是最新的偏移量),而作为使用者使用这些偏移量时,即使3和4还没有压缩,我会只看到该键的最新偏移量(5)吗?还是使用者将按照该顺序获得(3,4,5)?

  • 我有一个带有Kafka使用者的spring应用程序,它使用@KafKalisterner注释。正在使用的主题是日志压缩的,我们可能会遇到必须再次使用主题消息的情况。以编程方式实现这一目标的最佳方法是什么?我们不控制Kafka主题配置。

  • 我收到了一个数据库更改流,这些更改最终形成了一个压缩的主题。流基本上是键/值对,并且键空间很大(~4 GB)。 这个主题由一个kafka流进程使用,该进程将数据存储在RockDB中(每个消费者/碎片单独使用)。处理器做两件不同的事情: 将数据连接到另一个流中。 检查来自主题的邮件是新密钥还是对现有密钥的更新。如果是更新,则将旧的键/值和新的键/值对发送到不同的主题(更新很少)。 null