当前位置: 首页 > 知识库问答 >
问题:

在Kafka-python中重置消费者组中的kafka LAG(更改偏移量)

潘佐
2023-03-14

我在用kafka-consumer-groups.sh工具重置我的滞后时发现了这一点。但是我需要在应用程序中重置它。发现了这个例子,但是好像没有重置。kafka-python在消费者重启后从最后产生的消息中读取示例

    consumer = KafkaConsumer("MyTopic", bootstrap_servers=self.kafka_server + ":" + str(self.kafka_port),
                             enable_auto_commit=False,
                             group_id="MyTopic.group")
    consumer.poll()
    consumer.seek_to_end()
    consumer.commit()

    ... continue on with other code...

运行bin\windows\kafka-consumer-groups.bat--bootstrap-serverlocalhost:9092MyTopic.group--group仍然显示两个分区都有一个LAG。如何将当前偏移量设置为“快速前进”到最后?

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST             CLIENT-ID
MyTopic         0          52110           66195           14085           kafka-python-1.4.2-6afb6901-c651-4534-a482-15358db42c22 /Host1  kafka-python-1.4.2
MyTopic         1          52297           66565           14268           kafka-python-1.4.2-c70e0a71-7d61-46a1-97bc-aa2726a8109b /Host2  kafka-python-1.4.2

共有2个答案

能修谨
2023-03-14

为了“快进”消费组的偏移量,即清除LAG,您需要创建将加入同一组的新消费者。
控制台命令是:

kafka-console-consumer.sh --bootstrap-server <brokerIP>:9092 --topic <topicName> --consumer-property group.id=<groupName>

同时,您可以运行该命令以查看您描述的滞后,您将看到滞后被擦除。

有德业
2023-03-14

你可能想要这个:

def consumer_from_offset(topic, group_id, offset):
    """return the consumer from a certain offset"""
    consumer = KafkaConsumer(bootstrap_servers=broker_list, group_id=group_id)
    tp = TopicPartition(topic=topic, partition=0)
    consumer.assign([tp])
    consumer.seek(tp, offset)

    return consumer

consumer = consumer_from_offset('topic', 'group', 0)
for msg in consumer:
    # it will consume the msg beginning from offset 0
    print(msg)

 类似资料:
  • 试图理解消费者补偿和消费者群体补偿之间的关系。 下面的堆栈溢出链接提供了对消费群体补偿管理的极好理解<什么决定Kafka消费补偿?现在问题来了, 情节: 我们在一个消费者组组1中有消费者(c1)。 偏移值是否将存储在消费者(c1)和组(group1)两个级别?或者如果消费者属于任何消费者组,偏移量将存储在仅消费者组级别? 如果偏移值将存储在两个级别中,它是否是消费者级别偏移值将覆盖消费者组级别偏移

  • 我有多个Kafka消费者和制作人,主题不同。使用独立应用程序,我想监控Kafka消费者的延迟。 我使用Kafka0.10.0.1,因为Kafka现在存储消费者偏移Kafka本身,所以我怎么能读到相同的。 我能够读取每个分区的主题偏移量。

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。

  • 我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个

  • 我有一个话题是两个消费群体消费的。题目中有10条留言。 现在我开始应用程序2(消费者组2),它正在消费相同的主题。它不在处理消息。当我描述kafka-consumer-groups(带有--group consumerGroup2)时,它令人惊讶地显示CURRENT-OFFSET=10和LOG-END-OFFSET=10。 理想情况下,这种情况不应该发生,并且kafka应该能够识别对于消费者组2没