出身背景
我们公司有由动物园管理员管理的阿帕奇·Kafka。我们的一个Spring Boot应用程序需要检查所有可用主题的列表,并列出哪些主题启用了日志压缩(cleanup.policy=compact)。
当前代码
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerList);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroup);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props);
}
...
...
public List<String> getTopics() {
Map<String, List<PartitionInfo>> topics = consumerFactory().createConsumer().listTopics();
List<String> topicList = new ArrayList<>();
topics.keySet().remove(CONSUMER_OFFSETS);
topicList.addAll(topics.keySet());
return topicList;
}
问题
使用上述代码,应用程序可以获得主题列表。有没有办法也知道单个主题是否被日志压缩?我所寻找的是某种“Java”方式,以获得与从终端运行以下Apache Kafka CLI命令时相同的响应。
kafka-topics --zookeeper localhost:2181 --describe --topic TestTopicCompact
这是一个示例响应
Topic:TestTopicCompact PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact
Topic: TestTopicCompact Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
首先需要创建一个带有属性文件的kafka管理客户端。
import org.apache.kafka.clients.admin.AdminClient;
import java.util.*;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.config.ConfigResource;
admin = AdminClient.create(properties);
然后需要使用管理员。describeConfigs方法,此方法将ConfigResources的集合作为参数,定义如下。
Collection<ConfigResource> cr = Collections.singleton( new ConfigResource(ConfigResource.Type.TOPIC, "<Your Topic Name>")
DescribeConfigsResult ConfigsResult = admin.describeConfigs(cr));
使用ConfigsResult,您需要评估和解析此数据类型,在这里我将其硬转换为配置数据类型。
Config all_configs = (Config)ConfigsResult.all().get().values().toArray()[0];
然后需要遍历包含许多ConfigResource对象的all\u configs数据类型。下面的代码构建了一个迭代器,遍历并查找用于清理的配置策略的配置值,您可以使用相同的逻辑来查找其他字段的配置值。
Iterator ConfigIterator = all_configs.entries().iterator();
while (ConfigIterator.hasNext())
{
ConfigEntry currentConfig = (ConfigEntry) ConfigIterator.next();
if (currentConfig.name().equals("cleanup.policy")) {
System.out.println(currentConfig.value());
}
}
您应该使用AdminClient API来检索该信息。
这基本上就是Kafka主题工具所做的,因此您可以查看其源代码Kafka。管理TopicCommand。尽管它是Scala,但概念是相似的。
我在一个输入主题上构建KTable,并且在两个Kafka Stream应用程序实例上加入KStream。 KTable的输入主题已经是一个日志压缩主题。因此,当我的一个应用程序实例关闭时,通过读取input log compacted主题,另一个实例状态存储似乎会用整个状态刷新。 所以不需要为我的KTable存储启用日志记录(更改日志)? 我的源输入日志压缩主题可能有数百万条记录,所以如果我在KT
我在《掌握Kafka Streams and ksqlDB》一书中遇到了以下两个短语,作者使用了两个术语,它们的真正含义是“压缩主题”和“未压缩主题” 他们对“日志压缩”有什么看法吗? 表可以被认为是对数据库的更新。在日志的这种视图中,只保留每个键的当前状态(给定键的最新记录或某种聚合)。表通常是从压缩的主题构建的。 用数据库的说法,流可以被视为插入。每个不同的记录都保留在此日志视图中。流通常是从
问题内容: 我想知道是否有一种方法可以测试您是否已订阅Android方面的主题。 基本上,我 希望 所有的设备将其安装在订阅一个主题,当第一次由设备获取的令牌。但是,设备总是有可能无法订阅。FCM注册令牌应该在设备上安装很长时间,因此,在不清除数据,卸载/重新安装等情况下,不应再次调用onTokenRefresh()方法。 我的想法是测试以查看设备是否已订阅MainActivity中的主题,如果没
一个与主题压缩有关的问题。在压缩主题中,当日志清理器在清理特定键的以前偏移量(3,4,5)时出现延迟(假设5是最新的偏移量),而作为使用者使用这些偏移量时,即使3和4还没有压缩,我会只看到该键的最新偏移量(5)吗?还是使用者将按照该顺序获得(3,4,5)?
我有一个带有Kafka使用者的spring应用程序,它使用@KafKalisterner注释。正在使用的主题是日志压缩的,我们可能会遇到必须再次使用主题消息的情况。以编程方式实现这一目标的最佳方法是什么?我们不控制Kafka主题配置。
我收到了一个数据库更改流,这些更改最终形成了一个压缩的主题。流基本上是键/值对,并且键空间很大(~4 GB)。 这个主题由一个kafka流进程使用,该进程将数据存储在RockDB中(每个消费者/碎片单独使用)。处理器做两件不同的事情: 将数据连接到另一个流中。 检查来自主题的邮件是新密钥还是对现有密钥的更新。如果是更新,则将旧的键/值和新的键/值对发送到不同的主题(更新很少)。 null