当前位置: 首页 > 知识库问答 >
问题:

AdminUtils.createTopicAPI可以连接到多个zookeeper节点吗?

秦雅逸
2023-03-14

在尝试使用下面提到的 createTopics 函数创建主题时,我在多节点 kafka 集群中遇到了“复制因子:1 大于可用代理:多节点集群上的 0”。我有 3 个 kafka 代理,我尝试为每个主题创建 2 个分区,并将复制因子保持在 1。不知道为什么得到这个错误。在单节点设置(1个动物园管理员和1个kafka经纪人)中,这同样可以很好地工作。

非常感谢任何帮助/

错误:

kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)
at kafka.admin.AdminUtils.createTopic(AdminUtils.scala)
at io.confluent.examples.producer.ZookeeperUtil.createTopics(ZookeeperUtil.java:98)
at io.confluent.examples.producer.ProducerGroup.<init>(ProducerGroup.java:50)
at io.confluent.examples.producer.ProducerGroup.main(ProducerGroup.java:124)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297)
at java.lang.Thread.run(Thread.java:745)






private static final int DEFAULT_SESSION_TIMEOUT = 10 * 1000;
private static final int DEFAULT_CONNECTION_TIMEOUT = 8 * 1000;
private static final String ZOOKEEPER_CONNECT = "zNode01:2181,zNode02:2181,zNode03:2181";

/**
 * Opens a new ZooKeeper client to access the Kafka broker.
 */
private static ZkClient connectToZookeeper ()
{
    return new ZkClient(ZOOKEEPER_CONNECT,
                        DEFAULT_SESSION_TIMEOUT,
                        DEFAULT_CONNECTION_TIMEOUT,
                        ZKStringSerializer$.MODULE$);
}

/**
 * Given a ZooKeeper client instance, accesses the broker and returns
 * information about Kafka's contents.
 *
 * @param zookeeperClient A ZooKeeper client to access broker information
 *                        through.
 */
private static ZkUtils zookeeperUtility (ZkClient zookeeperClient)
{
    boolean isSecureCluster = false;
    return new ZkUtils(zookeeperClient, 
                       new ZkConnection(ZOOKEEPER_CONNECT),
                       isSecureCluster);
}


public static void createTopics (ArrayList<String> names, int partitions, int replication)
{
    ZkClient zkClient = connectToZookeeper();
    ZkUtils zkUtils = zookeeperUtility(zkClient);

try{
    for (String name: names)
    {
        if (existsTopic(name))
            continue;

        AdminUtils.createTopic(zkUtils, name, partitions, replication, new Properties(),RackAwareMode.Disabled$.MODULE$);

    }

} catch (Exception ex) {
    ex.printStackTrace();
} finally {
    if (zkClient != null) {
        zkClient.close();
    }
}
}

我按照以下说明设置了一个多节点kafka集群。

设置多节点阿帕奇动物园守护者集群

在集群的每个节点上,将以下行添加到文件kafka/config/zookeeper.properties中

    server.1=zNode01:2888:3888
    server.2=zNode02:2888:3888
    server.3=zNode03:2888:3888
    #add here more servers if you want
    initLimit=5
    syncLimit=2

在群集的每个节点上,在由 dataDir 属性表示的文件夹中创建一个名为 myid 的文件(默认情况下,该文件夹为 /tmp/动物园管理员 )。myid 文件应仅包含节点的 id(“1” 表示 zNode01,“2” 表示 ZNode02,依此类推)

设置多代理Apache Kafka集群

在群集的每个节点上,修改属性 zookeeper.从文件 kafka/config/server.属性中连接:

    zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181

在群集的每个节点上,修改 host.name 从文件Kafka/配置/服务器属性的属性:主机名称=zNode0x

在集群的每个节点上,修改kafka/config/server.properties文件中的属性broker.id(集群中的每个代理都应该有一个唯一的id)

共有1个答案

孙元明
2023-03-14

错误是您的代理没有运行,而不是您不能为客户使用多个Zookeeper地址(是的,您可以)

在较新版本的Kafka中,您可以将AdminClient引导一起使用。服务器,而不是动物园管理员。连接

 类似资料:
  • 我按照以下说明设置了一个多节点kafka集群。现在,如何连接到动物园管理员?在JAVA中,只连接一个来自生产者/消费者端的动物园管理员可以吗?或者有办法连接所有的动物园管理员节点吗? 设置多节点阿帕奇动物园守护者集群 在集群的每个节点上,将以下行添加到文件kafka/config/zookeeper.properties中 在群集的每个节点上,在由 dataDir 属性表示的文件夹中创建一个名为

  • EasyReact 的重点就是让节点之间的数据流动起来,所以连接节点是很重要的。 如何连接两个节点 两个节点是通过变换来连接的,在源码目录 EasyReact/Classes/Core/NodeTransforms 中我们默认实现了了很多的变换,你也可以通过继承 EZRTransform 类来实现自己的变换,一旦我们创建好一个变换后,就可以通过如下方式进行连接了: EZRMutableNode<N

  • 我在Zookeeper和Kafka(各1个实例)中运行我的locahost。 我成功地从Kafka创建了一个主题: Kafka日志显示: 但Zookeeper的日志显示: 如果我尝试生成消息: server.properties(在Kafka中)是: 看来动物园管理员没有注册任何经纪人。 有什么建议吗?

  • 我正在用Spring编写一个服务,并使用Spring AMQP连接到Rabbitmq。 我有两个rabbitmq集群,一个仅用于发布消息(消息通过联合插件发送到另一个集群),另一个集群用于声明最终用户将从中使用的队列。 节点位于aws lb后面,每个集群有一个lb。 我在代码中使用CachingConnectionFactory和RabbitTemboard、RabbitAdmin,我希望与所有节

  • 我正在使在身份验证期间连接到Zookeeper(与运行它相同)并签入节点,如果此用户提供了正确的密码。 基本上,流程看起来像这样: 问题是,当策展人获取数据时,我总是在线获取。这种行为的原因是什么?在身份验证期间,我不被允许连接到授权客户端的动物园管理员? 低于满堆轨道:

  • 问题内容: 我们正在使用AWS(EC2)上的简单Hello World节点服务器对节点性能进行基准测试。 无论我们使用什么大小的实例,Node总是在最大1000个并发连接上出现(这不是每秒1000个,但是它可以在1次处理1000个)。此后不久,CPU出现峰值,节点基本冻结。 节点v0.10.5 节点应该能够处理比此更正确的吗?任何想法将不胜感激。 还将文件描述符(软,硬,系统)设置为65096)