在我的本地系统中,我已经启动了一个单独的Kafka实例,旁边还有动物园管理员。Zookeper和kafka服务器都运行在默认端口上。
我创建了一个主题“test”,复制因子为1,因为我只有一个kafka实例正在运行。
同时,我还创建了两个分区。
但是当我使用java kafka-client jar创建一个生产者时,即使我对消息使用不同的键,生产者也会将所有消息推送到同一个分区,因为所有消息都是在同一个使用者上接收的。
分区也不是静态的,它在每次运行我的生产者时都在不断变化。
我已经尝试了相同的场景,其中一个生产者从命令提示符启动,配置与我使用java代码提供给kafka-client producer的配置完全相同。命令提示符生成器似乎运行良好,但代码生成器正在将所有消息推送到同一个分区。
public class KafkaProducerParallel {
public static void main(String[] args) throws InterruptedException,
ExecutionException {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "parallelism-
producer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
LongSerializer.class);
Producer<String, Long> parallelProducer = new KafkaProducer<>
(properties);
for(long i=0;i<100;i++) {
ProducerRecord<String, Long> producerRecord;
if(i<50) {
producerRecord = new ProducerRecord<String,
Long>("second-topic", "Amoeba", i);
}else {
producerRecord = new ProducerRecord<String,
Long>("second-topic", "Bacteria", i);
}
RecordMetadata recordMetadata =
parallelProducer.send(producerRecord).get();
System.out.printf("Sent record : with key %s and value
%d to partition %s", producerRecord.key(), producerRecord.value(),
recordMetadata.partition());
System.out.println();
}
parallelProducer.close();
}
}
根据文档,kafka broker通过使用密钥(生成密钥的散列)来决定将特定消息放入哪个分区。我在一段时间后更改了我的记录的键,但消息仍然每次都要到同一个分区。
代码的控制台输出示例:
Sent record : with key Amoeba and value 0 to partition 1
Sent record : with key Amoeba and value 1 to partition 1
Sent record : with key Amoeba and value 2 to partition 1
Sent record : with key Amoeba and value 3 to partition 1
Sent record : with key Amoeba and value 4 to partition 1
Sent record : with key Amoeba and value 5 to partition 1
Sent record : with key Amoeba and value 6 to partition 1
Sent record : with key Amoeba and value 7 to partition 1
Sent record : with key Amoeba and value 8 to partition 1
Sent record : with key Amoeba and value 9 to partition 1
Sent record : with key Amoeba and value 10 to partition 1
Sent record : with key Amoeba and value 11 to partition 1
Sent record : with key Amoeba and value 12 to partition 1
Sent record : with key Amoeba and value 13 to partition 1
Sent record : with key Bacteria and value 87 to partition 1
Sent record : with key Bacteria and value 88 to partition 1
Sent record : with key Bacteria and value 89 to partition 1
Sent record : with key Bacteria and value 90 to partition 1
Sent record : with key Bacteria and value 91 to partition 1
Sent record : with key Bacteria and value 92 to partition 1
Sent record : with key Bacteria and value 93 to partition 1
Sent record : with key Bacteria and value 94 to partition 1
Sent record : with key Bacteria and value 95 to partition 1
Sent record : with key Bacteria and value 96 to partition 1
Sent record : with key Bacteria and value 97 to partition 1
Sent record : with key Bacteria and value 98 to partition 1
Sent record : with key Bacteria and value 99 to partition 1
一切按预期进行。
在kafkaproducer
(用来确定分区)使用的特定case Partitioner,为两个键计算相同的分区:amoeba
和bacteria
。默认情况下,KafkaProducer使用org.apache.kafka.clients.producer.internals.DefaultPartitioner
。
建议:改变键或增加分区数。
我无法向Kafka主题发布消息,无法得到Kafka制作人的任何回应,它完全卡住了应用程序 Kafka生产者服务代码 2021-05-30 13:29:13.209[0;39M[32M信息[0;39M[35M2472[0;39M[2M---[0;39M[2M[nio-8084-exec-2][0;39M[36MO.apache.coyote.http11.HTTP11Processor[0;39M[
我已经设置了kafka客户端,它可以产生和消费消息,当我们把有效载荷从生产者发送到主题时,它可以正常工作,所以我有问题生产者现在第一个消息我可以发送到主题,我也可以从kafka主题中消费,现在我尝试发送第二个消息,但是消费者没有从kafka主题中读取第二个消息,知道这里发生了什么吗? Producer.js consumer.js
我在向我的Kafka主题发送序列化XML时遇到问题。每当我运行我的代码时,我都不会收到任何异常或错误消息,但我仍然无法在Kafka主题中看到我的任何消息。 我的Kafka制作人设置如下: 当我运行代码时,我得到: 知道怎么做吗?提前谢谢!
我在用Kafka。 我有10k个jsons列表, 我该怎么做呢? 谢谢
假设kafka消息生产者向一个主题发送一条事件消息。然后一个消费者处理这个事件消息。但是,这个消费者进程因为业务错误而抛出异常,所以他想让消息生产者知道它并再次怨恨。 有什么解决办法吗?
我是斯卡拉和Kafka的新手,遇到了一些麻烦。 我正在尝试将scala kafka producer连接到安装在cloudera express服务器上的kafka服务器。我已经用这些指令在VMs中这样做过一次了,没有任何问题。 当我运行producer时,所需的主题被创建,但没有任何消息被发送,或者我是这样认为的。 Kafka制作人 当我执行run方法时,我看到“producer-send:#”