当前位置: 首页 > 知识库问答 >
问题:

消费者在运行生产者和消费者 python 脚本时不显示任何消息

赏星河
2023-03-14

我在Ubuntu服务器上设置了Apache Kafka,并按照https://kafka.apache.org/quickstart中提到的前五个步骤进行了测试,一切正常。

然后,我继续安装kafka python 1.4.6以在python中进行测试,并编写了简单的生产者和消费者脚本。

我的侦听器
侦听器配置=纯文本://本地主机:9092
播发.侦听器=纯文本://本地主机:9092

这是脚本

consum.py

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic')
for message in consumer:
        print (message)

产品.py

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers='localhost:9092',api_version=(0, 10, 1))

producer.send('my-topic', b'Hello')

运行脚本时,生产者脚本立即完成,消费者脚本不会打印任何消息

你知道我可能错过了什么吗?谢谢

共有1个答案

裴俊迈
2023-03-14

问题是您的创建者脚本在发送消息之前完成。运行 send() 后,Python 到达脚本的最后一行并立即停止运行。

您需要调用flush()来强制运行库在终止之前等待所有消息发送。

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers='localhost:9092',api_version=(0, 10, 1))

producer.send('my-topic', b'Hello')

producer.flush()

除此之外,你的逻辑看起来很好(对我来说很有效)。

 类似资料:
  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 我有一个springboot消费者应用程序。当我第一次运行它时,它消耗了来自Kafka主题的信息。但当我再次运行它时,它停止了消耗。在日志中,我看到以下消息。 我知道消费者无法获得偏移量。在这种情况下,消费者将引用自动偏移重置属性。如您所见,我已将其设置为,希望消费者从头开始阅读。但它没有。 应用程序. yml 在我的Java课上 我尝试了一些东西。 我将值设置为。不出所料,它抛出了一个异常,抱怨

  • 我的应用程序有一个生产者和一个消费者。我的生产者不定期地生成消息。有时我的队列会是空的,有时我会有一些消息。我想让我的消费者监听队列,当有消息在其中时,接受它并处理这条消息。这个过程可能需要几个小时,如果我的消费者没有完成处理当前消息,我不希望他接受队列中的另一条消息。 我认为AKKA和AWS SQS可以满足我的需求。通过阅读文档和示例,akka-camel似乎可以简化我的工作。 我在github

  • 生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f

  • 所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?