我有一个配置bean
@Bean("alertsTopic")
public NewTopic alertsTopic() {
return TopicBuilder.name(PORTAL_ALERTS)
.config(TopicConfig.DELETE_RETENTION_MS_CONFIG, String.valueOf(Duration.ofHours(1).getSeconds() * 1000))
.compact()
.build();
}
然而,当我检查时
./kafka-configs.sh --bootstrap-server kafka:9092 --describe --topic portal.alerts --all
似乎没有设置我的任何设置。
我假设我可能在放入配置之前无意中创建了主题。我的问题是如何让我的应用程序检测到NewTopic的值与当前的配置匹配,否则会终止应用程序上下文。
或者强制更新。
如果主题已经存在,则KafkaAdmin将进行的唯一修改是增加分区的数量。
对于其他更改,您必须自己使用AdminClient。
制作者或管理员创建了Kafka主题后,如何更改该主题的副本数?
我们执行以下步骤以删除主题-hgpo.llo.prmt.processed 但即使在12小时后,主题文件夹仍未从/var/kafka/kafka-logs中删除 注意-我们set-delete.topic.enable=true 在/var/kafka/kafka-logs下,我们有许多主题文件夹,如: ..
在kafka消费者留档https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html中,它指出需要小心,以确保每隔一段时间调用投票,否则经纪人将假设消费者已经死亡。 最可靠的程序非常复杂: 对于消息处理时间变化不可预测的用例,这两个选项都可能不够。处理这些情况的推
我正在使用3代理Kafka。如果我停止第三个节点:没问题,我可以从节点1或2消耗日志。如果我停止第二个节点:没问题,我可以从节点1或3消耗日志。如果我停止第一个节点:我不能消耗任何东西。 当我重新启动kafka runner服务时。日志在这里,写得很好。 我的主题配置是:主题:log_topic主题ID:xxx分区计数:32复制因子:3配置:segment.bytes=1073741824,ret
我有一个关于主题偏移在Kafka中是如何工作的问题,它们是否存储在Kafka中的B树状结构? 我要求它的具体原因,让我们说我有一个主题,在主题中有1000万条记录,这将意味着1000万偏移,如果没有发生压缩或关闭,现在如果我使用consumer.seek(5000000),它将像LinkList的意思是,它将转到0偏移量,并尝试从那里跳到第500000个偏移量,或者它确实有类似索引的结构,可以准确
问题内容: 我需要在创建过程中配置特定主题的保留策略。我试图寻找解决方案,我只能找到如下所示的命令级别alter命令 ./bin/kafka-topics.sh –zookeeper本地主机:2181-更改–topic我的主题–config保留.ms = 1680000 有人可以让我知道一种在创建过程中进行配置的方法,例如spring-mvc中的xml或属性文件配置。 问题答案: Spring K