我有一个Kafka主题(1.0.0),只有一个分区。消费者被封装在EAR中,当部署到Wildfly 10时,最后一条消息的轮询始终返回0条消息。虽然主题不是空的。
java prettyprint-override">final TopicPartition tp = new TopicPartition(topic, 0);
final Long beginningOffset = consumer.beginningOffsets(Collections.singleton(tp)).get(tp);
final Long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
consumer.assign(Collections.singleton(tp));
consumer.seek(tp, endOffset - 1);
当我做民意测验时,我得到0条记录。尽管日志记录显示:
Consumer is now at position 377408 while Topic begin is 0 and end is 377409
当我更改为-2时,如:
consumer.seek(tp, endOffset - 2);
我确实收到一条消息:
Consumer is now at position 377407 while Topic begin is 0 and end is 377409
当然,这不是正确的记录,消息377408在哪里?
尝试了许多方法来寻求结束等,但它从来没有工作。
这是我的消费者配置:
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.KAFKA_SERVERS.getAsString());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
注意:我尝试了read\u uncommitted和read\u committed,两者的结果相同。
正如Javadoc中提到的,这是因为endOffsets()
返回:
最后一条成功复制的消息的偏移量加一
这实际上是下一条消息将获得的偏移量。
这就是为什么搜索endOffset-1不返回任何信息,而搜索endOffset-2只返回最后一条消息。
我同意这可能不是最直观的行为,但这就是它目前的工作方式!
我在使用者组中轮询来自 Kafka 的消息时遇到问题。我的使用者对象分配给给定的分区 之后,消费者向该分区分配: 之后,我可以计算分区内的消息 和 ..... 在我的主题中有超过30000条消息。问题是我只收到一条消息。 具有< code > max _ poll _ records = 200 < code > AUTO _ OFFSET _ RESET 的消费者配置是最早的 这是我的函数,我正
Kafka consumer有一个配置< code>max.poll.records,它控制对poll()的单次调用中返回的最大记录数,其默认值为500。我将它设置为一个很高的数字,这样我就可以在一次轮询中获得所有的消息。然而,即使这个主题有更多的信息,在一次呼叫中,民意调查只返回几千条信息(大约6000条)。< br> 如何进一步增加单个消费者阅读的邮件数量?
假设我的使用者从一个代理轮询,该代理有多个主题,每个主题有多个分区。我在同一个消费群体中总共有5个消费者。如果我的每个消费者都进行投票,将返回的数据顺序是什么? topicD-分区5 我的问题是,在这个单一的1轮询中,在按顺序移动到下一个主题/分区之前,我会收到来自该主题/分区的所有可用消息吗?意思例如: 在一次投票循环中,我收到了这个... 或者在那个单一的1轮询循环中,有可能接收到这个消息顺序
我想在特定时间停止对特定主题的轮询。 Spring防尘套2.X Springkafka 2.5.5 Kafka版本2.5.1 比如即使有消息进来测试题目分区,消息也是从00到01堆在分区里,没有消耗。 01点之后,我想再次使用有关TEST主题的消息。 如何暂停和恢复?
我用的是Kafka0.8.2。正如文件所说: batch.num.messages指定: 使用异步模式时要在一批中发送的消息数。生产者将等待该数量的消息准备好发送或排队。缓冲器已达到最大毫秒。 和请求。必修的。acks控制代理对请求的确认。 我想知道Kafka经纪人如何发送这个确认,它是否发送批次确认字符,还是每个单独的消息?
我正在尝试从 kafka 主题中获取消息,并看到如果我将 auto.commit.reset 策略设置为“最早”,则所有消息都会得到正确处理。但是,如果设置为“最新”,则第一条消息将丢失,其余消息将得到正确处理。如果我在这里错过了什么,任何人都可以帮忙吗?