我试图建立一个Flask应用程序,有Kafka作为一个界面。我使用了一个Python连接器,kafka-python和一个用于Kafka的Docker图像,Spotify/KafkaProxy。
下面是docker-compose文件。
version: '3.3'
services:
kafka:
image: spotify/kafkaproxy
container_name: kafka_dev
ports:
- '9092:9092'
- '2181:2181'
environment:
- ADVERTISED_HOST=0.0.0.0
- ADVERTISED_PORT=9092
- CONSUMER_THREADS=1
- TOPICS=PROFILE_CREATED,IMG_RATED
- ZK_CONNECT=kafka7zookeeper:2181/root/path
flaskapp:
build: ./flask-app
container_name: flask_dev
ports:
- '9000:5000'
volumes:
- ./flask-app:/app
depends_on:
- kafka
from kafka import KafkaConsumer, KafkaProducer
TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:9092']
consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)
Traceback (most recent call last):
File "./app.py", line 11, in <module>
consumer = KafkaConsumer("PROFILE_CREATED", bootstrap_servers=BOOTSTRAP_SERVERS)
File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 340, in __init__
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 219, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 819, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
ping Kafka
,并从Kafka容器获取数据包。bootstrap_servers=['localhost:9092']
连接到Kafka容器,它可以正常工作。更新
正如cricket_007所提到的,假设您使用的是下面提供的docker-compose,那么您应该使用Kafka:29092
从另一个容器连接到Kafka。因此您的代码如下所示:
from kafka import KafkaConsumer, KafkaProducer
TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:29092']
consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)
结束更新
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
flaskapp:
build: ./flask-app
container_name: flask_dev
ports:
- '9000:5000'
volumes:
- ./flask-app:/app
请确保查看其他示例,这些示例可能对您有用,尤其是在转移到生产环境时:https://github.com/confluentinc/cp-docker-images/tree/5.0.1-post/examples
同样值得检查的是:
似乎需要指定api_version来避免此错误。有关更多细节,请查看此处。
producer = KafkaProducer(
bootstrap_servers=URL,
client_id=CLIENT_ID,
value_serializer=JsonSerializer.serialize,
api_version=(0, 10, 1)
)
这里描述了实际问题
我已经用flask在python上制作了一个restapi(端口:5000),我正在从一个网站(端口:80)发出get和post请求。我收到了cors错误,所以我尝试在RESTAPI中为站点创建一个响应头。但是我得到了导入错误: 我已经下载了烧瓶cors模块并升级它,并确保它是在正确的路径,但它仍然不工作。 API代码:
我基本上使用了install命令“$pip install Flask”,当我试图运行一个程序时,它会说“找不到模块”Flask安装在“/usr/local/lib/python2.7/site包”中,但我认为pip的意义在于,我可以到处导入这些包。我试图在我的桌面上运行一个文件,甚至当我将Flask文件夹移动到桌面上时,它也不起作用。有什么建议吗?谢谢
我正在学习Flask和Postman,目前,我正在尝试解决这个问题。下面是Python的代码行: 运行之后,当我去邮递员处运行以下命令时:获取:http://127.0.0.1:5000/get_chain控制台向我显示以下消息:错误:connect econnrefered127.0.0.1:5000
这可能吗?
2020-10-05 17:59:33,327:异常/[GET]Traceback(最近一次调用最后一次): File"/home/parshuram/. Virtualenvs/myVirtualenv/lib/python3.8/site-pack/flask/app.py",第2446行,wsgi_app响应=self.full_dispatch_request()File"/home/pa
我想要能够得到数据发送到我的烧瓶应用程序。我尝试访问,但它是一个空字符串。如何访问请求数据? 这个问题的答案导致我在Python Flask中询问Get raw POST body next Content-Type header,这是关于获取原始数据而不是解析数据的。