我正在用celery构建一个用于背景任务的flask应用程序。我的应用程序正在使用docker容器中运行的localstack,在本地为我的message Broker模拟SQS。我已经让flask和celery在本地运行,以便与localstack正确工作,在localstack中,我可以看到flask接收请求,向SQS队列添加消息,然后celery接收该任务并执行它。
我尝试了dockerize flask和celery以及localstack,除了celery worker(它不执行队列中的任务)之外,我的所有服务都按预期运行。我可以在本地启动一个celery worker来读取队列并执行任务,但docker celery worker不会拉出任何任务。
在flask容器中运行celery worker也得到了相同的结果,并且添加了我在这个github线程中找到的类似--withon-gossip
的参数。
我是否遗漏了docker体系结构中的一些东西,使celery不能从SQS队列中拉出?
这是我的docker-compose.yml:
services:
dev:
build:
context: .
dockerfile: 'dev.Dockerfile'
ports:
- "5050:5000"
restart: always
volumes:
- .:/app
environment:
- GUNICORN_CMD_ARGS="--reload"
- docker_env=true
stdin_open: true
tty: true
command: ./entrypoint.sh
depends_on:
- localstack
# mimics AWS services locally
localstack:
image: localstack/localstack:latest
ports:
- '4561-4599:4561-4599'
- '8080:8080'
environment:
- SERVICES=sqs
- DEBUG=1
- DATA_DIR=/tmp/localstack/data
volumes:
- './.localstack:/tmp/localstack'
restart: always
celery:
build:
context: .
dockerfile: 'dev.Dockerfile'
volumes:
- .:/app
environment:
- docker_env=true
stdin_open: true
tty: true
command: ./celeryworker.sh
restart: always
links:
- localstack
depends_on:
- localstack
volumes:
.:
Dev.DockerFile:
FROM python:3.6
USER root
# Environment Variables
ENV HOST 0.0.0.0
ENV PORT 5000
# Install Packages
COPY requirements.txt /requirements.txt
RUN /bin/bash -c "python3 -m venv docker \
&& source docker/bin/activate \
&& pip3 install --upgrade pip \
&& pip3 install -r requirements.txt"
# Source Code
WORKDIR /app
COPY . .
COPY app/gunicorn_config.py /deploy/app/gunicorn_config.py
COPY entrypoint.sh /bin/entrypoint.sh
COPY celeryworker.sh /bin/celeryworker.sh
USER nobody
EXPOSE 5000
CMD ["bash"]
source ../docker/bin/activate
gunicorn -c app/gunicorn_config.py --bind 0.0.0.0:5000 'app:create_app()'
source ../docker/bin/activate
celery worker -A app.celery_worker.celery
celeryconfig.py
import os
BROKER_URL = 'sqs://fake:fake@localhost:4576/0' # local sqs
# BROKER_URL = 'redis://localhost:6379/0' # local redis
# RESULT_BACKEND = 'redis://localhost:6379/0' # local redis
if os.getenv('docker_env', 'local') != 'local':
BROKER_URL = 'sqs://fake:fake@localstack:4576/0'
BROKER_TRANSPORT_OPTIONS = {'queue_name_prefix': 'app-'}
我在本地使用相同的命令,并且一切都按预期运行。我还重新创建了本地虚拟环境,以确保在requirements.txt
中没有额外的包
通过在localstack环境变量中设置hostname
和hostname_external
来解决此问题。通过在localhost
和localstack
之间更改这两者,我可以让我的本地或docker celery工作者拉取任务。
我最近将一台服务器从ActiveMQ从5.8升级到了最新版本(5.11.1)。从那以后,我偶尔注意到,消息将在特定队列中累积,而不会被删除。 我们的架构有一个生产者,一个消费者。我可以看到消费者仍然保持联系,但制作人的信息越来越多。我的解决方案是通过web控制台删除队列。之后,我立即看到消费者重新连接,消息再次开始处理。 如果相关,在这种情况下,生产者正在运行NMS。NET和消费者在Java 1.
我有一个应用程序,在这个应用程序中,我可以在进程的一部分中以JSON格式将消息写入Azure服务总线队列。我有一个下游进程,我想将该消息从队列中弹出,将json转换为一个对象,然后处理该对象。 我没有问题将消息推送到队列上,但我还没有找到任何将消息从队列中逐一或循环弹出的示例。我在微软或Github上看到的每一个例子都是一个控制台应用程序(在网络应用程序中毫无用处),它设置了某种侦听器,可以抓取队
我有一个触发Lambda函数的SQS队列。我正在尝试解析SQS消息并提取相关数据。我特别尝试提取s3对象键和s3 bucket arn。 我当前的Lambda函数: CloudWatch中显示的我的SQS消息: 最后一个错误是: 我还尝试了objectKey=record['Records][0]['s3']['object']['key']。谢谢
如何从CSV文件中提取列? 我对Java有点陌生。你如何从csv文件中提取特定列。例如,如果我有这个数据: 如何提取第一列和第三列?我能够读取整个CSV文件,但我想从中提取特定的列。
所以我使用executorservice创建了一个线程池。 我试图访问线程池队列中的任务数。我看到没有办法得到它。我知道有一些方法来获取线程池执行器中的队列大小,但是我如何使用执行器服务对象来实现这一点。 就像我说的,如果我创建了一个像这样的线程池执行器,我可以得到队列信息 我知道我可以使用tpExecutor。队列size()获取线程池队列中的任务数。但目前我已经使用Executor服务声明了我
我想了解在fork-连接池中处理任务Java顺序。 到目前为止,我在文档中找到的唯一相关信息是关于一个名为“asyncMode”的参数,该参数“如果此池对从未加入的分叉任务使用本地先进先出调度模式,则为真”。 我对这句话的解释是,每个工人都有自己的任务队列;工人从他们自己队列的前面接受任务,或者如果他们自己的队列是空的,从其他工人队列的后面偷走任务;如果asyncMode为真(分别为假),工作人员