当前位置: 首页 > 知识库问答 >
问题:

Python kafka消费者不会使用来自生产者的消息

华英睿
2023-03-14
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:2181',api_version=(1,0,1))
producer.send('MyFirstTopic1', 'Hello, World!')

这是我的消费者:

from kafka import KafkaConsumer,KafkaProducer,TopicPartition,OffsetAndMetadata
consumer = KafkaConsumer(
 bootstrap_servers=['localhost:2181'],api_version=(1,0,1),
 group_id=None,
 enable_auto_commit=False,
 auto_offset_reset='smallest'
)
consumer.subscribe('MyFirstTopic1',0)
print("hello")
for message in consumer:
 print(message)

所以当运行我的制作人时,它最终会出错。任何人都知道这意味着什么,如果这可能是错的。

File "producer.py", line 3, in <module>
    producer.send('MyFirstTopic1', 'Hello, World!')
  File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 543, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 664, in _wait_on_metadata
    "Failed to update metadata after %.1f secs." % max_wait)
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

共有1个答案

羊光辉
2023-03-14

看起来您在客户端配置中使用了错误的主机。localhost:2181通常是Zookeeper服务器。

为了让客户机工作,您需要将bootstrap_servers设置为Kafka broker主机名和端口。默认情况下,这是localhost:9092

参见https://kafka-python.readthedocs.io/en/latest/apidoc/kafkaproducer.html

 类似资料:
  • 我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f

  • 所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职

  • 我的应用程序有一个生产者和一个消费者。我的生产者不定期地生成消息。有时我的队列会是空的,有时我会有一些消息。我想让我的消费者监听队列,当有消息在其中时,接受它并处理这条消息。这个过程可能需要几个小时,如果我的消费者没有完成处理当前消息,我不希望他接受队列中的另一条消息。 我认为AKKA和AWS SQS可以满足我的需求。通过阅读文档和示例,akka-camel似乎可以简化我的工作。 我在github

  • D: \软件\Kafka\Kafka2.10-0.10.0.1\bin\windows 我使用上面的命令来消费消息,有什么我错过的吗?帮助我: 这个 那些是生产者和消费者......