当前位置: 首页 > 知识库问答 >
问题:

为每个芹菜工人创建单独的数据库连接

唐沈义
2023-03-14

当工作人员在创建之后执行任务时,我不断遇到wierdmysql问题。

我们使用django 1.3,芹菜3.1.17,djorm ext pool 0.5

我们用并发3启动芹菜进程。到目前为止,我的观察是,当工人流程开始时,他们都得到相同的mysql定义。我们将html" target="_blank">数据库连接id记录如下。

from django.db import connection
connection.cursor()
logger.info("Task %s processing with db connection %s", str(task_id), str(connection.connection.thread_id()))

当所有工作人员都获得任务时,第一个任务成功执行,但其他两个任务会出现奇怪的Mysql错误。它要么是“Mysql服务器消失”的错误,要么是Django抛出“DoesNotExist”错误的情况。显然,Django查询的对象确实存在。

在这个错误之后,每个worker开始获得它自己的数据库连接,之后我们没有发现任何问题。

芹菜的默认行为是什么?是否设计为共享相同的数据库连接。如果是,如何处理进程间通信?理想情况下,我希望每个工作人员有不同的数据库连接。

我尝试了下面链接中提到的代码,但没有成功。芹菜工人数据库连接池

我们还修复了下面建议的芹菜代码。https://github.com/celery/celery/issues/2453

对于那些投反对票的人,请告诉我投反对票的原因。

共有1个答案

华献
2023-03-14

芹菜是用下面的命令启动的

celery -A myproject worker --loglevel=debug --concurrency=3 -Q testqueue

myproject.py作为主进程的一部分,在分支工作进程之前对mysql数据库进行一些查询。

作为主进程中查询流的一部分,django ORM创建一个sqlalchemy连接池(如果它还不存在的话)。然后创建工作进程。

芹菜作为django fixups的一部分关闭现有连接。

    def close_database(self, **kwargs):
    if self._close_old_connections:
        return self._close_old_connections()  # Django 1.6
    if not self.db_reuse_max:
        return self._close_database()
    if self._db_recycles >= self.db_reuse_max * 2:
        self._db_recycles = 0
        self._close_database()
    self._db_recycles += 1

实际上,可能发生的情况是,具有一个未使用的db连接的sqlalChemy池对象在分叉时被复制到3个工作进程。因此,3个不同的池有3个指向相同连接文件描述符的连接对象。

Workers在执行任务时,当请求db连接时,所有Worker从sqlalchemy池获得相同的未使用连接,因为该连接当前未使用。所有连接都指向同一个文件描述符,这一事实导致MySQL连接出现错误。

之后创建的新连接都是新的,并且不指向相同的套接字文件描述符。

解决方案:

在主流程中添加

from django.db import connection
connection.cursor()

在完成任何导入之前,即在添加djorm-ext-pool模块之前。

这样,所有的数据库查询都将使用django在池外创建的连接。当芹菜django修复关闭连接时,连接实际上被关闭,而不是回到炼金术池,在分叉时处理所有工人时,炼金术池中没有连接。之后,当工作人员请求db连接时,sqlalChemy将返回一个新创建的连接。

 类似资料:
  • 我使用芹菜独立(不在Django内)。我计划在多台物理机器上运行一种辅助任务类型。该任务执行以下操作 接受XML文档。 转换它。 进行多个数据库读写 我正在使用PostgreSQL,但这同样适用于使用连接的其他存储类型。过去,我使用数据库连接池来避免在每次请求时创建新的数据库连接,或者避免连接打开时间过长。然而,由于每个芹菜工人都在一个单独的过程中运行,我不确定他们实际上如何能够共享游泳池。我是不

  • 我们正在运行气流1.10.1与芹菜。面对多个打开的连接。在DAG启动时,UI会挂起几分钟。 亮点: 所有节点都是裸金属的:CPU:40,MHz 2494.015,RAM 378G,10Gb NIC- MySQL MySQL连接: Worker.cfg Scheduler.cfg: 另外,我正在运行1000个简单任务,比如或

  • 我有2个芹菜工人,通过eventlet池,配置如下: 当一次运行100多个任务时,我会遇到以下错误: 操作错误:致命:剩余的连接插槽保留给非复制超级用户连接 我在PostgreSQL上运行,最大连接数设置为默认值100。 从我在网上读到的,我认为池中的工作线程将共享相同的数据库连接。然而,我的似乎尝试和每个线程创建一个连接,这就是错误发生的原因。 有什么想法吗? 谢谢

  • 我正在为某个游戏开发某种“分数跟踪器”应用程序。用户添加一定数量的玩家(当前数量是无限的),然后这些玩家的名字被添加到ArrayList中。然后在下一个活动中,用户必须从一个旋转器中选择一个playername,并为该玩家输入一定数量的“点数”,或者说“分数”。 这是我当前的代码: 现在我要做的是实现一个功能,它将跟踪每个球员的得分。 示例:用户添加了2个玩家,一个叫约翰,一个叫杰克。然后用户给约

  • 我正在开发一个Java的REST API,它同时命中多个endpoint,所有这些服务都并行运行。我希望我的应用程序为每个终端使用单独的Spark会话。 问题陈述: 每当我使用或,它会关闭sparkContext本身,因为并行运行的其他服务会失败,并且每当我再次点击该服务时,sparkContext都无法重新启动。 我尝试了以下方法来解决问题: 使用SparkSession singleton对象

  • 问题内容: 我正在尝试创建Windows服务来启动Celery。我碰到了一篇使用Task Scheduler 来做的文章。但是,它似乎启动了许多芹菜实例,并不断消耗内存直到机器死机。有什么方法可以将其作为Windows服务启动吗? 问题答案: 我从另一个网站得到了答案。Celeryd(Celery的守护程序服务)作为粘贴应用程序运行,在这里搜索“ Paster Windows Service”会导