当前位置: 首页 > 知识库问答 >
问题:

在码头化的环境中无法从烧瓶连接到Kafka

水浩歌
2023-03-14

我试图建立一个Flask应用程序,有Kafka作为一个界面。我使用了一个Python连接器,kafka-python和一个用于Kafka的Docker图像,Spotify/KafkaProxy。

下面是docker-compose文件。

version: '3.3'
services:
  kafka:
    image: spotify/kafkaproxy
    container_name: kafka_dev
    ports:
      - '9092:9092'
      - '2181:2181'
    environment:
      - ADVERTISED_HOST=0.0.0.0
      - ADVERTISED_PORT=9092
      - CONSUMER_THREADS=1
      - TOPICS=PROFILE_CREATED,IMG_RATED
      - ZK_CONNECT=kafka7zookeeper:2181/root/path
  flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
      - '9000:5000'
    volumes:
      - ./flask-app:/app
    depends_on:
      - kafka
from kafka import KafkaConsumer, KafkaProducer

TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:9092']

consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)
Traceback (most recent call last):
  File "./app.py", line 11, in <module>
    consumer = KafkaConsumer("PROFILE_CREATED", bootstrap_servers=BOOTSTRAP_SERVERS)
  File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 340, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 219, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 819, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
  1. 我能够从烧瓶容器运行ping Kafka,并从Kafka容器获取数据包。
  2. 当我在本地运行Flask应用程序时,试图通过设置bootstrap_servers=['localhost:9092']连接到Kafka容器,它可以正常工作。

共有1个答案

丁震博
2023-03-14

更新

正如cricket_007所提到的,假设您使用的是下面提供的docker-compose,那么您应该使用Kafka:29092从另一个容器连接到Kafka。因此您的代码如下所示:

from kafka import KafkaConsumer, KafkaProducer

TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:29092']

consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)

结束更新

---
version: '2'
services:
zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000

kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
    - zookeeper
    ports:
    - 9092:9092
    environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
    - '9000:5000'
    volumes:
    - ./flask-app:/app

请确保查看其他示例,这些示例可能对您有用,尤其是在转移到生产环境时:https://github.com/confluentinc/cp-docker-images/tree/5.0.1-post/examples

同样值得检查的是:

似乎需要指定api_version来避免此错误。有关更多细节,请查看此处。

producer = KafkaProducer(
    bootstrap_servers=URL,
    client_id=CLIENT_ID,
    value_serializer=JsonSerializer.serialize,
    api_version=(0, 10, 1)
)

这里描述了实际问题

 类似资料:
  • 我已经用flask在python上制作了一个restapi(端口:5000),我正在从一个网站(端口:80)发出get和post请求。我收到了cors错误,所以我尝试在RESTAPI中为站点创建一个响应头。但是我得到了导入错误: 我已经下载了烧瓶cors模块并升级它,并确保它是在正确的路径,但它仍然不工作。 API代码:

  • 我基本上使用了install命令“$pip install Flask”,当我试图运行一个程序时,它会说“找不到模块”Flask安装在“/usr/local/lib/python2.7/site包”中,但我认为pip的意义在于,我可以到处导入这些包。我试图在我的桌面上运行一个文件,甚至当我将Flask文件夹移动到桌面上时,它也不起作用。有什么建议吗?谢谢

  • 我正在学习Flask和Postman,目前,我正在尝试解决这个问题。下面是Python的代码行: 运行之后,当我去邮递员处运行以下命令时:获取:http://127.0.0.1:5000/get_chain控制台向我显示以下消息:错误:connect econnrefered127.0.0.1:5000

  • 2020-10-05 17:59:33,327:异常/[GET]Traceback(最近一次调用最后一次): File"/home/parshuram/. Virtualenvs/myVirtualenv/lib/python3.8/site-pack/flask/app.py",第2446行,wsgi_app响应=self.full_dispatch_request()File"/home/pa

  • 我想要能够得到数据发送到我的烧瓶应用程序。我尝试访问,但它是一个空字符串。如何访问请求数据? 这个问题的答案导致我在Python Flask中询问Get raw POST body next Content-Type header,这是关于获取原始数据而不是解析数据的。