当前位置: 首页 > 工具软件 > Asyncio Mongo > 使用案例 >

Motor Mongo 在多进程下的使用

郎飞航
2023-12-01

进程的创建

在一个新进程执行一个新的程序需要两个步骤:

  • 创建一个新进程 (fork)
  • 通过 exec 系统调用把新的二进制程序加载到该进程

在早期的 UNIX 系统中,调用fork()时,内核会复制所有的内部数据结构. 现在的 UNIX 系统采用 copy-on-write(COW) 的惰性算法的优化策略. fork 新进程的时候,会共享同一个副本的内容. 如果有进行 write 操作,才开始拷贝内存,这样可以减少资源的浪费.

ProcessPoolExector

Python3中的concurrent.futures.ProcessPoolExecutor模块可以非常方便的进行多进程编程. ProcessPoolExecutor是基于multiprocessing模块. multiprocessing模块在linux操作系统下使用fork创建子进程. 其他操作系统下还有另外两种新建多进程的模式: spawn, fork server

Pymongo

Pymongo 是线程安全的, 提供了内置的连接池供多线程应用使用,但不是fork-safe的. Pymongo会产生多个线程来跑后台任务,比如保活connections.这些后台线程通过锁来共享状态,而这些锁本身不是fork-safe的.
经过 fork 之后, Lock 包括 Lock 的状态都会被复制到子进程,所以当子进程需要用到 Lock 的时候, 有可能造成死锁.

Motor Mongo

Motor Mongo 是基于 Pymongo,可以在 Tornado 和 Asyncio 里使用的异步 Mongodb 库.在Motor-Asyncio中, Motor使用ThreadPoolExecutor将同步阻塞的pymongo请求放在多个线程中,通过callback回调来达到异步的效果.
Motor的核心函数:

def asynchronize(framework, sync_method, doc=None):
    """Decorate `sync_method` so it accepts a callback or returns a Future.

    The method runs on a thread and calls the callback or resolves
    the Future when the thread completes.

    :Parameters:
     - `motor_class`:       Motor class being created, e.g. MotorClient.
     - `framework`:         An asynchronous framework
     - `sync_method`:       Unbound method of pymongo Collection, Database,
                            MongoClient, etc.
     - `doc`:               Optionally override sync_method's docstring
    """
    @functools.wraps(sync_method)
    def method(self, *args, **kwargs):
        loop = self.get_io_loop()
        callback = kwargs.pop('callback', None)
        future = framework.run_on_executor(loop,
                                           sync_method,
                                           self.delegate,
                                           *args,
                                           **kwargs)

        return framework.future_or_callback(future, callback, loop)

    # This is for the benefit of motor_extensions.py, which needs this info to
    # generate documentation with Sphinx.
    method.is_async_method = True
    name = sync_method.__name__
    method.pymongo_method_name = name
    if doc is not None:
        method.__doc__ = doc

    return method

官方文档里说明: Motor 不支持 Multithreading 和 forking , 只能在单线程中使用.

不过我在测试中,如果只是对db做一些查询操作,在多进程多协程的环境下跑了 600w 数据,目前还没有碰到问题.

 类似资料: