当前位置: 首页 > 知识库问答 >
问题:

气流:芹菜工人连接太多

能可人
2023-03-14

我们正在运行气流1.10.1与芹菜。面对多个打开的连接。在DAG启动时,UI会挂起几分钟。

亮点:

  • 所有节点都是裸金属的:CPU:40,MHz 2494.015,RAM 378G,10Gb NIC-

MySQL

+-------------------------+---------     + 
| Variable_name           | Value         |
+-------------------------+---------      +
| Thread pool_idle_threads | 0            |
| Thread pool_threads      | 0            |
| Threads_cached          | 775           |
| Threads_connected       | 5323          |
| Threads_created         | 4846609       |
| Threads_running         | 5             |
+-------------------------+---------      +

MySQL连接:

31  - worker1
215 - worker2
349 - worker53
335 - worker54
347 - worker55
336 - worker56
336 - worker57
354 - worker58
339 - worker59
328 - worker60
333 - worker61
337 - worker62
2   - scheduler

Worker.cfg

[core]
sql_alchemy_pool_size = 5
sql_alchemy_pool_recycle = 900
sql_alchemy_reconnect_timeout = 300
parallelism = 1200
dag_concurrency = 800
non_pooled_task_slot_count = 1200
max_active_runs_per_dag = 10
dagbag_import_timeout = 30
[celery]
worker_concurrency = 100

Scheduler.cfg:

   [core]
    sql_alchemy_pool_size = 30
    sql_alchemy_pool_recycle = 300
    sql_alchemy_reconnect_timeout = 300
    parallelism = 1200
    dag_concurrency = 800
    non_pooled_task_slot_count = 1200
    max_active_runs_per_dag = 10
    [scheduler]
    job_heartbeat_sec = 5
    scheduler_heartbeat_sec = 5
    run_duration = 1800
    min_file_process_interval = 10
    min_file_parsing_loop_time = 1
    dag_dir_list_interval = 300
    print_stats_interval = 30
    scheduler_zombie_task_threshold = 300
    max_tis_per_query = 1024
    max_threads = 29

另外,我正在运行1000个简单任务,比如sleepls

共有1个答案

叶晋
2023-03-14

我们可以把连接从700-800切换到1-10

你可以做两件事:

  1. setsql_alchemy_pool_enabled=False
  2. 设置一个不同的result_backend从DB,在我们的情况下,我们使用redis作为result_backend和MySQL作为主DB
 类似资料:
  • 我使用芹菜独立(不在Django内)。我计划在多台物理机器上运行一种辅助任务类型。该任务执行以下操作 接受XML文档。 转换它。 进行多个数据库读写 我正在使用PostgreSQL,但这同样适用于使用连接的其他存储类型。过去,我使用数据库连接池来避免在每次请求时创建新的数据库连接,或者避免连接打开时间过长。然而,由于每个芹菜工人都在一个单独的过程中运行,我不确定他们实际上如何能够共享游泳池。我是不

  • 当工作人员在创建之后执行任务时,我不断遇到wierdmysql问题。 我们使用django 1.3,芹菜3.1.17,djorm ext pool 0.5 我们用并发3启动芹菜进程。到目前为止,我的观察是,当工人流程开始时,他们都得到相同的mysql定义。我们将数据库连接id记录如下。 当所有工作人员都获得任务时,第一个任务成功执行,但其他两个任务会出现奇怪的Mysql错误。它要么是“Mysql服

  • 我正在设置一个分布式 Airflow 群集,其中除芹菜工作线程之外的所有其他所有内容都在一台主机上运行,并且处理在多个主机上完成。气流2.0设置是使用气流文档中给出的 yaml 文件配置的 https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml。在我最初的测试中,当我在同一主机上运行所有内容时,我使架构能够

  • 现在,我想将< code>register事件发布到某个特殊的交换,我可以使用celery远程检索和处理它。 实际上,我已经使用了函数来实现这一点,但是它必须传递来指示应该执行哪个任务并消费它。所以它似乎不太适合我的目标。 我想要的就是这样: 向某些发布消息; 远程机器1订阅此或并捕获消息,用于执行任务; 远程机器2-与机器1相同但执行另一个任务-接收(可能需要回复某些) 例如,就像这个工作流一样

  • 我有2个芹菜工人,通过eventlet池,配置如下: 当一次运行100多个任务时,我会遇到以下错误: 操作错误:致命:剩余的连接插槽保留给非复制超级用户连接 我在PostgreSQL上运行,最大连接数设置为默认值100。 从我在网上读到的,我认为池中的工作线程将共享相同的数据库连接。然而,我的似乎尝试和每个线程创建一个连接,这就是错误发生的原因。 有什么想法吗? 谢谢

  • 我最近开始研究分布式计算以提高计算速度。我选择了芹菜。然而,我对一些术语不太熟悉。所以,我有几个相关的问题。 来自芹菜文档: ... Celery通过消息进行通信,通常使用代理在客户机和工作人员之间进行调解。为了启动任务,客户机将消息添加到队列中,然后代理将该消息传递给工作者。 什么是客户端(这里)?什么是经纪商?为什么消息通过代理传递?为什么 Celery 会使用后端和队列进行进程间通信? 当我