当前位置: 首页 > 知识库问答 >
问题:

如何使用泊坞站组成在分布式气流架构上配置芹菜工人?

胡元忠
2023-03-14

我正在设置一个分布式 Airflow 群集,其中除芹菜工作线程之外的所有其他所有内容都在一台主机上运行,并且处理在多个主机上完成。气流2.0设置是使用气流文档中给出的 yaml 文件配置的 https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml。在我最初的测试中,当我在同一主机上运行所有内容时,我使架构能够很好地工作。问题是,如何在远程主机上启动芹菜工人?

到目前为止,我试图创建一个上述docker-compose的修剪版本,其中我只在worker主机上启动芹菜工人,其他什么都不做。但我在数据库连接方面遇到了一些问题。在修剪版本中,我更改了URL,使其指向运行db和redis的主机。

dag、日志、插件和postgresql数据库位于所有主机都可见的共享驱动器上。

我应该如何配置?你知道要检查什么吗?连接等。?芹菜工人装卸工组成配置:

---
version: '3'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
  environment:
    &airflow-common-env
    AIRFLOW_UID: 50000
    AIRFLOW_GID: 50000
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: 
postgresql+psycopg2://airflow:airflow@airflowhost.example.com:8080/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow@airflowhost.example.com:8080/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@airflow@airflowhost.example.com:6380/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    REDIS_PORT: 6380
   volumes:
    - /airflow/dev/dags:/opt/airflow/dags
    - /airflow/dev/logs:/opt/airflow/logs
    - /airflow/dev/plugins:/opt/airflow/plugins
   user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
services:
  airflow-remote-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

编辑1:我仍然对日志文件有一些困难。共享日志目录似乎并不能解决丢失日志文件的问题。我像建议的那样在main上添加了extra_host定义,并在工作机器上打开了端口8793。辅助角色任务失败,并显示日志:

*** Log file does not exist: 
/opt/airflow/logs/tutorial/print_date/2021-07- 
01T13:57:11.087882+00:00/1.log
*** Fetching from: http://:8793/log/tutorial/print_date/2021-07-01T13:57:11.087882+00:00/1.log
*** Failed to fetch log file from worker. Unsupported URL protocol ''

共有2个答案

姜旭
2023-03-14

以下考虑建立在公认的答案上,因为我认为它们可能与任何新的气流芹菜设置相关:

  • 在分布式设置中,启用远程日志记录通常作为集中日志的一种方式派上用场。气流本身支持远程日志记录,例如,请参阅此或此
  • 定义worker_autoscale而不是并发将允许在工作负载增加/减少时动态启动/停止新进程
  • 在工作线程环境中将环境变量 DUMB_INIT_SETSID 设置为 0 允许过热关机(请参阅文档)
  • 在 Docker Compose 中向工作线程添加卷,指向 Airflow 的base_log_folder,可以安全地将工作线程日志保留在本地。例:
# docker-compose.yml

services:
  airflow-worker:
     ...
      volumes:
        - worker_logs:/airflow/logs
     ...
  ...
volumes:
  worker_logs:
司徒河
2023-03-14

这些设置远非“终极设置”,而是一些对我有用的设置,它们使用核心节点中Airflow的docker-compose和workers:

> < li>

必须可以从运行< code>Webserver的主节点访问工作节点。我发现这个< code>CeleryExecutor架构的图表非常有助于理清事情。

当尝试读取日志时,如果在本地找不到它们,它将尝试从远程工作线程检索它们。因此,您的主节点可能不知道工作线程的主机名,因此您可以更改主机名的解析方式(hostname_callable设置,默认为socket.getfqdn。这可以通过在x-airlow-common定义中添加extra_hostsconfig键来实现:

---
version: "3"
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
  environment: &airflow-common-env
    ...# env vars
  extra_hosts:
    - "worker-01-hostname:worker-01-ip-address" # "worker-01-hostname:192.168.0.11"
    - "worker-02-hostname:worker-02-ip-address"

*请注意,在您具有共享驱动器的特定情况下,我认为日志将在本地找到。

  • 定义并行性、DAG并发性和调度程序解析过程。可以通过使用env vars完成:
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
  environment: &airflow-common-env
    AIRFLOW__CORE__PARALLELISM: 64
    AIRFLOW__CORE__DAG_CONCURRENCY: 32
    AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4

当然,要设置的值取决于您的具体情况和可用资源。这篇文章对这个主题有一个很好的概述。DAG设置也可以在定义DAG时被覆盖。

>

  • 定义工作线程CELERY__WORKER_CONCURRENCY,默认值可能是计算机上可用的 CPU 数(文档)。

    定义如何访问在主节点中运行的服务。设置IP或主机名并注意匹配主节点中暴露的端口:

    x-airflow-common: &airflow-common
      image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
      environment: &airflow-common-env
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__CELERY__WORKER_CONCURRENCY: 8
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@main_node_ip_or_hostname:5432/airflow # 5432 is default postgres port
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@main_node_ip_or_hostname:5432/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://:@main_node_ip_or_hostname:6379/0
    
    • 共享从“.env”文件读取它们的相同Fernet Key和Secret Key:
      environment: &airflow-common-env
        AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
        AIRFLOW__WEBSERVER__SECRET_KEY: ${SECRET_KEY}
    
      env_file:
        - .env
    

    . env文件:FERNET_KEY=jvYUaxxxxxxxxxxx=

    >

  • 群集中的每个节点(主节点和工作节点)都应用相同的设置,这一点至关重要。

    定义辅助角色服务的主机名,以避免自动生成与容器 ID 匹配。

    公开端口8793,这是用于从worker (docs)获取日志的默认端口:

    services:
      airflow-worker:
        <<: *airflow-common
        hostname: ${HOSTNAME}
        ports:
          - 8793:8793
        command: celery worker
        restart: always
    
    • 确保每个工作节点主机都使用相同的时间配置运行,几分钟的差异可能会导致严重的执行错误,而这些错误可能很难找到。考虑在主机操作系统上启用NTP服务

    如果工作负荷繁重且并发性较高,则可能需要调整 Postgres 设置,例如max_connectionsshared_buffers。这同样适用于主机操作系统网络设置,如ip_local_port_rangesomaxconn

    在我在初始集群设置期间遇到的任何问题中,Flower和worker执行日志总是提供有用的详细信息和错误消息,任务级日志和Docker Compose服务日志,即:Docker Compose logs--tail=10000 airflow worker

    希望这对你有用!

  •  类似资料:
    • 我们正在运行气流1.10.1与芹菜。面对多个打开的连接。在DAG启动时,UI会挂起几分钟。 亮点: 所有节点都是裸金属的:CPU:40,MHz 2494.015,RAM 378G,10Gb NIC- MySQL MySQL连接: Worker.cfg Scheduler.cfg: 另外,我正在运行1000个简单任务,比如或

    • 问题内容: 我在尝试在mesos集群上运行dockerized的mesos-dns时遇到了一些麻烦。 我已经在Windows 8.1主机上使用ubuntu trusty设置了2个虚拟机。我的虚拟机称为 docker-vm 和 docker-sl- vm ;其中第一个运行mesos-master,第二个运行mesos-slave。 VM有2个网卡;一个运行NAT以便通过主机访问Internet,另一

    • 问题内容: 如何在Windows中运行celery worker而不创建Windows Service?有什么比喻吗? 问题答案: 它的完成方式与Linux中相同。将目录更改为包含celery任务的模块并调用效果很好。

    • 我最近开始研究分布式计算以提高计算速度。我选择了芹菜。然而,我对一些术语不太熟悉。所以,我有几个相关的问题。 来自芹菜文档: ... Celery通过消息进行通信,通常使用代理在客户机和工作人员之间进行调解。为了启动任务,客户机将消息添加到队列中,然后代理将该消息传递给工作者。 什么是客户端(这里)?什么是经纪商?为什么消息通过代理传递?为什么 Celery 会使用后端和队列进行进程间通信? 当我

    • 我正在码头程序窗口工具包上运行 mariadb 实例。我使用风筝在 mariaDB 容器上进行了一个可取的更改。现在,它重新创建了一个实例,丢失了我所有的数据库。有没有办法从中恢复过来? 检查是否存在悬空卷,并且数量很少 docker音量ls -f悬空=真

    • 对于Docker Hub项目,如何找到手动触发或通过触发url触发的自动化构建的状态。 < li >我想了解它是否正在运行,以及它当前的运行状态(即仍在运行还是构建已完成) < li >过去版本列表 < li >所有构建的状态(启动时间、完成时间、构建状态)