我按照以下说明设置了一个多节点kafka集群。现在,如何连接到动物园管理员?在JAVA中,只连接一个来自生产者/消费者端的动物园管理员可以吗?或者有办法连接所有的动物园管理员节点吗?
设置多节点阿帕奇动物园守护者集群
在集群的每个节点上,将以下行添加到文件kafka/config/zookeeper.properties中
server.1=zNode01:2888:3888
server.2=zNode02:2888:3888
server.3=zNode03:2888:3888
#add here more servers if you want
initLimit=5
syncLimit=2
在群集的每个节点上,在由 dataDir 属性表示的文件夹中创建一个名为 myid 的文件(默认情况下,该文件夹为 /tmp/动物园管理员 )。myid 文件应仅包含节点的 id(“1” 表示 zNode01,“2” 表示 ZNode02,依此类推)
设置多代理Apache Kafka集群
在群集的每个节点上,修改属性 zookeeper.从文件 kafka/config/server.属性中连接:
zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181
在群集的每个节点上,修改 host.name 从文件Kafka/配置/服务器属性的属性:主机名称=zNode0x
在集群的每个节点上,修改kafka/config/server.properties文件中的属性broker.id(集群中的每个代理都应该有一个唯一的id)
无需在Kafka客户端(Producer
从Kafkav9及以上版本来看,Kafka生产者和消费者不会与动物园管理员沟通。
您可以传递生产者或消费者中的所有节点。Kafka足够智能,它将连接到具有基于复制因子或分区的所需数据的节点
这是消费者代码:
Properties props = new Properties();
props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
你可以在这里找到更多信息
注意:这个approch的问题是它将打开多个连接来找出哪个节点保存数据。对于更强大和可扩展的系统,你可以维护分区号和节点名称的映射,这也将有助于负载平衡。
这是生产者示例
Properties props = new Properties();
props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
更多信息请点击此处
我的第二个问题是:是否需要?我将来可能会添加更多的节点。
我使用他们的web UI在EMR上创建了一个AWS Spark2.2集群(这里是新手)。我知道我需要连接到主节点,以便开始发出pyspark命令来学习Spark。但是,当我尝试连接到主节点时,它给我一个错误。在浏览了internet之后,我发现使用可能有助于调试正在进行的操作,但我找不到任何有用的信息。下面是我的ssh调试日志。 有人能指出这里的问题是什么吗?编辑:我已经尝试过将端口22添加到安全
我不知道如何通过N连接到AWS的ElastiCache Redisode.js.我已经成功地通过node_redisNPM连接到主主机(001),但是我无法使用ioredis的集群能力,因为显然ElastiCache没有实现CLUSTER命令。 我认为必须有另一种方法,但用于节点的AWS SDK只有用于管理ElastiCache的命令,而不是用于实际连接到ElastiCache的命令。 如果不使用
因为每个 Disque 节点都会将自己的配置信息储存在 disque-server 运行的文件夹里面, 而同一个文件夹只能有一份这样的配置信息, 所以如果我们打算同时运行多个节点, 那么就必须在不同的文件夹里面运行 disque-server , 并为每个节点指定不同的端口。 假设我们现在打算运行三个 Disque 节点, 那么首先要做的就是创建三个文件夹, 然后分别在这些文件夹里面运行 disq
我需要在不同的机器上配置一个Kafka集群,但它不起作用,当我启动生产者和消费者时,将显示以下错误: 你能帮帮我吗。
我试图找到这个问题的答案,但在kubernetes文档或任何问答论坛中都找不到。 我有一个运行有4个节点的kubernetes集群。是否可以创建第二个集群,重用前一个集群中的一个或多个节点?或者一个节点被限制在单个kubernetes集群中? 我正在使用RKE(用于部署k8集群的牧场工具)运行实际的集群,我发现这个问题让我怀疑这种可能性。 感谢您的澄清。