当前位置: 首页 > 面试题库 >

SQLAlchemy和多个进程的连接问题

苍宝
2023-03-14
问题内容

我在一个由启动子进程的主进程组成的项目中使用PostgreSQL和SQLAlchemy。所有这些过程都通过SQLAlchemy访问数据库。

我遇到了可重复的连接失败:前几个子进程正常工作,但是过一会儿出现连接错误。这是MWCE:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import sessionmaker

DB_URL = 'postgresql://user:password@localhost/database'

Base = declarative_base()

class Dummy(Base):
    __tablename__ = 'dummies'
    id = Column(Integer, primary_key=True)
    value = Column(Integer)

engine = None
Session = None
session = None

def init():
    global engine, Session, session
    engine = create_engine(DB_URL)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()

def cleanup():
    session.close()
    engine.dispose()

def target(id):
    init()
    try:
        dummy = session.query(Dummy).get(id)
        dummy.value += 1
        session.add(dummy)
        session.commit()
    finally:
        cleanup()

def main():
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        p = multiprocessing.Process(target=target, args=(dummy.id,))
        p.start()
        p.join()
        session.refresh(dummy)
        assert dummy.value == 2
    finally:
        cleanup()

if __name__ == '__main__':
    i = 1
    while True:
        print(i)
        main()
        i += 1

在我的系统上(PostgreSQL 9.6,SQLAlchemy 1.1.4,psycopg2 2.6.2,Python 2.7,Ubuntu
14.04),这会产生

1
2
3
4
5
6
7
8
9
10
11
Traceback (most recent call last):
  File "./fork_test.py", line 64, in <module>
    main()
  File "./fork_test.py", line 55, in main
    session.refresh(dummy)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1422, in refresh
    only_load_props=attribute_names) is None:
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
    return q.one()
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2756, in one
    ret = self.one_or_none()
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2726, in one_or_none
    ret = list(self)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2797, in __iter__
    return self._execute_and_instances(context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2820, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
    exc_info
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 469, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
 [SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 11074}]

这是可重复的,并且总是在同一迭代中崩溃。

我正在按照SQLAlchemy文档和其他地方的建议在派生后创建新的引擎和会话。有趣的是,以下略有不同的方法不会崩溃:

import contextlib
import multiprocessing

import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import sessionmaker

DB_URL = 'postgresql://user:password@localhost/database'

Base = declarative_base()

class Dummy(Base):
    __tablename__ = 'dummies'
    id = Column(Integer, primary_key=True)
    value = Column(Integer)

@contextlib.contextmanager
def get_session():
    engine = sqlalchemy.create_engine(DB_URL)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        yield session
    finally:
        session.close()
        engine.dispose()

def target(id):
    with get_session() as session:
        dummy = session.query(Dummy).get(id)
        dummy.value += 1
        session.add(dummy)
        session.commit()

def main():
    with get_session() as session:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        p = multiprocessing.Process(target=target, args=(dummy.id,))
        p.start()
        p.join()
        session.refresh(dummy)
        assert dummy.value == 2

if __name__ == '__main__':
    i = 1
    while True:
        print(i)
        main()
        i += 1

由于原始代码更加复杂,无法简单地切换到后一个版本,因此我想了解为什么其中一个有效而另一个无效。

唯一明显的区别是,崩溃的代码对引擎和会话使用了全局变量-它们通过子进程的写时复制共享。但是,由于我在分叉后立即将它们重置,所以我不明白这可能是个问题。

我使用最新的SQLAlchemy(1.1.5)和Python 2.7和Python
3.4重新运行了这两个代码段。两者的结果基本上如上所述。但是,在Python
2.7上,第一个代码段的崩溃现在发生在第13个迭代中(可重现),而在3.4上,它已经在第三次迭代中发生了(可重现)。第二个代码段在两个版本上都运行正常。这是对3.4的追溯:

1
2
3
Traceback (most recent call last):
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
psycopg2.OperationalError: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "fork_test.py", line 64, in <module>
    main()
  File "fork_test.py", line 55, in main
    session.refresh(dummy)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 1424, in refresh
    only_load_props=attribute_names) is None:
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
    return q.one()
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2749, in one
    ret = self.one_or_none()
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2719, in one_or_none
    ret = list(self)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2790, in __iter__
    return self._execute_and_instances(context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2813, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
    exc_info
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
 [SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 3397}]

这是PostgreSQL日志(2.7和3.4相同):

2017-01-18 10:59:36 UTC [22429-1] LOG:  database system was shut down at 2017-01-18 10:59:35 UTC
2017-01-18 10:59:36 UTC [22429-2] LOG:  MultiXact member wraparound protections are now enabled
2017-01-18 10:59:36 UTC [22428-1] LOG:  database system is ready to accept connections
2017-01-18 10:59:36 UTC [22433-1] LOG:  autovacuum launcher started
2017-01-18 10:59:36 UTC [22435-1] [unknown]@[unknown] LOG:  incomplete startup packet
2017-01-18 11:00:10 UTC [22466-1] user@db LOG:  SSL error: decryption failed or bad record mac
2017-01-18 11:00:10 UTC [22466-2] user@db LOG:  could not receive data from client: Connection reset by peer

(请注意,有关不完整的启动数据包的消息是无害的)


问题答案:

引用“如何在Python多处理或os.fork()中使用引擎/连接/会话?”

特别强调:

SQLAlchemy Engine对象引用现有数据库连接的连接池。因此,当将此对象复制到子进程时, 目标是确保没有数据库连接被继承

但是,对于共享事务活动会话或连接的情况,没有自动解决方案。应用程序需要确保新的子进程仅启动新的Connection对象和事务以及ORM
Session对象。

问题源自分叉的子进程继承了live global
session,而live全局仍保持不变Connection。当target调用时init,它将覆盖对engine和的全局引用session,从而将其子代中的refcounts减少为0,从而迫使它们最终确定。例如,如果您以一种方式或另一种方式在子代中创建对继承的会话的另一个引用,则可以防止对该子代进行清理,但不要这样做。后main加入并返回到一切如常它试图用现在完成潜在的-
或以其他方式不同步-连接。至于为什么这只会在经过一定数量的迭代后才导致错误,我不确定。

使用全局变量处理这种情况的唯一方法是

  1. 关闭所有会议
  2. 呼叫 engine.dispose()

在分叉之前。这将防止连接泄漏到孩子。例如:

def main():
    global session
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        dummy_id = dummy.id
        # Return the Connection to the pool
        session.close()
        # Dispose of it!
        engine.dispose()
        # ...or call your cleanup() function, which does the same
        p = multiprocessing.Process(target=target, args=(dummy_id,))
        p.start()
        p.join()
        # Start a new session
        session = Session()
        dummy = session.query(Dummy).get(dummy_id)
        assert dummy.value == 2
    finally:
        cleanup()

您的第二个示例不会在子级中触发终结处理,因此它似乎只能工作,尽管它可能与第一个示例一样坏,因为它仍继承会话的副本及其在本地定义的连接main



 类似资料:
  • 我对连接池的理解是;如果connectionstring完全相同,那么我们重用该连接,而不是建立新的连接。 我的问题是,我正在为并行处理创建许多线程。在这个“虚拟”程序中,我创建了500个线程,并让线程池函数处理这些线程。 步骤是: > < li> 每个线程在SQL中创建一个更新表。(说明更新的时间戳) 然后线程Hibernate1到10秒(随机)。 最后,线程在 SQL 中进行另一次更新(说明结

  • 我正在使用Symfony 3和教义2.5。总共有三张表: 给定公司id 2,最终结果应该是这样,但就我个人而言,我无法为它编写一个有效的原则QueryBuilder方法 我有三个可以工作的MySQL查询,它们为我带来了想要的结果。但就我的一生而言,我无法将任何一个问题转换成至少一个工作原理问题。 使用MySQL查询(这实际上可能会帮助这里的一些用户): 我在Doctrine的QueryBuilde

  • 在我的公司项目中,我们使用 Hikari 进行连接池管理。[Hibernate 4.3.5 Spring 4 Java 1.8]下面是配置 我们面临的问题二是以下问题 > 出于某种原因,Hikari正在创建2个连接池HikariPool-1和HikariPool-2。 只有HikariPool-2正在使用,甚至出现错误一段时间后

  • 本文向大家介绍sqlalchemy 使用连接,包括了sqlalchemy 使用连接的使用技巧和注意事项,需要的朋友参考一下 示例 您可以使用上下文管理器打开连接(即从池中请求一个连接): 有无,但必须手动将其关闭:            

  • 分布式模式下Kafka Connect集群的偏移管理行为是什么,即运行多个连接器并监听同一组主题(或一个主题)? 因此,在分布式模式下,Kafka Connect 会将偏移量信息存储在 Kafka 中,此偏移量将由集群中的工作线程读取和提交。如果我在该 Kafka Connect 集群中运行多个连接器侦听同一主题,会发生什么情况?分区的偏移量是否与所有连接器相同,或者每个连接器在分区上的偏移量是否

  • 问题内容: 我使用SQLAlchemy并至少有三个实体:,并且,其中有方法,所以如果我如想选择所有记录,从我能做到这一点 和这个 甚至这个 -结果将是相同的。 据我了解,如果有人使用它,它会创建,打开(Alchemy会为您处理)并执行查询。但是,执行此任务的这三种方式之间是否存在全局差异? 问题答案: 单行概述: 的行为是在所有情况下相同,但它们是3种不同的方法,在,和类。 到底是什么: 要了解行