当前位置: 首页 > 知识库问答 >
问题:

Python Kafka 消费者缺少轮询某些消息

甄胡非
2023-03-14

我的Kafka消费者的代码是这样的

def read_messages_from_kafka():
    topic = 'my-topic'
    consumer = KafkaConsumer(
        bootstrap_servers=['my-host1', 'my-host2'],
        client_id='my-client',
        group_id='my-group',
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        api_version=(0, 8, 2)
    )
    consumer.assign([TopicPartition(topic, 0), TopicPartition(topic, 1)])

    messages = consumer.poll(timeout_ms=kafka_config.poll_timeout_ms, max_records=kafka_config.poll_max_records)

    for partition in messages.values():
        for message in partition:
            log.info("read {}".format(message))

    if messages:
        consumer.commit()

    next_offset0, next_offset1 = consumer.position(TopicPartition(topic, 0)), consumer.position(TopicPartition(topic, 1))
    log.info("next offset0={} and offset1={}".format(next_offset0, next_offset1))

while True:
    read_messages_from_kafka()
    sleep(kafka_config.poll_sleep_ms / 1000.0)

我已经意识到,这种消费者设置无法读取所有信息。我无法再现这一点,因为这是一个间歇性的问题。

当我使用 kafka-cat 将最后 100 条消息与此消费者进行比较时,我发现我的消费者间歇性地随机错过了几条消息。我的消费者有什么问题?

kafkacat -C -b my-host1 -X broker.version.fallback=0.8.2.1 -t my-topic -o -100

在python中使用消息的方法太多了。应该有一种最好只有一种明显的方法来做到这一点。

共有1个答案

石正卿
2023-03-14

您的Kafka客户端中存在丢失消息的问题。我在这里找到了解决方案

while True:
    raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)
    for topic_partition, messages in raw_messages.items():
        application_message = json.loads(message.value.decode())

还有另一个Kafka客户端存在:confluent_kafka。它没有这样的问题。

 类似资料:
  • 我是Apache Camel的新手,我试图在一个简单的项目中理解和使用轮询消费者EIP,但我感到有点迷茫…谁能帮我解释一下,甚至用一个小的工作例子。 如有任何帮助,我们将不胜感激

  • 我有一个Kafka主题,并为其附加了1个消费者(主题只有1个分区)。现在对于超时,我使用默认值(心跳:3秒,会话超时:10秒,轮询超时:5分钟)。 根据留档,轮询超时定义消费者必须在其他代理将该消费者从消费者组中删除之前处理消息。现在假设,消费者只需1分钟即可完成处理消息。 现在我有两个问题

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我有一个Kafka消费者,其中消息通过HTTP POST调用传递给另一个应用程序。我还使用手动提交偏移量 确认。确认(); 有一些HTTP返回错误代码,我们忽略错误并提交偏移量,还有一些错误代码我们不提交偏移量。问题是,kafka使用者仅在我重新启动使用者时才轮询未提交的消息。如果分区中有未提交的消息,是否还有轮询消息的地方?

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我用的是阿帕奇·Kafka。我创建了一个war文件,其中生产者用Java编码,消费者用Scala编码。制作人正在从HTML页面获取数据。我可以看到,生产商发布的大部分数据都是关于消费者的,但有些数据缺失。 这是我的制片人代码 文件1 } 文件2 现在,我使用以下命令检查消费者的消息。 我是否缺少任何生产者配置?