当前位置: 首页 > 知识库问答 >
问题:

Kafka主题在spring boot启动后未在远程Kafka上自动创建(并在本地Kafka服务器上创建)

仲柏
2023-03-14

1)我在机器上启动Kafka

2)我用配置启动Spring Boot服务器:

@Bean
public NewTopic MyTopic() {
    return new NewTopic("my-topic", 5, (short) 1);
}

@Bean
public ProducerFactory<String, byte[]> greetingProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, byte[]> unpMessageKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}
12:35:09.880 [                  main] [INFO ]    o.a.k.clients.admin.AdminClientConfig: [] AdminClientConfig values: 
    bootstrap.servers = [localhost:9092]
@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "remote_host:9092");
    return new KafkaAdmin(configs);
}

1)为什么会发生这种情况?

2)我必须创建kafkaadmin吗?为什么不需要本地Kafka?

埃德迪特

spring:
  kafka:
    bootstrap-servers: remote:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
@Configuration
public class KafkaTopicConfig {

    @Value("${response.topics.topicName}")
    private String topicName;

    @Bean
    public NewTopic responseTopic() {
        return new NewTopic(topicName, 5, (short) 1);
    }

}
bootstrap.servers = [remote:9092]
    client.id = 
    connections.max.idle.ms = 300000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536

共有1个答案

西门嘉澍
2023-03-14

KafkaAdmin是kafka spring对象,它在spring上下文中查找NewTopic对象并创建它们。如果你没有一个KafkaAdmin,就不会发生任何创造。您可以显式地创建KafkaAdmin(如代码段所示),也可以通过spring kafka配置属性间接地对其创建进行排序。

KafkaAdmin是一个很好的,它与生产或消费无关,适用于您的应用程序代码的主题。

编辑

spring:
  kafka:
    bootstrap-servers: remote:9092

2019-03-21 09:18:18.354  INFO 58301 --- [           main] o.a.k.clients.admin.AdminClientConfig    
        : AdminClientConfig values: 
    bootstrap.servers = [remote:9092]
    ...
 类似资料:
  • 我在windows中启动Kafka服务器时遇到问题 命令\bin\windows\kafka服务器启动。球棒\配置\服务器。属性 错误消息: 该命令的语法不正确。错误:无法找到或加载主类文件\IBM\WebSphere 知道吗?

  • 在Ansible Playbook中,我在运行start service模块时出现了一个错误,因为我想在远程主机上启动数据库。我对Ansible是新手。 致命:[10.138.12.67]:失败!=>{“changed”:false,“msg”:“找不到请求的服务mysql:host”}

  • 我正在尝试让Debezium的SQLServerConnector在Kafka Connect中工作,除了没有创建主题外,所有的东西似乎都运行得很好。我是不是漏掉了什么? 它将使用正确的配置启动任务... 获取快照设置...... 并且连接器任务正在运行... 用户名和密码在SQL Server上有一个SPID,并运行CDC查询以获取更改,但在Kafka主题中没有显示任何内容。 我有什么不明白的?

  • 在多次尝试并更改bat文件后,我终于成功地在Windows 7中运行了Kafka和Zookeeper。这是一台运行Java 7的32位计算机。但不幸的是,我无法创建主题。我尝试在这里使用教程:http://janschulte.wordpress.com/2013/10/13/apache-kafka-0-8-on-windows/ 我执行了命令: C:\Cambria\kafka_2.8.0-0

  • 我们有一个kafka集群,正在根据这些标准锁定特定节点:https://docs.hortonworks.com/HDPDocuments/HDP3/HDP-3.1.0/zookeeper-acls/content/zookeeper_acls_best_practices_kafka.html 一旦我们用SASL:kafka:cdrwa锁定/broker/topics节点,就无法使用kafka主

  • 使用命令时: /usr/local/kafka/bin/kafka-topics.sh--创建--zookeeper localhost:2181--复制-因子1--分区1--主题测试 提前道谢。