#-*-coding:utf-8-*-
from pymongo import MongoClient
class TestMongo:
def __init__(self):
client = MongoClient(host="127.0.0.1",port=27017)
self.collection = client["test"]["t1"] # 使用方括号的方式选择数据库和集合
def test_insert(self):
# insert_one接收字典, 返回objectId
ret = self.collection.insert_one({"name": "test100010","age":33})
print(ret)
def test_insert_many(self):
item_list = [{"name": "test1000{}".format(i)} for i in range(100)]
# insert_many接收一个列表,列表中为所有需要插入的字典
t = self.collection.insert_many(item_list)
# t.inserted_ids为所有插入的id
for i in t.inserted_ids:
print(i)
def try_find_one(self):
# find_one查找并且返回一个结果,接收一个字典形式的条件
t = self.collection.find_one({"name":"test10005"})
print(t)
def try_find_many(self):
# find返回所有满足条件的结果,如果条件为空,则返回数据库的所有数据
t = self.collection.find({"name":{"$regex":"^test\*"}})
# 结果是一个Cursor游标对象,是一个可迭代对象,可以类似读文件的指针
for i in t:
print(i)
def try_update_one(self):
# update_one更新一条数据
self.collection.update_one({"name":"test10001"},{"$set":{"name":"test**"}})
def try_update_many(self):
# update_one更新全部数据
self.collection.update_many({"name":"test22"},{"$set":{"name":"new_test"}})
def try_delete_one(self):
# delete_one删除一条数据
self.collection.delete_one({"name":"test100010"})
def try_delete_many(self):
# delete_many删除所有满足条件的数据
self.collection.delete_many({"name":{"$regex":"test1000.*?"}})
def try_aggregate(self):
# aggregate()的第一个参数接收一个列表,列表中的内容写法同mongodb
t = self.collection.aggregate([{"$group":{"_id":"$name","count":{"$sum":1}}},{"$sort":{"counter":-1}}])
print(list(t))
if __name__ == "__main__":
mongo = TestMongo()
# mongo.test_insert_many()
# mongo.test_insert()
# mongo.try_update_one()
# mongo.try_find_many()
# mongo.try_delete_many()
mongo.try_aggregate()