当前位置: 首页 > 工具软件 > Asyncio Mongo > 使用案例 >

mongo 异步 读写分离

皮承基
2023-12-01

使用 AsyncIOMotorClient 异步库,进行读写分离。
读:从数据库不停读数据存到队列,
写: 从本地队列不停批量向数据库写数据。
场景: 数据大的时候,你可以快速将A的数据导入到B中。
可以提高数据插入效率,不必等待写完才读取下一批数据

import asyncio

from custom_conf import MONGODB_URL
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import UpdateOne


class CombineShop:
    """Copy shop documents between Mongo collections with concurrent read/write.

    A reader coroutine streams documents from ``collection_read`` into an
    in-memory ``asyncio.Queue`` while a writer coroutine drains the queue and
    upserts them into ``collection_write`` in bulk batches, so writing never
    blocks reading the next batch.
    """

    def __init__(self, db: str, collection_read: str, collection_write: str,
                 batch: int = 200, queue_size: int = 0):
        """
        :param db: mongo database name
        :param collection_read: collection to read documents from
        :param collection_write: collection to bulk-upsert documents into
        :param batch: number of operations per ``bulk_write`` call
        :param queue_size: maximum queued documents (0 = unbounded, the
            previous behavior); a bound adds back-pressure so a fast reader
            cannot exhaust memory
        """

        self.batch = batch

        self.db = db
        self.collection_read = collection_read
        self.collection_write = collection_write

        self.url = MONGODB_URL.format(self.db)
        self.queue = asyncio.Queue(maxsize=queue_size)  # shared producer/consumer queue
        self.finished = False  # set by read() once the source cursor is exhausted

    async def run(self):
        """Entry point: run the reader and the writer concurrently."""

        await asyncio.gather(self.read(), self.write())

    def connect(self):
        """Create a fresh async client and return the database handle."""

        connection = AsyncIOMotorClient(self.url)
        return connection[self.db]

    async def read(self):
        """Producer: stream source documents into the shared queue."""

        db = self.connect()

        async for shop in db[self.collection_read].find({'city': {'$ne': '成都'}}, {'_id': 0}):
            shop['shop_id'] = str(shop['shop_id'])
            await self.queue.put({'filter': {'shop_id': shop['shop_id']}, 'update': {'$set': shop}})

        self.finished = True  # tell the writer that no more items will arrive

    async def write(self):
        """Consumer: drain the queue and bulk-upsert documents in batches."""

        db = self.connect()
        # BUG FIX: Motor's create_index returns a coroutine; without `await`
        # it never ran, so the unique index was never actually created.
        await db[self.collection_write].create_index("shop_id", unique=True)

        requests = []

        while not self.finished or not self.queue.empty():
            try:
                kwargs = self.queue.get_nowait()
                requests.append(UpdateOne(upsert=True, **kwargs))
            except asyncio.QueueEmpty:
                await asyncio.sleep(0.1)  # queue momentarily empty; yield to the reader

            # `>=` (was `==`) so an overshoot can never silently skip flushing.
            if len(requests) >= self.batch:
                await db[self.collection_write].bulk_write(requests)
                requests.clear()

        # Flush the final partial batch.
        if requests:
            await db[self.collection_write].bulk_write(requests)

if __name__ == '__main__':
    c = CombineShop(db='your_db', collection_read='collection_read', collection_write='collection_write')
    # asyncio.run() creates, runs and closes the event loop itself;
    # get_event_loop()/run_until_complete is deprecated for this use
    # since Python 3.10.
    asyncio.run(c.run())
 类似资料: