import asyncio
import motor.motor_asyncio
import nest_asyncio # jupyter 上运行需要
nest_asyncio.apply() # jupyter 上运行需要
实现了查询、插入、断开连接的方法。
查询:可做条件查询,返回的是 [{documentation}, {documentation}, ...]
插入:传入可为 {documentation},或者:[{documentation}, {documentation}, ...]
"""operation mongodb"""
class Mongo(object):
    """Synchronous convenience wrapper around a Motor (asyncio) MongoDB client.

    The class is a singleton: every ``Mongo(...)`` call returns the same
    object.  ``__init__`` still runs on each call, so constructing it again
    replaces ``self.client`` / ``self.db`` with a new connection (the
    previous client is not closed automatically -- call ``close()`` first).

    ``insert`` and ``find`` drive the underlying coroutines to completion on
    the current asyncio event loop, so they can be called from plain
    synchronous code.
    """

    __instance = None

    def __new__(cls, *args, **kwargs):
        # Create the shared instance on first use only; later calls reuse it.
        if cls.__instance is None:
            cls.__instance = object.__new__(cls)
        return cls.__instance

    def __init__(self, host, port, db, user='', pwd=''):
        """
        Open a client and select the working database.

        :param host: MongoDB host name or address
        :param port: MongoDB port
        :param db: name of the database to operate on
        :param user: user name; when empty, the connection is unauthenticated
        :param pwd: password used together with ``user``
        """
        if user:
            # Pass the credentials to the client that is actually kept, and
            # honour the caller's host/port.  As keyword arguments the user
            # name and password need no percent-escaping.
            self.client = motor.motor_asyncio.AsyncIOMotorClient(
                host, port, username=user, password=pwd
            )
        else:
            # no user, no password
            self.client = motor.motor_asyncio.AsyncIOMotorClient(host, port)
        self.db = self.client[db]

    @staticmethod
    def __run(coro):
        """Run ``coro`` to completion on the current event loop and return its result."""
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current event loop (e.g. worker thread or newer Python
            # versions): create one and install it so later calls reuse it.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(coro)

    def insert(self, coll, data):
        """
        Insert one document or a list of documents into a collection.

        :param coll: collection name
        :param data: Dict or List(Dict, Dict, ...); any other type is
            reported on stdout and nothing is written
        :return: None
        """
        self.__run(self.__insert(coll, data))

    async def __insert(self, coll, data):
        """Coroutine behind ``insert``; prints how many documents were written."""
        c = self.db[coll]
        if isinstance(data, dict):
            # The single-insert result exposes ``inserted_id`` (singular),
            # so the count is simply one.
            await c.insert_one(data)
            print(f'inserted 1 docs in {coll}')
        elif isinstance(data, list):
            if not data:
                # insert_many rejects an empty list; nothing to write.
                print(f'inserted 0 docs in {coll}')
                return
            res = await c.insert_many(data)
            print(f'inserted {len(res.inserted_ids)} docs in {coll}')
        else:
            print(f"data type only is List or Dict,no {type(data)}")

    def find(self, coll, *args, **kwargs):
        """
        Query a collection and return all matching documents.

        :param coll: collection name
        :param args: positional arguments forwarded to the collection's
            ``find`` (e.g. a filter dict, then a projection dict)
        :param kwargs: keyword arguments forwarded to the collection's ``find``
        :return: list of documents, e.g. [{...}, {...}, ...]
        """
        return self.__run(self.__find(coll, *args, **kwargs))

    async def __find(self, coll, *args, **kwargs):
        """Coroutine behind ``find``; drains the cursor into a list."""
        c = self.db[coll]
        cursor = c.find(*args, **kwargs)
        return [dic async for dic in cursor]

    def close(self):
        """Close the underlying client connection."""
        self.client.close()
# Usage example. `host`, `port` and `data` are not defined in this snippet and
# must be provided by the surrounding code.
# For a password-protected server, additionally pass user='...' and pwd='...'.
mongo = Mongo(host=host, port=port, db='库名')

# Optional query filter: only documents matching it are returned.
train_filter = {"type" : "train"}
# Optional projection: selects which fields are returned.
train_projection = {"type": 1}
# The result is a list of documents: [{...}, {...}, ...]
train = mongo.find("表名", train_filter, train_projection)

# `data` may be a single document ({...}) or a list of documents ([{...}, {...}, ...]).
mongo.insert("表名", data)