我需要在创建过程中配置特定主题的保留策略。我试图寻找解决方案,我只能找到如下所示的命令级别alter命令
./bin/kafka-topics.sh –zookeeper本地主机:2181-更改–topic我的主题–config保留.ms =
1680000
有人可以让我知道一种在创建过程中进行配置的方法,例如spring-mvc中的xml或属性文件配置。
Spring
Kafka允许您通过@Bean
在应用程序上下文中声明s来创建新主题。这将需要KafkaAdmin
在应用程序上下文中具有类型的Bean,如果使用Spring
Boot,则会自动创建该类型的Bean 。您可以如下定义主题:
@Bean
public NewTopic myTopic() {
return TopicBuilder.name("my-topic")
.partitions(4)
.replicas(3)
.config(TopicConfig.RETENTION_MS_CONFIG, "1680000")
.build();
}
如果您不使用Spring Boot,则还必须定义KafkaAdmin
bean:
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
return new KafkaAdmin(configs);
}
如果您要编辑现有主题的配置,则必须使用AdminClient
,这是retention.ms
在主题级别更改的代码段:
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
AdminClient client = AdminClient.create(config);
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "new-topic");
// Update the retention.ms value
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1680000");
Map<ConfigResource, Config> updateConfig = new HashMap<ConfigResource, Config>();
updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));
AlterConfigsResult alterConfigsResult = client.alterConfigs(updateConfig);
alterConfigsResult.all();
在创建期间配置所有主题的保留策略 正在尝试配置租用。ms使用spring,因为我得到一个错误: 原因:java.util.concurrent。ExecutionException:org.apache.kafka.common.errors。PolicyViolationException:保留无效。ms指定。允许的范围是[3600000..2592000000],根据我读取的值,新值是-1(无
问题内容: 我想通过Java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题。如果我在命令提示符下创建一个主题,并且如果我通过java api推送消息,它也可以正常工作。但是我想通过java api创建一个主题。经过长时间的搜索,我发现了以下代码, 我尝试了上面的代码,它表明创建了主题,但是无法在该主题中推送消息。我的代码有什么问题吗?还是通过其他方式实现以上目标? 问题答案:
我正在实验Kafka流,我有以下设置: null 有什么方法可以让我的KTable从我的主题中“继承”保留策略吗?这样当记录从主主题过期时,它们在KTable中就不再可用了? 我担心将所有记录转储到KTable中,并使StateStore无限增长。 我能想到的一个解决方案是转换成一个窗口流,其跳跃窗口等于记录的TimeToLive,但我想知道是否有更好的解决方案,以更原生的方式。 多谢了。
我有一个配置bean 然而,当我检查时 似乎没有设置我的任何设置。 我假设我可能在放入配置之前无意中创建了主题。我的问题是如何让我的应用程序检测到NewTopic的值与当前的配置匹配,否则会终止应用程序上下文。 或者强制更新。
假设我有一个多代理(运行在同一主机上)的Kafka设置,其中有3个代理和50个主题,每个主题配置为有7个分区和3个复制因子。 我有50GB的内存要用于kafka,并确保kafka日志永远不会超过这个内存数量,因此我想配置我的保留策略以防止这种情况。 我已设置删除清理策略: 我应该如何配置上述参数,以便每7天删除一次数据,并确保如果需要,可以在较短的窗口中删除数据,这样我就不会耗尽内存?
有没有办法允许(Spring Cloud Stream)应用程序在融合云中自动创建所需的主题? 到目前为止,我不得不手动创建它们,当您考虑还必须设置变更日志主题时,这很容易出错。