当前位置: 首页 > 知识库问答 >
问题:

Python Kafka用户在消息到达时不读取消息

年运珧
2023-03-14

我刚开始学Kafka,Kafka-蟒蛇。在下面的代码中,我试图在消息到达时读取它们。但出于某种原因,消费者似乎要等到一定数量的消息积累后才能获取它们。

我最初以为是因为正在批量出版的制片人。当我运行“kafka-console-consumer--bootstrap-servers--topic”时,我可以看到发布后收到的每一条消息(就像在consumer控制台上看到的那样)

def run():
    success_consumer = KafkaConsumer('success_logs',
                                     bootstrap_servers=KAFKA_BROKER_URL,
                                     group_id=None,
                                     fetch_min_bytes=1,
                                     fetch_max_bytes=10,
                                     enable_auto_commit=True)
    #dummy poll
    success_consumer.poll()
    for msg in success_consumer:
        print(msg)

    success_consumer.close()

有人能指出用KafKaconsumer改变了什么配置吗?为什么它不能读取像“Kafka-控制台-消费者”这样的消息?

共有1个答案

公孙盛
2023-03-14

KafkaConsumer类还有一个FETCH_MAX_WAIT_MS参数。您应该将其设置为0

success_consumer = KafkaConsumer(...,fetch_max_wait_ms=0)
 类似资料:
  • 我们有一个Kafka制作人,偶尔会制作一些信息。 我写了一个消费者来消费这些消息。问题是,只有当两个消息叠加时,它们才会被使用。例如,如果消息是在13:00产生的,消费者不做任何事情。如果另一条消息是在13:01生成的,则消费者会使用这两条消息。在kafkaTool中,在消费者属性中有一个名为LAG的列,当消息未被消费时,该列为1。我缺少的这个东西有什么配置吗? 消费者配置:

  • 我在discord.py做了一个机器人,希望我的机器人在网络钩子在特定频道发送消息时定位一个角色。有办法这么做吗?现在所有我有的是通道ID和我很确定这是一个客户端事件

  • 读取匹配指定过滤条件、并且在上次调用本方法之后接收到的消息。 调用: web3.shh.getFilterMessages(id) 参数: id:String,消息过滤器ID,由shh.newMessageFilter()返回 返回值: Array: 一组消息对象。 示例代码: web3.shh.getFilterMessages('2b47fbafb3cce24570812a82e6e93cd

  • 当我使用spring文档中描述的配置时:

  • 该未读消息接口记录不含在通知内的消息,如当前用户收到的评论、点赞和未处理的审核等,调用相应的列表和操作接口,讲自动清零 GET /user/unread-count Response Status: 200 OK { "counts": { "user_id": 1, "unread_comments_count": 0, "unread_likes_count":

  • 早上好在我的时区。 事先表示感谢并致以最良好的问候