当前位置: 首页 > 工具软件 > Asyncio Mongo > 使用案例 >

python 通过 asyncio 异步操作 mongo, 可在 jupyter 上运行

卫骏
2023-12-01

导包

import asyncio
import motor.motor_asyncio
import nest_asyncio  # jupyter 上运行需要
nest_asyncio.apply()  # jupyter 上运行需要

创建类,可直接复制使用,可添加相应的方法或者修改

实现了,查询、插入,断开连接,的方法。

查询:可做条件查询,返回的是 [{documentation}, {documentation}, ...]
插入:传入可为 {documentation} ,或者:[{documentation}, {documentation}, ...]

"""operation mongodb"""
class Mongo(object):

    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = object.__new__(cls)
        return cls.__instance

    def __init__(self, host, port, db, user='', pwd=''):

        if user:
            self.client = motor.motor_asyncio.AsyncIOMotorClient(host, port)
            motor.motor_asyncio.AsyncIOMotorClient(f'mongodb://{user}:{pwd}@localhost:27017')
            self.db = self.client[db]
        else:
            # no user, no password
            self.client = motor.motor_asyncio.AsyncIOMotorClient(host, port)
            self.db = self.client[db]

    def insert(self, coll, data):
        """
        insert data
        :param coll:
        :param data: Dict or List(Dict, Dict, )
        :return:
        """
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.__insert(coll, data))

    async def __insert(self, coll, data):
        c = self.db[coll]

        if isinstance(data, dict):
            res = await c.insert_one(data)
            print(f'inserted {len(res.inserted_ids)} docs in {coll}')
        elif isinstance(data, list):
            res = await c.insert_many(data)
            print(f'inserted {len(res.inserted_ids)} docs in {coll}')

        else:
            print(f"data type only is List or Dict,no {type(data)}")

    def find(self, coll, *args, **kwargs):
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(self.__find(coll, *args, **kwargs))

    async def __find(self, coll, *args, **kwargs):
        c = self.db[coll]
        cursor = c.find(*args, **kwargs)
        return [dic async for dic in cursor]

    def close(self):
        self.client.close()

使用

mongo = Mongo(host=host, port=port, db='库名')  # 有密码的,可u添加: user='', pwd=''
# {"type" : "train"} 条件查询,可不要
# {"type": 1}, 指定返回字段,可不要
train = mongo.find("表名", {"type" : "train"}, {"type": 1}) # 返回[{documentation}, {documentation}, ...]

mongo.insert("表名", data)  # data  `{documentation}`  ,或者:`[{documentation}, {documentation}, ...]`
 类似资料: