pip install mongoengine
from mongoengine import connect
connect(db='mydb')
可选的参数有:
from mongoengine import Document, StringField
class User(Document):
name = StringField()
email = StringField()
meta = {
'db_alias': 'mydb1', #指定数据库
'collection': 'collection', #指定 model 对应的集合名称,默认为 model 的类名。
'ordering': ['-created_at'],#指定 QuerySet 的默认排序顺序。-是降序
'indexes': [{'fields': ['name'], 'unique': True},] #索引配置
}
meta属性常用参数:
'db_alias': 'mydb1',#指定数据库
'collection': 'collection',#指定 model 对应的集合名称,默认为 model 的类名。
'ordering': ['-created_at'],#指定 QuerySet 的默认排序顺序。-是降序
'primary_key':'field', #指定文档字段为主键,可以是 boolean 或 Field 实例
'collection_class':calss, #可以用来指定一个自定义的集合类来代替默认的 QuerySet 类。
'default_values':{'field':'value'}, #指定文档字段为主键,可以是 boolean 或 Field 实例
'indexes': [{'field': ['name'], 'unique': True},] #索引配置
collection_class的用法
默认情况下,Mongoengine 使用 mongoengine.queryset.QuerySet 类作为 model 的集合类。
这个类包含了常用的查询操作,如查询、过滤、排序、分页等。
如果你想为某个特定 model 定义自己的集合类,你可以继承 mongoengine.queryset.QuerySet 类并实现新的方法和属性。
然后在 model 的 meta 属性中指定 collection_class 参数来使用这个类
from mongoengine import queryset
class MyQuerySet(queryset.QuerySet):
def get_adults(self):
return self.filter(age__gte=18)
class User(Document):
name = StringField()
age = IntField()
meta = {
'collection_class': MyQuerySet
}
from model import User
#根据meta的indexes属性创建索引,只需要执行一次即可
User.create_indexes()
#save新增记录
user = User(name='John Doe')
user.save()
#新增一条记录
user = {'name': 'John', 'email': 'john@example.com'}
result = User.objects.insert_one(user)
#新增多条
users = [{'name': 'John', 'email': 'john@example.com'},
{'name': 'Jane', 'email': 'jane@example.com'},
{'name': 'Bob', 'email': 'bob@example.com'}]
result = User.objects.insert_many(users)
from model import User
User.objects(name='John Doe').delete()
from model import User
User.objects(name='John Doe').update(name='Jane Doe')
from model import User
#返回一个只有 name 字段为 Jane Doe 的 QuerySet 对象,它是在数据库中查询符合条件的文档, 如果没有匹配的文档,则返回空的 QuerySet 。
users = User.objects(name='Jane Doe')
#filter查询需要指定返回字段
users = User.objects.filter(name='John').select_related('name', 'email')
#排除某些返回字段
users = User.objects.filter().exclude('is_active')
#使用to_dict()方法将其转换为字典,返回列表
users_list = User.objects.filter(age__gt=20).to_dict()
#使用 to_json() 方法将其转换为 json 格式, 返回字符串
users_json = User.objects.filter(age__gt=20).to_json()
#如果想要将 QuerySet 转换为普通的 Python 列表,可以使用 Python 的 list() 函数
users_list = list(User.objects.filter(age__gt=20))
#分页查询
users = User.objects.skip(10).limit(10)
#使用 count() 方法来统计文档的数量。
total = User.objects.count()
#使用 order_by() 方法来指定查询结果的默认排序顺序,会覆盖model设置的 ordering 属性。-为降序
users = User.objects.order_by('field')
__exists: 检查字段是否存在。例如:MyModel.objects(field__exists=True)
__ne: 检查字段是否不等于给定值。例如:MyModel.objects(field__ne='value')
__lt, __lte, __gt, __gte: 检查字段是否小于、小于等于、大于、大于等于给定值。例如:MyModel.objects(field__lt=5)
__in,__nin: 检查字段是否在给定的列表中(__in)或不在给定的列表中(__nin)。例如: MyModel.objects(field__in=[1,2,3])
__all: 检查字段是否包含给定的所有元素,适用于数组字段。例如:MyModel.objects(field__all=[1,2,3])
__size: 检查字段是否包含给定数量的元素,适用于数组字段。例如:MyModel.objects(field__size=3)
__regex: 检查字段是否匹配给定的正则表达式。例如:MyModel.objects(field__regex='^prefix')
__type: 检查字段的数据类型是否为给定类型。例如:MyModel.objects(field__type=int)
__icontains:它可以检查一个字符串字段是否包含给定的字符串(不区分大小写)。例如:MyModel.objects(field__icontains='some string')
与条件 (and): 两个条件都满足
from mongoengine import Q
users = User.objects(Q(age__gt=20) & Q(name='John'))
或条件 (or): 两个条件中至少一个满足
users = User.objects(Q(age__gt=20) | Q(name='John'))
非条件 (not): 不满足条件
users = User.objects(~Q(age__gt=20))
使用 Q 对象来进行条件组合查询,能够提供更灵活,更强大的查询能力。
可以看到, 查询条件可以通过组合 Q 对象来实现复杂的查询, 这样就不用再在字符串拼接或者字典中进行拼接了
需要注意的是,这里的 Q 查询需要在最后的objects中使用,而不是在filter中使用。
使用 Q 对象和 filter() 方法是不能直接等价的, 不能在 filter() 中直接使用 Q 对象。
Q 对象主要是在查询集合中使用,使用 Q 对象需要传递给 objects() 方法,而 filter() 方法不支持 Q 对象作为参数。
如果你需要使用 Q 对象来进行查询, 可以在 filter() 前面使用 objects() 方法。
from mongoengine import Q
users = User.objects(Q(age__gt=20) & Q(name='John')).filter(age__lt=30)
Mongoengine 还提供了一些自带的聚合方法来进行一些常用的聚合操作,例如:
使用 sum() 计算某个字段的总和。
使用 avg() 计算某个字段的平均值。
使用 min() 和 max() 查询某个字段的最小值和最大值。
例子:
avg_age = User.objects.avg("age") #计算 age 字段的平均值:
自定义聚合查询
pipeline = [
{"$match": {"age": {"$gt": 20}}},
{"$group": {"_id": "$name", "count": {"$sum": 1}}}
]
result = User.objects.aggregate(*pipeline)
这样查询会返回所有年龄大于20岁的用户的 name 字段的统计信息。
在 Mongoengine 中,可以通过 ReferenceField 或者 EmbeddedDocumentField 来表示关联关系。
class User(Document):
name = StringField()
class Order(Document):
user = ReferenceField(User)
items = ListField(StringField())
# Retrieve the user of an order
order = Order.objects.first()
user = order.user
# Retrieve the orders of a user
user = User.objects.first()
orders = Order.objects(user=user)
在 Mongoengine 中,ReferenceField 默认使用另一个文档的 _id 字段作为关联字段。所以在我上面的例子中, Order 文档使用 user 字段来引用 User 文档, 实际上使用的是 User 文档的 _id 字段作为关联字段.
当然可以指定使用其他字段作为关联字段, 通过指定 db_field=‘custom_field_name’ 来进行指定, 可以这样定义:
#这样就可以使用custom_field_name 来进行关联了.
class Order(Document):
user = ReferenceField(User,db_field='custom_field_name')
当你在创建 Order 实例的时候可以将一个 User 实例作为 user 字段的值传入, 例如:
user = User(name='John Doe')
user.save()
order = Order(user=user, items=['item1', 'item2'])
order.save()
也可以使用 User 的 _id 来创建订单
user = User.objects.first()
order = Order(user=user.id, items=['item1', 'item2'])
order.save()
还可以使用字符串形式的 _id 来创建订单
user_id = User.objects.first().id
order = Order(user=user_id, items=['item1', 'item2'])
order.save()
这样就可以创建成功订单,并将user字段关联到对应的user上.
不过 Mongoengine 在保存 Order 实例时会自动检查 user 字段是否为 User 实例或字符串形式的 _id , 如果是,就会将其保存为 ReferenceField 的值.
双下划线连表查询
orders = Order.objects.filter(user__name='John Doe')
#关联的表也可以继续用__操作符查询,下面是查询用户年龄大于等于20的订单
orders = Order.objects.filter(user__age__get=20)
如果你希望对关联的文档进行更复杂的筛选,可以使用 Q 对象来实现.
例如,要查询具有某些特定条件的用户的订单,并且其年龄大于20岁,可以这样做:
from mongoengine import Q
orders = Order.objects.filter(Q(user__name='John Doe') & Q(user__age__gt=20
如果使用 filter() 方法对关联文档进行筛选并且只需要在筛选完之后访问其关联文档一次,那么不需要使用 select_related() 或 prefetch_related()。
但是,使用 select_related() 或 prefetch_related() 可以提高查询效率.
例如,如果在筛选完之后要访问多次关联文档,那么就可以使用select_related() 优化性能,如下:
orders = Order.objects.select_related("user").filter(user__name='John Doe')
for order in orders:
print(order.user.name)
如果你需要在多次查询中访问关联文档,你就需要使用 prefetch_related()
总结来说,使用 filter() 方法对关联文档进行筛选可以实现连表查询,但在有需要时使用 select_related() 或 prefetch_related() 可以提高查询性能。
这两个方法的用法基本一致,都是在使用关联查询的时候进行配置。
在 Mongoengine 中,没有内置的事务支持,如果要在多个数据库操作之间保证原子性,可以使用 “two-phase commit” 的思想来实现。
“Two-phase commit” 是一种分布式事务协议,它分为两个阶段:“prepare” 和 “commit”。在 “prepare” 阶段,事务管理器会向每个参与者发送 “prepare” 消息,询问参与者是否准备好提交事务。如果所有参与者都准备好了,那么事务管理器会向所有参与者发送 “commit” 消息,命令他们提交事务;如果有任何一个参与者不准备好,那么事务管理器会向所有参与者发送 “abort” 消息,命令他们回滚事务。
使用这种方式,可以让你自己来管理事务,通过捕获异常来回滚操作。
from model import User
user = User(name='John Doe')
try:
user.save()
#do other save operations here
#if all the operations are successfull then
commit()
except Exception as e:
#do any operation you want to rollback the previously successful save operations
但是,这种方法有一些缺点,因为它需要自己手动管理事务。如果需要更复杂的事务支持,可以考虑使用第三方的 MongoDB ODM 库,如 mongoengine-transactions 。
安装
pip install mongoengine-transactions
from mongoengine_transactions import Transaction
try:
with Transaction():
user = User(name='John Doe')
user.save()
# other database operations
# if all the operations are successful then
Transaction.commit()
except Exception as e:
Transaction.rollback()
MongoDB 3.x版本中,事务功能是限制在单个文档上的,也就是说在单个文档上支持事务,但是不支持多文档事务。
使用事务的前提是要开启事务支持,而且要使用 replica sets 以及要开启WiredTiger存储引擎。
MongoDB 4.0 及更高版本中,支持多文档事务,即可以在多个集合或多个数据库中执行原子操作。
需要注意的是,在执行事务操作前需要确保MongoDB服务器版本是4.0或更高,并且已经在配置文件中启用了事务功能。
在 Mongoengine 中,时间字段默认使用UTC时间来存储和读取,并且默认是考虑时区的。
如果你的应用程序不需要考虑时区信息,并且只需要使用本地时间,你可以在读取时间时将其转换为本地时间,在保存时间时将其转换为UTC时间。
如果你想更改时区,你可以在应用程序中使用pytz库来管理时区信息.
from datetime import datetime
from pytz import timezone
local_tz = timezone('Asia/Shanghai')
local_time = datetime.now(local_tz)
# save to mongodb
my_model.my_date = local_time.astimezone(pytz.utc)
my_model.save()
# read from mongodb
utc_time = my_model.my_date
local_time = utc_time.replace(tzinfo=pytz.utc).astimezone(local_tz)