具有特定组id的使用者连接到代理,监听主题不到1分钟,然后断开连接(根据业务逻辑)。当它监听主题时,它可以使用一些消息。当同一个使用者重复这个动作时,它会使用相同的消息!
我发现Kafka用间隔1分钟保存偏移。这意味着消费者必须听超过1分钟的主题。我怎样才能缩短这个间隔?
我发现了这样的属性:
log.flush.offset.checkpoint.interval.ms=6000
log.flush.start.offset.checkpoint.interval.ms=6000
offset.flush.interval.ms=6000
<?php
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'foo');
$kafkaConsumer = new \RdKafka\Consumer($conf);
$kafkaConsumer->addBrokers('queue.a:9092');
$kafkaConsumer->setLogLevel(LOG_DEBUG);
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$queue = $kafkaConsumer->newQueue();
$topic = $kafkaConsumer->newTopic('topic_name', $topicConf);
$topic->consumeQueueStart(0, \RD_KAFKA_OFFSET_STORED, $queue);
while (true) {
$msg = $queue->consume(2000);
if ($msg !== null) {
var_dump($msg);
}
}
您应该尝试显式提交使用者中的偏移量:
在消费者中显式提交偏移量如果使用自动偏移量提交,则不需要担心显式提交偏移量。但是,如果您决定需要对偏移量提交的时间进行更多的控制,那么您确实需要考虑如何提交偏移量--或者是为了最小化重复,或者是因为您在主使用者轮询循环之外进行事件处理。
摘自Kafka权威指南,第127页。(这是一本免费的电子书,你可以下载)
while (true) {
$msg = $queue->consume(2000);
if ($msg !== null) {
var_dump($msg);
$kafkaConsumer->commit($msg);
}
}
根据我的理解,消费者阅读特定主题的消息,并且消费者客户机将定期提交偏移量。 因此,如果由于某种原因,使用者失败了一个特定的消息,该偏移量将不会被提交,然后您可以返回并重新处理该消息。 是否有任何东西跟踪您刚刚消耗的偏移和您随后提交的偏移?
我们在Kubernetes中基于<code>gcr.io/google_containers/Kubernetes-Kafka:1.0-10.2.1</code>docker映像运行一个Kafka集群,使用<code>gcr.io/google_containers/Kubernetes-zookeeper:1.0-3.4.10</code>,使用三个Kafka和zookeer实例。 我们有几个不
我们有一个非常简单的Kafka Consumer(v 2.6.2)。它是使用者组中唯一的使用者,并且该组是唯一一个阅读主题的组(有6个分区,其中有大约300万个事件)。Broker也是2.6.x版本 由于我们需要实现一个“只有一次”的场景,我们深入研究了一下,如果我们真的只使用一次写入主题的每个事件。不幸的是,我们发现:消费者有时会跳过一个偏移量,有时甚至会跳过一组分区的偏移量。 消费者除了记录之
在一个消费者群体中的所有消费者都失败后,kafka会将该消费者群体的补偿存储多长时间?是否有此配置变量?
一个与主题压缩有关的问题。在压缩主题中,当日志清理器在清理特定键的以前偏移量(3,4,5)时出现延迟(假设5是最新的偏移量),而作为使用者使用这些偏移量时,即使3和4还没有压缩,我会只看到该键的最新偏移量(5)吗?还是使用者将按照该顺序获得(3,4,5)?
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?