Use the AsyncIOMotorClient async driver to separate reading from writing.
Read: continuously read documents from the source collection and put them on a queue.
Write: continuously drain the local queue and bulk-write batches to the target collection.
Use case: with a large data set, this lets you copy collection A into collection B quickly.
It improves insert throughput, because reading the next batch never has to wait for the previous write to finish.
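
The script imports its connection string from a local custom_conf module that is not shown here. A minimal sketch of what it might contain, assuming a plain URI template with a single placeholder for the database name (the host and port are hypothetical):

# custom_conf.py -- hypothetical: a URI template that CombineShop formats the db name into
MONGODB_URL = 'mongodb://localhost:27017/{}'
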
import asyncio
from custom_conf import MONGODB_URL
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import UpdateOne

class CombineShop:
    def __init__(self, db: str, collection_read: str, collection_write: str, batch: int = 200):
        """
        Initialize the copier.
        :param db: MongoDB database name
        :param collection_read: source collection to read from
        :param collection_write: target collection to write to
        :param batch: number of operations per bulk_write call
        """
        self.batch = batch
        self.db = db
        self.collection_read = collection_read
        self.collection_write = collection_write
        self.url = MONGODB_URL.format(self.db)
        self.queue = asyncio.Queue()  # queue shared by the reader and the writer
        self.finished = False  # set once the reader has exhausted the source collection

    async def run(self):
        """Entry point: run the reader and the writer concurrently."""
        await asyncio.gather(self.read(), self.write())

    def connect(self):
        """Create a client and return a handle to the database."""
        # Each coroutine creates its own client once the event loop is running,
        # since the Motor client attaches to the current loop on construction.
        connection = AsyncIOMotorClient(self.url)
        return connection[self.db]

    async def read(self):
        """Producer: stream documents from the source collection into the queue."""
        db = self.connect()
        # Example filter: skip documents whose city is 成都 (Chengdu), and drop _id
        # so the target collection can assign its own.
        async for shop in db[self.collection_read].find({'city': {'$ne': '成都'}}, {'_id': 0}):
            shop['shop_id'] = str(shop['shop_id'])  # normalize the upsert key to a string
            await self.queue.put({'filter': {'shop_id': shop['shop_id']}, 'update': {'$set': shop}})
        self.finished = True  # tell the writer that no more items are coming

    async def write(self):
        """Consumer: drain the queue and bulk-write batches to the target collection."""
        db = self.connect()
        # Motor's create_index is a coroutine and must be awaited.
        await db[self.collection_write].create_index("shop_id", unique=True)
        requests = []
        # Keep draining until the reader is finished AND the queue is empty.
        while not self.finished or not self.queue.empty():
            try:
                kwargs = self.queue.get_nowait()
                requests.append(UpdateOne(upsert=True, **kwargs))
            except asyncio.QueueEmpty:
                await asyncio.sleep(0.1)  # yield to the reader while waiting for data
            if len(requests) >= self.batch:
                await db[self.collection_write].bulk_write(requests)
                requests.clear()
        if requests:  # flush the final partial batch
            await db[self.collection_write].bulk_write(requests)

if __name__ == '__main__':
    c = CombineShop(db='your_db', collection_read='collection_read', collection_write='collection_write')
    # asyncio.run replaces the deprecated get_event_loop / run_until_complete pattern.
    asyncio.run(c.run())
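
Two notes on the design. Because every write is an UpdateOne with upsert=True, keyed on the unique shop_id index, re-running the script after a crash updates existing documents instead of raising duplicate-key errors. Also, the queue above is unbounded, so if the reader outpaces the writer the whole source collection can pile up in memory. A one-line variant of __init__ bounds it (the cap of ten batches is an arbitrary assumption), and the existing await self.queue.put(...) in read() then applies backpressure automatically:

self.queue = asyncio.Queue(maxsize=self.batch * 10)  # hypothetical cap: at most ten batches buffered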