当前位置: 首页 > 知识库问答 >
问题:

无法使用kafka管理客户端API创建具有所需分区的kafka主题

戎元忠
2023-03-14

注意:主题创建是在代理级别启用的。另外,这个主题正在被创建,但是它是用分区1创建的。

NewTopic newTopic = new NewTopic(TOPIC_NAME, 10, (short) 1);
        CreateTopicsResult createTopicsResult = null;
        try {
            createTopicsResult = KafkaAdminClient.create(getAdminProperties()).createTopics(Collections.singletonList(newTopic));
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

但是,我可以使用Kafka管理客户端API的

共有1个答案

段干瑞
2023-03-14

我试图使用以下代码复制该代码,但没有成功:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AdminApiDemo {

    private static final String BOOTSRAP_SERVER = "localhost:9092";
    private static final String TOPIC_NAME = "demoTopic";
    private static final int NUM_PARTITIONS = 3;
    private static final short NUM_REPLICAS = 1;

    private final AdminClient adminClient;

    private AdminApiDemo(Properties properties) {
        this.adminClient = KafkaAdminClient.create(properties);
    }

    public static void main(String[] args) {
        final Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSRAP_SERVER);

        new AdminApiDemo(properties).createTopic(TOPIC_NAME, NUM_PARTITIONS, NUM_REPLICAS);
    }

    private void createTopic(String topicName, int numPartitions, short numReplicas) {
        try {
            final NewTopic newTopic = new NewTopic(topicName, numPartitions, numReplicas);
            final CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
            result.values().get(topicName).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

Kafka-topics-description显示了以下内容:

root@kafka:/# kafka-topics --bootstrap-server localhost:9092 --describe --topic demoTopic
Topic:demoTopic PartitionCount:3    ReplicationFactor:1 Configs:
    Topic: demoTopic    Partition: 0    Leader: 1   Replicas: 1 Isr: 1
    Topic: demoTopic    Partition: 1    Leader: 1   Replicas: 1 Isr: 1
    Topic: demoTopic    Partition: 2    Leader: 1   Replicas: 1 Isr: 1

我想,好吧,如果主题可能在创建之前就已经存在怎么办,但是我又得到了一个java.util.concurrent.executionexception:org.apache.kafka.common.errors.topicexistsexception:主题'demo topic'已经存在。,所以这也不可能是您的情况。

 类似资料:
  • 我是Apache Kafka的一个新用户,我还在了解它的内部结构。 在我的用例中,我需要从Kafka Producer客户端动态增加一个主题的分区数量。 我发现了关于增加分区大小的其他类似问题,但它们使用了zookeeper配置。但是我的kafkaProducer只有Kafka broker配置,而没有zookeeper配置。 有没有什么方法我可以增加一个主题的分区数量从生产者端?我运行的是Kaf

  • null camel-kafka中是否有任何配置,我们可以使用它来增加kafka主题分区计数?

  • 我发现maven repo中有几个Kafka。 阿帕奇的maven回购协议中有两个Kafka。https://mvnrepository.com/artifact/org.apache.kafka/kafka https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients 它们都可以从kafka服务器生成Mesg并消耗msg。 我

  • 我们最近对我们的Kafka集群进行了Kerbertic化,我们开始在阅读来自代理上的主题的消息时遇到问题。 我们使用的是spring kafka 1.1.2版本和kafka client 0.10.0.1。 在研究了Apache Kafka文档中的建议之后,我在项目中做了以下更改。 在使用者属性中添加了security.protocol SASL_PLAINTEXT。 添加了适当的JAAS文件,并

  • 我使用的是kafka-clients-0.10.1.1(单节点单代理) auto.create.topics.enable的默认值为true。 1.我正在使用以下方式向主题发送消息: 用于消费:

  • 我刚开始接触Kafka。我已经经历了这一切。它只表示kafka流DSL的数据/主题管理。任何人都可以共享Kafka流处理器API的相同数据管理的任何链接吗?我对处理器API的用户和内部主题管理特别感兴趣。 在流处理器开始使用输入数据之前,从哪里用输入数据填充此源主题? 简而言之,我们可以像制片人写主题一样,使用流来写Kafka的“源”主题吗?或者流仅用于主题的并行消费?我相信我们应该像“Kafka