我的消费者并不是每次都能收到信息。我有3个代理(3个服务器)的Kafka集群,有3个主题和复制因子3的分区。
Topic: my-topic Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 3,1,2
Topic: my-topic Partition: 1 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: my-topic Partition: 2 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
我有Java中的消费者,我将最大轮询记录设置在50000获取字节上,配置在50MB上。应用程序每分钟都进行轮询。当我向主题“my-topic”发送10条消息时,consumer不会给我所有的消息,而是只给我其中的一部分,其余的将在下一次运行中给我。消息是在applicatin睡眠期间由脚本生成的。你认为轮询方法只给我来自服务器/分区的消息是由分区造成的吗?哪个消息首先响应,其余的消息在下一次运行时才响应?
Map<String, Object> configurations = new HashMap<>();
configurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
configurations.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "true");
configurations.put(ConsumerConfig.GROUP_ID_CONFIG, groupId ;
configurations.put(ConsumerConfig.CLIENT_ID_CONFIG, groupId);
configurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
configurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configurations.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
configurations.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");
configurations.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "52428800");
configurations.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "3600000");
configurations.put(JsonDeserializer.TRUSTED_PACKAGES, "my.package.model");
configurations.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50000");
consumer = new KafkaConsumer<Object, Object>(configurations);
consumer.subscribe(Collections.singletonList("my-topic"));
while(true) {
ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofMillis(10000));
if(records.count() > 0) {
LOGGER.debug("records count: {}", records.count());
handleMessages(records);
consumer.commitSync();
}
sleep(60000);
}
2021-11-23 08:27:14.851 [DEBUG] --- records count: 3
2021-11-23 08:27:14.853 [DEBUG] --- offset=1175, client=test-27, ts=1637652419417
2021-11-23 08:27:14.857 [DEBUG] --- offset=1176, client=test-28, ts=1637652419418
2021-11-23 08:27:14.860 [DEBUG] --- offset=1177, client=test-29, ts=1637652419418
2021-11-23 08:28:14.924 [DEBUG] --- records count: 7
2021-11-23 08:28:14.925 [DEBUG] --- offset=232304, client=test-20, ts=1637652419406
2021-11-23 08:28:14.929 [DEBUG] --- offset=232305, client=test-21, ts=1637652419407
2021-11-23 08:28:14.933 [DEBUG] --- offset=232306, client=test-24, ts=1637652419411
2021-11-23 08:28:14.937 [DEBUG] --- offset=1141, client=test-22, ts=1637652419408
2021-11-23 08:28:14.941 [DEBUG] --- offset=1142, client=test-23, ts=1637652419410
2021-11-23 08:28:14.944 [DEBUG] --- offset=1143, client=test-25, ts=1637652419414
2021-11-23 08:28:14.949 [DEBUG] --- offset=1144, client=test-26, ts=1637652419415
有谁知道我在配置中做错了什么或遗漏了一些参数,以及如何修复它吗?
谢谢
这是正常的,Kafka不能保证在一个批处理中获得所有消息。
在轮询之间通常不会Hibernate,因为这会导致客户端超时。相反,您依赖于轮询持续时间来防止繁忙的旋转。
我们正在使用Spring kafka来消费消息。我们已经为每个分区创建了接收消息的接收器。现在我们需要多个接收者从单个分区接收消息。 对于例如。假设我们有一个分区0。目前,我们只有一个接收器(接收器1)从这个分区接收消息。现在我想为同一个分区(分区0)添加另一个接收器(接收器2)。 因此,如果生产者向这个分区发送100条消息,接收器1应该接收50条消息,其余50条消息应该在接收器2中接收。我不希望
我已经在c中创建了kafka消费者,并创建了一个具有10个分区的主题,当我尝试使用消费者读取数据时,它仅从2个分区读取,然后说没有更多的消息。我尝试使用这两种方法,即订阅和分配,但它们都不起作用。我应该如何将所有10个分区分配给单个使用者,这是将分区分配给使用者的正确方法吗?我已经使用此存储库构建了自定义消费者 https://github.com/edenhill/librdkafka/blob
我有一个将消息写入主题/分区的生产者。为了保持顺序,我希望使用单个分区,我希望12个使用者读取来自这个分区的所有消息(没有使用者组,所有消息都应该发送给所有使用者)。这是可以实现的吗?我读过一些论坛,每个分区只有一个用户可以阅读。
使用Kafka Simple Consumer可以读取多个分区吗?简单使用者在以下情况下使用分区: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0SimpleConsumer示例
我知道kafka将一个主题的数据安排在许多分区上,一个消费者组中的消费者被分配到不同的分区,从那里他们可以接收数据: 我的问题是: 术语,它们是由主机/IP标识的,还是由客户端连接标识的? 换句话说,如果我启动两个线程或进程,使用相同的消费者组运行相同的Kafka客户端代码,它们被认为是一个消费者还是两个消费者?
我的消费者代码如下所示: 我是否应该设置另一个属性来允许单个作业从多个分区使用?