当前位置: 首页 > 面试题库 >

如何通过Java在Kafka中创建主题

高弘光
2023-03-14
问题内容

我想通过Java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题。如果我在命令提示符下创建一个主题,并且如果我通过java
api推送消息,它也可以正常工作。但是我想通过java api创建一个主题。经过长时间的搜索,我发现了以下代码,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

我尝试了上面的代码,它表明创建了主题,但是无法在该主题中推送消息。我的代码有什么问题吗?还是通过其他方式实现以上目标?


问题答案:

编辑 -较新版本的Kafka不需要Zookeeper。请参阅@Neeleshkumar Srinivasan Mannur的API版本1.01.0以上的答案

API 0.11.0+中的过程似乎已大大简化。使用它,可以完成以下操作

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

Properties properties = new Properties();
properties.load(new FileReader(new File("kafka.properties")));

AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("topicName", 1, (short)1); //new NewTopic(topicName, numPartitions, replicationFactor)

List<NewTopic> newTopics = new ArrayList<NewTopic>();
newTopics.add(newTopic);

adminClient.createTopics(newTopics);
adminClient.close();

kafka.properties文件的内容如下

bootstrap.servers=localhost:9092
group.id=test
enable.auto.commit=true
auto.commit.interval.ms=1000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

请注意,必须关闭AdminClient的实例才能反映新创建的主题。

原始答案

我修好了..经过长时间的研究..

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

通过上面的代码,ZkClient将创建一个主题,但是该主题信息将不了解kafka。所以我们要做的是,我们需要通过以下方式为ZkClient创建对象,

首先导入以下语句,

import kafka.utils.ZKStringSerializer$;

并通过以下方式为ZkClient创建对象,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

编辑1:(对于@ajkret评论)

由于api已更改,因此上述代码不适用于kafka> 0.9,请使用以下代码适用于kafka> 0.9

import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava
{
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}


 类似资料:
  • 问题内容: 在搜索如何通过API创建Kafka主题时,我在Scala中找到了以下示例: 最后一个arg 显然是Scala对象。我不清楚如何使该示例在Java中工作。 这篇文章如何在Clojure中创建Scala对象的问题在Clojure中提出了相同的问题,答案是: 我认为Java中的翻译成: 但是,当我尝试使用该方法(或其他任何数量的变体)时,它们都无法编译。 编译错误是: 我正在使用kafka_

  • 我正在尝试Firebase通知API。当我从控制台向应用程序发送下游消息时,该服务工作得很好,但如何向主题注册用户发送消息呢? 我在Android端做的 但当我尝试从控制台向主题发送下游消息时,它说 编辑:我发现在映射主题后,需要1天才能在Firebase控制台中显示出来

  • 我想获取一个接收器连接器的sink-record-active-count值,这里给出https://docs.confluent.io/current/connect/managing/monitoring.html#sink-task-metrics 我有所有容器运行在docker桌面使用docker compose文件docker ps docker ps 我使用了汇合度量报告器来完成该任务

  • 问题内容: 我需要在创建过程中配置特定主题的保留策略。我试图寻找解决方案,我只能找到如下所示的命令级别alter命令 ./bin/kafka-topics.sh –zookeeper本地主机:2181-更改–topic我的主题–config保留.ms = 1680000 有人可以让我知道一种在创建过程中进行配置的方法,例如spring-mvc中的xml或属性文件配置。 问题答案: Spring K

  • 问题内容: 反射用于加载Java类类并即时对其进行操作。但是我遇到了一个怪异的问题,问我如何通过Reflection快速创建Java类,这意味着这些类在我们要创建之前就没有编译或没有源代码。真的有可能吗?有什么例子吗? 问题答案: 您可以尝试ASM ASM 或字节码工程库 字节码工程库 用于在运行时创建类 在.NET中,我们具有Reflection.Emit(C#)可以执行该 Reflection

  • 使用命令时: /usr/local/kafka/bin/kafka-topics.sh--创建--zookeeper localhost:2181--复制-因子1--分区1--主题测试 提前道谢。