当前位置: 首页 > 知识库问答 >
问题:

连接到Apache Kafka多节点集群中的Zookeeper

金承嗣
2023-03-14

我按照以下说明设置了一个多节点kafka集群。现在,如何连接到动物园管理员?在JAVA中,只连接一个来自生产者/消费者端的动物园管理员可以吗?或者有办法连接所有的动物园管理员节点吗?

设置多节点阿帕奇动物园守护者集群

在集群的每个节点上,将以下行添加到文件kafka/config/zookeeper.properties中

    server.1=zNode01:2888:3888
    server.2=zNode02:2888:3888
    server.3=zNode03:2888:3888
    #add here more servers if you want
    initLimit=5
    syncLimit=2

在群集的每个节点上,在由 dataDir 属性表示的文件夹中创建一个名为 myid 的文件(默认情况下,该文件夹为 /tmp/动物园管理员 )。myid 文件应仅包含节点的 id(“1” 表示 zNode01,“2” 表示 ZNode02,依此类推)

设置多代理Apache Kafka集群

在群集的每个节点上,修改属性 zookeeper.从文件 kafka/config/server.属性中连接:

    zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181

在群集的每个节点上,修改 host.name 从文件Kafka/配置/服务器属性的属性:主机名称=zNode0x

在集群的每个节点上,修改kafka/config/server.properties文件中的属性broker.id(集群中的每个代理都应该有一个唯一的id)

共有2个答案

韩良策
2023-03-14

无需在Kafka客户端(Producer

从Kafkav9及以上版本来看,Kafka生产者和消费者不会与动物园管理员沟通。

柴良哲
2023-03-14

您可以传递生产者或消费者中的所有节点。Kafka足够智能,它将连接到具有基于复制因子或分区的所需数据的节点

这是消费者代码:

Properties props = new Properties();
     props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
     }

你可以在这里找到更多信息

注意:这个approch的问题是它将打开多个连接来找出哪个节点保存数据。对于更强大和可扩展的系统,你可以维护分区号和节点名称的映射,这也将有助于负载平衡。

这是生产者示例

Properties props = new Properties();
 props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

更多信息请点击此处

 类似资料:
  • 我的第二个问题是:是否需要?我将来可能会添加更多的节点。

  • 我使用他们的web UI在EMR上创建了一个AWS Spark2.2集群(这里是新手)。我知道我需要连接到主节点,以便开始发出pyspark命令来学习Spark。但是,当我尝试连接到主节点时,它给我一个错误。在浏览了internet之后,我发现使用可能有助于调试正在进行的操作,但我找不到任何有用的信息。下面是我的ssh调试日志。 有人能指出这里的问题是什么吗?编辑:我已经尝试过将端口22添加到安全

  • 我不知道如何通过N连接到AWS的ElastiCache Redisode.js.我已经成功地通过node_redisNPM连接到主主机(001),但是我无法使用ioredis的集群能力,因为显然ElastiCache没有实现CLUSTER命令。 我认为必须有另一种方法,但用于节点的AWS SDK只有用于管理ElastiCache的命令,而不是用于实际连接到ElastiCache的命令。 如果不使用

  • 因为每个 Disque 节点都会将自己的配置信息储存在 disque-server 运行的文件夹里面, 而同一个文件夹只能有一份这样的配置信息, 所以如果我们打算同时运行多个节点, 那么就必须在不同的文件夹里面运行 disque-server , 并为每个节点指定不同的端口。 假设我们现在打算运行三个 Disque 节点, 那么首先要做的就是创建三个文件夹, 然后分别在这些文件夹里面运行 disq

  • 我需要在不同的机器上配置一个Kafka集群,但它不起作用,当我启动生产者和消费者时,将显示以下错误: 你能帮帮我吗。

  • 我试图找到这个问题的答案,但在kubernetes文档或任何问答论坛中都找不到。 我有一个运行有4个节点的kubernetes集群。是否可以创建第二个集群,重用前一个集群中的一个或多个节点?或者一个节点被限制在单个kubernetes集群中? 我正在使用RKE(用于部署k8集群的牧场工具)运行实际的集群,我发现这个问题让我怀疑这种可能性。 感谢您的澄清。