我在用kafka-consumer-groups.sh工具重置我的滞后时发现了这一点。但是我需要在应用程序中重置它。发现了这个例子,但是好像没有重置。kafka-python在消费者重启后从最后产生的消息中读取示例
consumer = KafkaConsumer("MyTopic", bootstrap_servers=self.kafka_server + ":" + str(self.kafka_port),
enable_auto_commit=False,
group_id="MyTopic.group")
consumer.poll()
consumer.seek_to_end()
consumer.commit()
... continue on with other code...
运行bin\windows\kafka-consumer-groups.bat--bootstrap-serverlocalhost:9092MyTopic.group--group
仍然显示两个分区都有一个LAG。如何将当前偏移量设置为“快速前进”到最后?
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
MyTopic 0 52110 66195 14085 kafka-python-1.4.2-6afb6901-c651-4534-a482-15358db42c22 /Host1 kafka-python-1.4.2
MyTopic 1 52297 66565 14268 kafka-python-1.4.2-c70e0a71-7d61-46a1-97bc-aa2726a8109b /Host2 kafka-python-1.4.2
为了“快进”消费组的偏移量,即清除LAG,您需要创建将加入同一组的新消费者。
控制台命令是:
kafka-console-consumer.sh --bootstrap-server <brokerIP>:9092 --topic <topicName> --consumer-property group.id=<groupName>
同时,您可以运行该命令以查看您描述的滞后,您将看到滞后被擦除。
你可能想要这个:
def consumer_from_offset(topic, group_id, offset):
"""return the consumer from a certain offset"""
consumer = KafkaConsumer(bootstrap_servers=broker_list, group_id=group_id)
tp = TopicPartition(topic=topic, partition=0)
consumer.assign([tp])
consumer.seek(tp, offset)
return consumer
consumer = consumer_from_offset('topic', 'group', 0)
for msg in consumer:
# it will consume the msg beginning from offset 0
print(msg)
试图理解消费者补偿和消费者群体补偿之间的关系。 下面的堆栈溢出链接提供了对消费群体补偿管理的极好理解<什么决定Kafka消费补偿?现在问题来了, 情节: 我们在一个消费者组组1中有消费者(c1)。 偏移值是否将存储在消费者(c1)和组(group1)两个级别?或者如果消费者属于任何消费者组,偏移量将存储在仅消费者组级别? 如果偏移值将存储在两个级别中,它是否是消费者级别偏移值将覆盖消费者组级别偏移
我有多个Kafka消费者和制作人,主题不同。使用独立应用程序,我想监控Kafka消费者的延迟。 我使用Kafka0.10.0.1,因为Kafka现在存储消费者偏移Kafka本身,所以我怎么能读到相同的。 我能够读取每个分区的主题偏移量。
我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。
我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个
我有一个话题是两个消费群体消费的。题目中有10条留言。 现在我开始应用程序2(消费者组2),它正在消费相同的主题。它不在处理消息。当我描述kafka-consumer-groups(带有--group consumerGroup2)时,它令人惊讶地显示CURRENT-OFFSET=10和LOG-END-OFFSET=10。 理想情况下,这种情况不应该发生,并且kafka应该能够识别对于消费者组2没
谢了。