当前位置: 首页 > 知识库问答 >
问题:

Dockerized Celery worker未从Localstack SQS队列中提取任务

叶建柏
2023-03-14

我正在用celery构建一个用于背景任务的flask应用程序。我的应用程序正在使用docker容器中运行的localstack,在本地为我的message Broker模拟SQS。我已经让flask和celery在本地运行,以便与localstack正确工作,在localstack中,我可以看到flask接收请求,向SQS队列添加消息,然后celery接收该任务并执行它。

我尝试了dockerize flask和celery以及localstack,除了celery worker(它不执行队列中的任务)之外,我的所有服务都按预期运行。我可以在本地启动一个celery worker来读取队列并执行任务,但docker celery worker不会拉出任何任务。

在flask容器中运行celery worker也得到了相同的结果,并且添加了我在这个github线程中找到的类似--withon-gossip的参数。

我是否遗漏了docker体系结构中的一些东西,使celery不能从SQS队列中拉出?

这是我的docker-compose.yml:


services:
  dev:
    build:
      context: .
      dockerfile: 'dev.Dockerfile'
    ports:
    - "5050:5000"
    restart: always
    volumes:
    - .:/app
    environment:
    - GUNICORN_CMD_ARGS="--reload"
    - docker_env=true
    stdin_open: true
    tty: true
    command: ./entrypoint.sh
    depends_on: 
      - localstack

  # mimics AWS services locally
  localstack:
    image: localstack/localstack:latest
    ports:
      - '4561-4599:4561-4599'
      - '8080:8080'
    environment:
      - SERVICES=sqs
      - DEBUG=1
      - DATA_DIR=/tmp/localstack/data
    volumes:
      - './.localstack:/tmp/localstack'
    restart: always

  celery:
    build:
      context: .
      dockerfile: 'dev.Dockerfile'
    volumes:
      - .:/app
    environment: 
      - docker_env=true
    stdin_open: true
    tty: true
    command: ./celeryworker.sh
    restart: always
    links:
      - localstack
    depends_on:
      - localstack


volumes:
  .:

Dev.DockerFile:

FROM python:3.6

USER root

# Environment Variables
ENV HOST 0.0.0.0
ENV PORT 5000

# Install Packages
COPY requirements.txt /requirements.txt

RUN /bin/bash -c "python3 -m venv docker \
    && source docker/bin/activate \
    && pip3 install --upgrade pip \
    && pip3 install -r requirements.txt"

# Source Code
WORKDIR /app
COPY . .
COPY app/gunicorn_config.py /deploy/app/gunicorn_config.py
COPY entrypoint.sh /bin/entrypoint.sh
COPY celeryworker.sh /bin/celeryworker.sh


USER nobody

EXPOSE 5000
CMD ["bash"]
source ../docker/bin/activate
gunicorn -c app/gunicorn_config.py --bind 0.0.0.0:5000 'app:create_app()'
source ../docker/bin/activate
celery worker -A app.celery_worker.celery

celeryconfig.py

import os

BROKER_URL = 'sqs://fake:fake@localhost:4576/0'     # local sqs
# BROKER_URL = 'redis://localhost:6379/0'     # local redis
# RESULT_BACKEND = 'redis://localhost:6379/0'     # local redis
if os.getenv('docker_env', 'local') != 'local':
    BROKER_URL = 'sqs://fake:fake@localstack:4576/0'

BROKER_TRANSPORT_OPTIONS = {'queue_name_prefix': 'app-'}

我在本地使用相同的命令,并且一切都按预期运行。我还重新创建了本地虚拟环境,以确保在requirements.txt中没有额外的包

共有1个答案

程谭三
2023-03-14

通过在localstack环境变量中设置hostnamehostname_external来解决此问题。通过在localhostlocalstack之间更改这两者,我可以让我的本地或docker celery工作者拉取任务。

 类似资料:
  • 我最近将一台服务器从ActiveMQ从5.8升级到了最新版本(5.11.1)。从那以后,我偶尔注意到,消息将在特定队列中累积,而不会被删除。 我们的架构有一个生产者,一个消费者。我可以看到消费者仍然保持联系,但制作人的信息越来越多。我的解决方案是通过web控制台删除队列。之后,我立即看到消费者重新连接,消息再次开始处理。 如果相关,在这种情况下,生产者正在运行NMS。NET和消费者在Java 1.

  • 我有一个应用程序,在这个应用程序中,我可以在进程的一部分中以JSON格式将消息写入Azure服务总线队列。我有一个下游进程,我想将该消息从队列中弹出,将json转换为一个对象,然后处理该对象。 我没有问题将消息推送到队列上,但我还没有找到任何将消息从队列中逐一或循环弹出的示例。我在微软或Github上看到的每一个例子都是一个控制台应用程序(在网络应用程序中毫无用处),它设置了某种侦听器,可以抓取队

  • 我有一个触发Lambda函数的SQS队列。我正在尝试解析SQS消息并提取相关数据。我特别尝试提取s3对象键和s3 bucket arn。 我当前的Lambda函数: CloudWatch中显示的我的SQS消息: 最后一个错误是: 我还尝试了objectKey=record['Records][0]['s3']['object']['key']。谢谢

  • 如何从CSV文件中提取列? 我对Java有点陌生。你如何从csv文件中提取特定列。例如,如果我有这个数据: 如何提取第一列和第三列?我能够读取整个CSV文件,但我想从中提取特定的列。

  • 所以我使用executorservice创建了一个线程池。 我试图访问线程池队列中的任务数。我看到没有办法得到它。我知道有一些方法来获取线程池执行器中的队列大小,但是我如何使用执行器服务对象来实现这一点。 就像我说的,如果我创建了一个像这样的线程池执行器,我可以得到队列信息 我知道我可以使用tpExecutor。队列size()获取线程池队列中的任务数。但目前我已经使用Executor服务声明了我

  • 我想了解在fork-连接池中处理任务Java顺序。 到目前为止,我在文档中找到的唯一相关信息是关于一个名为“asyncMode”的参数,该参数“如果此池对从未加入的分叉任务使用本地先进先出调度模式,则为真”。 我对这句话的解释是,每个工人都有自己的任务队列;工人从他们自己队列的前面接受任务,或者如果他们自己的队列是空的,从其他工人队列的后面偷走任务;如果asyncMode为真(分别为假),工作人员