我正在设置一个分布式 Airflow 群集,其中除芹菜工作线程之外的所有其他所有内容都在一台主机上运行,并且处理在多个主机上完成。气流2.0设置是使用气流文档中给出的 yaml 文件配置的 https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml。在我最初的测试中,当我在同一主机上运行所有内容时,我使架构能够很好地工作。问题是,如何在远程主机上启动芹菜工人?
到目前为止,我试图创建一个上述docker-compose的修剪版本,其中我只在worker主机上启动芹菜工人,其他什么都不做。但我在数据库连接方面遇到了一些问题。在修剪版本中,我更改了URL,使其指向运行db和redis的主机。
dag、日志、插件和postgresql数据库位于所有主机都可见的共享驱动器上。
我应该如何配置?你知道要检查什么吗?连接等。?芹菜工人装卸工组成配置:
---
version: '3'
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment:
&airflow-common-env
AIRFLOW_UID: 50000
AIRFLOW_GID: 50000
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN:
postgresql+psycopg2://airflow:airflow@airflowhost.example.com:8080/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow@airflowhost.example.com:8080/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@airflow@airflowhost.example.com:6380/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
REDIS_PORT: 6380
volumes:
- /airflow/dev/dags:/opt/airflow/dags
- /airflow/dev/logs:/opt/airflow/logs
- /airflow/dev/plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
services:
airflow-remote-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
restart: always
编辑1:我仍然对日志文件有一些困难。共享日志目录似乎并不能解决丢失日志文件的问题。我像建议的那样在main上添加了extra_host定义,并在工作机器上打开了端口8793。辅助角色任务失败,并显示日志:
*** Log file does not exist:
/opt/airflow/logs/tutorial/print_date/2021-07-
01T13:57:11.087882+00:00/1.log
*** Fetching from: http://:8793/log/tutorial/print_date/2021-07-01T13:57:11.087882+00:00/1.log
*** Failed to fetch log file from worker. Unsupported URL protocol ''
以下考虑建立在公认的答案上,因为我认为它们可能与任何新的气流芹菜设置相关:
worker_autoscale
而不是并发
将允许在工作负载增加/减少时动态启动/停止新进程环境中
将环境变量 DUMB_INIT_SETSID 设置为 0
允许过热关机(请参阅文档)的base_log_folder
,可以安全地将工作线程日志保留在本地。例:# docker-compose.yml
services:
airflow-worker:
...
volumes:
- worker_logs:/airflow/logs
...
...
volumes:
worker_logs:
这些设置远非“终极设置”,而是一些对我有用的设置,它们使用核心节点中Airflow的docker-compose和workers:
> < li>
必须可以从运行< code>Webserver的主节点访问工作节点。我发现这个< code>CeleryExecutor架构的图表非常有助于理清事情。
当尝试读取日志时,如果在本地找不到它们,它将尝试从远程工作线程检索它们。因此,您的主节点可能不知道工作线程的主机名,因此您可以更改主机名的解析方式(hostname_callable
设置,默认为socket.getfqdn。这可以通过在
x-airlow-common
定义中添加extra_hosts
config键来实现:
---
version: "3"
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment: &airflow-common-env
...# env vars
extra_hosts:
- "worker-01-hostname:worker-01-ip-address" # "worker-01-hostname:192.168.0.11"
- "worker-02-hostname:worker-02-ip-address"
*请注意,在您具有共享驱动器的特定情况下,我认为日志将在本地找到。
定义并行性、DAG并发性和调度程序解析过程。可以通过使用env vars完成:
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment: &airflow-common-env
AIRFLOW__CORE__PARALLELISM: 64
AIRFLOW__CORE__DAG_CONCURRENCY: 32
AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4
当然,要设置的值取决于您的具体情况和可用资源。这篇文章对这个主题有一个很好的概述。DAG设置也可以在定义DAG时被覆盖。
>
定义工作
线程CELERY__WORKER_CONCURRENCY
,默认值可能是计算机上可用的 CPU 数(文档)。
定义如何访问在主节点中运行的服务。设置IP或主机名并注意匹配主节点中暴露的端口:
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment: &airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CELERY__WORKER_CONCURRENCY: 8
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@main_node_ip_or_hostname:5432/airflow # 5432 is default postgres port
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@main_node_ip_or_hostname:5432/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@main_node_ip_or_hostname:6379/0
共享从“.env”文件读取它们的相同Fernet Key和Secret Key:
environment: &airflow-common-env
AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
AIRFLOW__WEBSERVER__SECRET_KEY: ${SECRET_KEY}
env_file:
- .env
. env文件:
FERNET_KEY=jvYUaxxxxxxxxxxx=
>
群集中的每个节点(主节点和工作节点)都应用相同的设置,这一点至关重要。
定义辅助角色服务的主机名,以避免自动生成与容器 ID 匹配。
公开端口8793,这是用于从worker (docs)获取日志的默认端口:
services:
airflow-worker:
<<: *airflow-common
hostname: ${HOSTNAME}
ports:
- 8793:8793
command: celery worker
restart: always
确保每个工作节点主机都使用相同的时间配置运行,几分钟的差异可能会导致严重的执行错误,而这些错误可能很难找到。考虑在主机操作系统上启用NTP服务
如果工作负荷繁重且并发性较高,则可能需要调整 Postgres 设置,例如
max_connections
和shared_buffers
。这同样适用于主机操作系统网络设置,如ip_local_port_range
或 somaxconn
。
在我在初始集群设置期间遇到的任何问题中,
Flower
和worker执行日志总是提供有用的详细信息和错误消息,任务级日志和Docker Compose服务日志,即:Docker Compose logs--tail=10000 airflow worker
希望这对你有用!
我们正在运行气流1.10.1与芹菜。面对多个打开的连接。在DAG启动时,UI会挂起几分钟。 亮点: 所有节点都是裸金属的:CPU:40,MHz 2494.015,RAM 378G,10Gb NIC- MySQL MySQL连接: Worker.cfg Scheduler.cfg: 另外,我正在运行1000个简单任务,比如或
问题内容: 我在尝试在mesos集群上运行dockerized的mesos-dns时遇到了一些麻烦。 我已经在Windows 8.1主机上使用ubuntu trusty设置了2个虚拟机。我的虚拟机称为 docker-vm 和 docker-sl- vm ;其中第一个运行mesos-master,第二个运行mesos-slave。 VM有2个网卡;一个运行NAT以便通过主机访问Internet,另一
问题内容: 如何在Windows中运行celery worker而不创建Windows Service?有什么比喻吗? 问题答案: 它的完成方式与Linux中相同。将目录更改为包含celery任务的模块并调用效果很好。
我最近开始研究分布式计算以提高计算速度。我选择了芹菜。然而,我对一些术语不太熟悉。所以,我有几个相关的问题。 来自芹菜文档: ... Celery通过消息进行通信,通常使用代理在客户机和工作人员之间进行调解。为了启动任务,客户机将消息添加到队列中,然后代理将该消息传递给工作者。 什么是客户端(这里)?什么是经纪商?为什么消息通过代理传递?为什么 Celery 会使用后端和队列进行进程间通信? 当我
对于Docker Hub项目,如何找到手动触发或通过触发url触发的自动化构建的状态。 < li >我想了解它是否正在运行,以及它当前的运行状态(即仍在运行还是构建已完成) < li >过去版本列表 < li >所有构建的状态(启动时间、完成时间、构建状态)
我正在码头程序窗口工具包上运行 mariadb 实例。我使用风筝在 mariaDB 容器上进行了一个可取的更改。现在,它重新创建了一个实例,丢失了我所有的数据库。有没有办法从中恢复过来? 检查是否存在悬空卷,并且数量很少 docker音量ls -f悬空=真