当前位置: 首页 > 知识库问答 >
问题:

Kafka-蟒蛇:生产者无法连接

姬向明
2023-03-14

kafka-python(1.0.0)在连接到代理时抛出错误。同时 /usr/bin/kafka-console-producer和 /usr/bin/kafka-console-consumer正常工作。

Python应用程序过去也运行良好,但是在动物园管理员重新启动后,它不再能够连接。

我使用文档中的裸露骨骼示例:

from kafka import KafkaProducer
from kafka.common import KafkaError

producer = KafkaProducer(bootstrap_servers=['hostname:9092'])

# Asynchronous by default
future = producer.send('test-topic', b'raw_bytes')

我收到这个错误:

Traceback (most recent call last):   File "pp.py", line 4, in <module>
    producer = KafkaProducer(bootstrap_servers=['hostname:9092'])   File "/usr/lib/python2.6/site-packages/kafka/producer/kafka.py", line 246, in __init__
    self.config['api_version'] = client.check_version()   File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 629, in check_version
    connect(node_id)   File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 592, in connect
    raise Errors.NodeNotReadyError(node_id) kafka.common.NodeNotReadyError: 0 Exception AttributeError: "'KafkaProducer' object has no attribute '_closed'" in <bound method KafkaProducer.__del__ of <kafka.producer.kafka.KafkaProducer object at 0x7f6171294c50>> ignored

单步通过( /usr/lib/python2.6/site-packages/kafka/client_async.py)时,我注意到第270行的评估为false:

270         if not self._metadata_refresh_in_progress and not self.cluster.ttl() == 0:
271             if self._can_send_request(node_id):
272                 return True
273         return False

在我的例子中,_metadata_refresh_in_progress是False,但是ttl()=0;

与此同时,kafka-console-*正在愉快地推送消息:

/usr/bin/kafka-console-producer --broker-list hostname:9092 --topic test-topic
hello again
hello2

有什么建议吗?

共有3个答案

濮彬
2023-03-14

我也有同样的问题。

我用user3503929的提示解决了问题。

kafka服务器安装在windows上。

服务器属性

...
host.name = 0.0.0.0
...

.

producer = KafkaProducer(bootstrap_servers='192.168.1.3:9092',         
                                         value_serializer=str.encode)
producer.send('test', value='aaa')
producer.close()
print("DONE.")

windows kafka客户端的处理没有问题。然而,当我在ubuntu中使用kafka-python向topic发送消息时,出现了< code>NoBrokersAvailable异常。

将以下设置添加到服务器属性。

...
advertised.host.name = 192.168.1.3
...

它在相同的代码中成功运行。为此我花了三个小时。

谢谢

席烨
2023-03-14

我也遇到过类似的问题。在我的例子中,代理主机名在客户端无法解析。尝试在配置文件中显式设置advertised.host.name

安经纶
2023-03-14

我也有同样的问题,上面的解决方案都不管用。然后我阅读异常消息,似乎必须指定api_version,所以

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],api_version=(0,1,0))

注意:与 kafka 版本 1.0.0 匹配的元组 (1,0,0

工作正常(至少完成无例外,现在必须说服它接受消息;))

 类似资料:
  • 我有一组Kafka代理实例作为集群运行。我有一个客户正在生产数据给Kafka: 当我们使用tcpdump进行监控时,我可以看到只有到broker1和broker2的连接被建立,而对于broker3,没有来自我的生产者的连接。我有一个只有一个分区的单一主题。 我的问题是: > 为什么在我的情况下,我无法连接到broker3?或者至少我的网络监控没有显示我的制作人与broker3建立了连接? 如果我能

  • 我正在运行Ubuntu 18.04。 我使用mysql连接器-python连接Python到MySQL。 我使用的是Python 3.6.7,并且已经安装了mysql连接器-python。 我已经安装了mysql连接器-python-py3_8.0.13-1ubuntu18.10_all.deb. 在运行Python脚本时,mysql。连接器模块似乎加载正确,但脚本在碰到光标时失败。next()具

  • 假设我有一些资源,我想在用python编写的aws lambda中的不同请求之间共享。我应该如何实现这一点? 是否有“启动后”挂钩,或者我应该在第一次调用时惰性地创建资源?“延迟初始化”的缺点是,它意味着一些请求会随机变慢,因为您选择了一个消费者来承担启动成本。 此外…这些资源会在lambda可执行文件被“冻结”后幸存下来吗? 本页https://docs.aws.amazon.com/lambd

  • 我们目前在HDF(Hortonworks Dataflow)3.3.1上,它捆绑了Kafka 2.0.0,并且正在尝试使用分布式模式下的Kafka Connect,以推出一个Google Cloud PubSub接收器连接器。我们正在计划将一些元数据发回到Kafka主题中,并且需要将一个Kafka生产者集成到Sink任务Java代码的flush()函数中。 这是否会对Kafka Connect向K

  • 我为Kafka建立了一个docker形象(Wurstmeister/Kafka-Docker)。在docker容器中,我可以使用内置的shell脚本创建主题、生成消息和使用消息。现在,我使用https://github.com/mapr-demos/kafka-sample-programs托管的代码从我的主机连接到kafka broker。在构建和运行程序之后,什么都没有发生,程序就会堆积起来。

  • 我正在尝试点击图中所示的下拉菜单 这就是我尝试过的 但它给了我这个错误 NoSuchElementException: Message:找不到element://div[@ class = ' choosed-container choosed-container-single ']//a[@ class = ' choosed-single choosed-single-with-deselec