在一个新进程执行一个新的程序需要两个步骤:
在早期的 UNIX 系统中,调用fork()时,内核会复制所有的内部数据结构. 现在的 UNIX 系统采用 copy-on-write(COW) 的惰性算法的优化策略. fork 新进程的时候,会共享同一个副本的内容. 如果有进行 write 操作,才开始拷贝内存,这样可以减少资源的浪费.
Python3中的concurrent.futures.ProcessPoolExecutor模块可以非常方便的进行多进程编程. ProcessPoolExecutor是基于multiprocessing模块. multiprocessing模块在linux操作系统下使用fork
创建子进程. 其他操作系统下还有另外两种新建多进程的模式: spawn
, fork server
Pymongo 是线程安全的, 提供了内置的连接池供多线程应用使用,但不是fork-safe的. Pymongo会产生多个线程来跑后台任务,比如保活connections.这些后台线程通过锁来共享状态,而这些锁本身不是fork-safe的.
经过 fork 之后, Lock 包括 Lock 的状态都会被复制到子进程,所以当子进程需要用到 Lock 的时候, 有可能造成死锁.
Motor Mongo 是基于 Pymongo,可以在 Tornado 和 Asyncio 里使用的异步 Mongodb 库.在Motor-Asyncio中, Motor使用ThreadPoolExecutor将同步阻塞的pymongo请求放在多个线程中,通过callback回调来达到异步的效果.
Motor的核心函数:
def asynchronize(framework, sync_method, doc=None):
"""Decorate `sync_method` so it accepts a callback or returns a Future.
The method runs on a thread and calls the callback or resolves
the Future when the thread completes.
:Parameters:
- `motor_class`: Motor class being created, e.g. MotorClient.
- `framework`: An asynchronous framework
- `sync_method`: Unbound method of pymongo Collection, Database,
MongoClient, etc.
- `doc`: Optionally override sync_method's docstring
"""
@functools.wraps(sync_method)
def method(self, *args, **kwargs):
loop = self.get_io_loop()
callback = kwargs.pop('callback', None)
future = framework.run_on_executor(loop,
sync_method,
self.delegate,
*args,
**kwargs)
return framework.future_or_callback(future, callback, loop)
# This is for the benefit of motor_extensions.py, which needs this info to
# generate documentation with Sphinx.
method.is_async_method = True
name = sync_method.__name__
method.pymongo_method_name = name
if doc is not None:
method.__doc__ = doc
return method
在官方文档里说明: Motor 不支持 Multithreading 和 forking , 只能在单线程中使用.
不过我在测试中,如果只是对db做一些查询操作,在多进程多协程的环境下跑了 600w 数据,目前还没有碰到问题.