现在,Golang Kafka库(sarama)提供了使用者组功能,而kafka
10没有任何外部库帮助。如何在任何给定时间获得使用者组正在处理的当前消息偏移量?
以前,我使用kazoo-go(https://github.com/wvanbergen/kazoo-
go
)来获取我的消费者组消息偏移量,因为它存储在Zookeeper中。现在,我使用sarama-
cluster(https://github.com/bsm/sarama-cluster),我不确定使用哪个API来抵消我的消费者组消息。
我还与Sarama和Kafka合作,以抵消一个话题。
您可以使用以下代码获取偏移量。
package main
import (
"gopkg.in/Shopify/sarama"
"fmt"
)
func main(){
client , err := sarama.Client([]string{"localhost:9092"},nil) // I am not giving any configuration
if err != nil {
panic(err)
}
lastoffset, err := client.GetOffset("topic-test",0,sarama.OffsetNewest)
if err != nil {
panic(err)
}
fmt.Println("Last Commited Offset ",lastoffset)
}
让我知道这是否是您要找的答案,是否有帮助。
我有多个Kafka消费者和制作人,主题不同。使用独立应用程序,我想监控Kafka消费者的延迟。 我使用Kafka0.10.0.1,因为Kafka现在存储消费者偏移Kafka本身,所以我怎么能读到相同的。 我能够读取每个分区的主题偏移量。
试图理解消费者补偿和消费者群体补偿之间的关系。 下面的堆栈溢出链接提供了对消费群体补偿管理的极好理解<什么决定Kafka消费补偿?现在问题来了, 情节: 我们在一个消费者组组1中有消费者(c1)。 偏移值是否将存储在消费者(c1)和组(group1)两个级别?或者如果消费者属于任何消费者组,偏移量将存储在仅消费者组级别? 如果偏移值将存储在两个级别中,它是否是消费者级别偏移值将覆盖消费者组级别偏移
我们有一个问题,似乎Kafka消费者没有收到发布到某个主题的消息。(我说这是因为我还没有弄清楚这件事的真相,我可能错了。) 我使用Spring for Apache Kafka,而我的消费者实际上是一个用注释的方法。 这个问题是断断续续的,我很难重新创建它。 有没有一种方法让我看看Kafka经纪人的日志,或任何其他工具,以帮助我找出抵消为我的消费者?我想要具体的证据来证明我的消费者是否收到了信息。
我正在使用事务性KafkaProducer向主题发送消息。这个很管用。我使用的是具有read_committed隔离级别的KafkaConsumer,而我的seek和seekToEnd方法存在问题。根据文档,seek和seekToEnd方法给出了LSO(上次稳定偏移量)。但这有点让人摸不着头脑。因为它给我的价值总是一样的,主题结束了。无论最后一个条目是(由生产者提交的)还是中止的事务的一部分。例如
我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?
我正在探索ChronicleQueue来保存我的一个应用程序中生成的事件。我想在经过一些处理后,将保存的事件按其原始发生顺序发布到不同的系统。我有多个应用程序实例,每个实例都可以运行一个单线程appender来将事件添加到ChronicleQueue。尽管跨实例排序是必要的,但我想理解以下两个问题。 2)还需要一些建议,如果有一种方法来保持跨实例的事件顺序。