当前位置: 首页 > 知识库问答 >
问题:

芹菜工作者数据库连接池

韩智明
2023-03-14

我使用芹菜独立(不在Django内)。我计划在多台物理机器上运行一种辅助任务类型。该任务执行以下操作

  1. 接受XML文档。
  2. 转换它。
  3. 进行多个数据库读写

我正在使用PostgreSQL,但这同样适用于使用连接的其他存储类型。过去,我使用数据库连接池来避免在每次请求时创建新的数据库连接,或者避免连接打开时间过长。然而,由于每个芹菜工人都在一个单独的过程中运行,我不确定他们实际上如何能够共享游泳池。我是不是漏了什么?我知道芹菜允许你坚持从芹菜工人返回的结果,但这不是我在这里试图做的。根据处理的数据,每个任务可以执行几个不同的更新或插入。

从芹菜工人内部访问数据库的正确方法是什么?

是否可以跨多个员工/任务共享一个池,或者是否有其他方法来实现这一点?

共有3个答案

邓夕
2023-03-14

您可以在芹菜配置中覆盖默认行为,使每个进程具有线程工作线程,而不是工作线程:

CELERYD\u POOL=“celery.concurrency.threads.TaskPool”

然后,您可以将共享池实例存储在任务实例上,并从每个线程化任务调用中引用它。

杨乐意
2023-03-14

每个辅助进程有一个DB连接。由于芹菜本身维护一个辅助进程池,因此您的数据库连接将始终等于芹菜辅助进程的数量。从反面来说,它将把数据库连池与芹菜工人的过程管理联系起来。但是考虑到GIL在一个进程中一次只允许一个线程,这应该没问题。

古凌
2023-03-14

我喜欢tigeronk2关于每个工人一个连接的想法。正如他所说,芹菜维护自己的工作人员池,因此实际上不需要单独的数据库连接池。芹菜信号文档解释了如何在创建worker时进行自定义初始化,因此我将以下代码添加到了我的tasks.py中,它的工作方式似乎与您预期的完全相同。我甚至可以在工人停工时关闭连接:

from celery.signals import worker_process_init, worker_process_shutdown

db_conn = None

@worker_process_init.connect
def init_worker(**kwargs):
    global db_conn
    print('Initializing database connection for worker.')
    db_conn = db.connect(DB_CONNECT_STRING)


@worker_process_shutdown.connect
def shutdown_worker(**kwargs):
    global db_conn
    if db_conn:
        print('Closing database connectionn for worker.')
        db_conn.close()
 类似资料:
  • 我有2个芹菜工人,通过eventlet池,配置如下: 当一次运行100多个任务时,我会遇到以下错误: 操作错误:致命:剩余的连接插槽保留给非复制超级用户连接 我在PostgreSQL上运行,最大连接数设置为默认值100。 从我在网上读到的,我认为池中的工作线程将共享相同的数据库连接。然而,我的似乎尝试和每个线程创建一个连接,这就是错误发生的原因。 有什么想法吗? 谢谢

  • 当工作人员在创建之后执行任务时,我不断遇到wierdmysql问题。 我们使用django 1.3,芹菜3.1.17,djorm ext pool 0.5 我们用并发3启动芹菜进程。到目前为止,我的观察是,当工人流程开始时,他们都得到相同的mysql定义。我们将数据库连接id记录如下。 当所有工作人员都获得任务时,第一个任务成功执行,但其他两个任务会出现奇怪的Mysql错误。它要么是“Mysql服

  • 我们正在运行气流1.10.1与芹菜。面对多个打开的连接。在DAG启动时,UI会挂起几分钟。 亮点: 所有节点都是裸金属的:CPU:40,MHz 2494.015,RAM 378G,10Gb NIC- MySQL MySQL连接: Worker.cfg Scheduler.cfg: 另外,我正在运行1000个简单任务,比如或

  • 我最近开始研究分布式计算以提高计算速度。我选择了芹菜。然而,我对一些术语不太熟悉。所以,我有几个相关的问题。 来自芹菜文档: ... Celery通过消息进行通信,通常使用代理在客户机和工作人员之间进行调解。为了启动任务,客户机将消息添加到队列中,然后代理将该消息传递给工作者。 什么是客户端(这里)?什么是经纪商?为什么消息通过代理传递?为什么 Celery 会使用后端和队列进行进程间通信? 当我

  • 现在,我想将< code>register事件发布到某个特殊的交换,我可以使用celery远程检索和处理它。 实际上,我已经使用了函数来实现这一点,但是它必须传递来指示应该执行哪个任务并消费它。所以它似乎不太适合我的目标。 我想要的就是这样: 向某些发布消息; 远程机器1订阅此或并捕获消息,用于执行任务; 远程机器2-与机器1相同但执行另一个任务-接收(可能需要回复某些) 例如,就像这个工作流一样

  • 我对java非常陌生。所以如果我犯了愚蠢的错误,我很抱歉。我正在尝试执行servlet中的以下代码,但是在我将值提交到我在索引中创建的登录表单之后,什么也没有发生。html。另外,我还为会话创建了另一个servlet。预期的输出是重定向到我从数据库验证用户名和密码后创建的另一个jsp页面