当前位置: 首页 > 工具软件 > Mongodb Motor > 使用案例 >

Python操作MongoDb与Redis以及ODM

宦博超
2023-12-01

前言:

今天在回家的火车上把这篇文章写了~

上篇文章连接:https://blog.csdn.net/weixin_51485807/article/details/122568825

祝大家新年快乐吧~具体看上篇前言

首发于:https://sleepymonster.cn

Python操作MongoDb

$ pip install pymongo

这里的能满足基本需要,要聚合查询或者整花活记得取官方文档去看看

import pymongo
from bson import ObjectId
from pymongo import MongoClient

# 连接数据库
client = MongoClient("mongodb://localhost:27017/")

# 管理数据库
pythonDb = client.get_database("python")  # 通过逻辑库的名字找到
listAll = client.list_databases()  # 查看所有数据库
listNameAll = client.list_database_names()  # 查看所有数据库名称
client.drop_database("temp")  # 删除数据库

# 管理集合
listAllName = pythonDb.list_collection_names()  # 查看集合名称
main = pythonDb.get_collection("python_main")  # 拿到集合
data = main.find_one()  # 拿到数据

# 新增文档
doc1 = {  # 定义一个
    'name': 'world1',
    'age': 20,
    'hobby': {
        'up': 'ball',
        'down': 'run'
    }
}
doc2 = {  # 定义一个
    'name': 'world2',
    'age': 20,
    'hobby': {
        'up': 'ball',
        'down': 'run'
    }
}
doc3 = {  # 定义一个
    'name': 'world3',
    'age': 20,
    'hobby': {
        'up': 'ball',
        'down': 'run'
    }
}
res = main.insert_one(doc1)  # 拿到那个集合
resId = res.inserted_id  # 会返回主键id
docList = [doc2, doc3]  # 准备批量插入
resList = main.insert_many(docList)
resListId = resList.inserted_ids  # 列表形式

#  查询文档
findOne = main.find_one()  # 查询一条数据
findByMainKey = main.find_one({'_id': ObjectId('61ea4b7af17f38a6ff270a9f')})  # 根据主键进行查询
findMany = main.find()  # 查询多条
findManyByOrder1 = main.find({}, {'name': 1})  # 多条查询的条件查询(只返回name列)
findManyByOrder2 = main.find({'age': {'$gt': 19}}, {'name': 1, 'age': 1})  # 多条查询的条件查询 年龄大于19的
findPage = main.find().skip(20).limit(10)  # 进行分页 跳过前2页取10条数据
findOrder1 = main.find().sort('age', pymongo.ASCENDING)  # 进行排序
findOrder2 = main.find({"name": "world1"}).sort('age', pymongo.ASCENDING)  # 条件排序
findOrder3 = main.find({"name": "world1"}).sort([('age', pymongo.ASCENDING), ('name', pymongo.ASCENDING)])  # 多列排序
findCount = main.count_documents({})  # 文档总数

# 更新文档
res = main.update_one({}, {'$set': {'age': 19}})  # 更新
resUpdate = main.replace_one({}, {"newName": "Hello"})
resUpdateMany = main.update_many({}, {'$set': {'age': 19}})  # 批量更新
main.find_one_and_update({}, {'$set': {'age': 21}})  # 返回的是文档
main.find_one_and_replace({}, {"newName": "Hello World"})

# 删除文档
resDel = main.delete_one({})  # 根据条件查询并且删除第一条
redDelFind = main.find_one_and_delete({})  # 如果删除且返回此文档
# resDelMany = main.delete_many({})  # 根据条件删除

ODM操作MongoDb

$ pip install mongoengine
import re
import time
from enum import Enum
from mongoengine import connect, Document, EmbeddedDocument, ValidationError
from mongoengine.queryset.visitor import Q
from mongoengine.fields import IntField, StringField, EnumField, ListField, EmbeddedDocumentField

connect("students", host='mongodb://localhost:27017/students')


# 自定义一个验证器
def phone_required(value):
    pattern = r'^1[0-9]{10}$'
    if not re.search(pattern, value):
        raise ValidationError('请输出手机号码')


class SexChoices(Enum):
    MEN = '男'
    WOMEN = '女'


class CourseGrade(EmbeddedDocument):  # 被嵌套的文档
    course_name = StringField(max_length=64, required=True, verbose_name='课程名称')
    teacher = StringField(max_length=16, verbose_name='老师')
    age = IntField(min_value=0, max_value=100, required=True, verbose_name='分数')


class Student(Document):
    stu_no = IntField(required=True, unique=True, verbose_name='学号')
    stu_name = StringField(required=True, max_length=16, verbose_name='姓名')
    sex = EnumField(enum=SexChoices, default=SexChoices('男'), verbose_name='性别')
    class_name = StringField(max_length=10, default="001", verbose_name='班级')
    address = StringField(max_length=255, default="中国", verbose_name='家庭住址')
    phone_no = StringField(max_length=11, default="10010000000", verbose_name='电话号码', validation=phone_required)
    age = IntField(min_value=0, max_value=150, default=18, verbose_name='年龄')
    # EmbeddedDocumentField为自定义
    # 此处的gradeList实现了 [{dict}, {dict}] 的效果
    gradeList = ListField(EmbeddedDocumentField(CourseGrade), verbose_name='成绩列表')
    # grade实现了{dict}的效果
    grade = EmbeddedDocumentField(CourseGrade, verbose_name='成绩文档')
    # gradeArr实现了[x, x, x]的效果
    gradeArr = ListField(IntField())

    meta = {
        'collection': 'students_main',
        'ordering': ['-age']
    }


# 新增数据
# todo 构造ODM对象
stu_obj = Student(stu_no=time.time(), stu_name="jack", gradeList=[{'course_name': 'math', 'age': 18}],
                  gradeArr=[18, 18],
                  grade={'course_name': 'math', 'age': 18})
# todo 验证数据
stu_obj.validate()
# todo 进行保存
res = stu_obj.save()  # 可以访问对应的属性
# 或者使用create
resCreate = Student.objects.create(stu_no=time.time() + 1, stu_name="jack",
                                   gradeList=[{'course_name': 'math', 'age': 18}],
                                   gradeArr=[18, 18],
                                   grade={'course_name': 'math', 'age': 18})

# 查询数据
first = Student.objects.first()  # 查询结构的第一个
get = Student.objects.get(id='61ea7bc06af381879c6fa89b')  # 根据id查询
# noGet = Student.objects.get(id='1')  # 不存在会报错
# noGet = Student.objects.get(stu_name='jack')  # 多个也会报错

# 条件查询
queryAll = Student.objects.all()
queryOrder = Student.objects.filter(age__gt=12)  # 年龄大于12的 filed__symbol
nestedQueryOrder = Student.objects.filter(grade__age__gte=60)  # 嵌套grade.age数据大于60
stringQueryOrder = Student.objects.filter(stu_name__startwith="j")  # stu_name 开头为 j

# 多条件查询
querySet1 = Student.objects.filter(Q(age__gt=12) & Q(age__lt=21))  # 同时满足
querySet2 = Student.objects.filter(Q(age__gt=12) | Q(age__lt=21))  # 部分满足
querySet3 = Student.objects.filter(Q(age__gt=12, sex=SexChoices.MEN) | Q(age__lt=21, sex=SexChoices.WOMEN))  # 部分满足

# 聚合统计
counts = Student.objects.count()
avgAge = Student.objects.all().average('age')
sumAge = Student.objects.all().sum('age')
order = Student.objects.order_by('+age', '-class_name')  # 指定排序
sliceData = Student.objects.all()[10:15]  # 10,11,12,13,14

# 修改数据
Student.objects.filter(stu_no=1).update_one(age=20)  # 单条修改
Student.objects.filter(stu_no=2).update_one(age=20, unset__class_name=True)  # unset不要这个字段了
stuObj = Student.objects.filter(stu_no=2).first()
stuObj.stu_name = 'Rose'
stuObj.save()  # 获取-》更改-》保存
Student.objects.filter(age=18).update(inc__age=1)  # 批量修改每个增加一个

# 删除数据
Student.objects(age__gt=11).delete()

Python操作Redis

$ pip install redis 
import redis

# 连接
# connect = redis.Redis(
#     host='localhost',
#     port=6379,
#     db=1
# )
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=1,
    max_connections=20,
    decode_responses=True
)  # 连接池
connection = redis.Redis(
    connection_pool=pool
)
connection.set('hello', 'world')  # 设置
connection.close()  # 关闭连接

# 操作字符串
done = connection.set('key', '1')  # True
param = connection.get('key')  # decode_responses解码成功
connection.set('key2', 'value2', ex=10)  # 10秒钟自动超时
noneData = connection.get('None')  # None
want = {
    'key3': 'value3',
    'key4': 'value4'
}
connection.mset(want)  # 设置多个键值对
gets = connection.mget(['key2', 'key3'])  # 获取多个键值对
incr = connection.incr('age')  # 原子增加
delData = connection.delete('key')  # 删除0/1

# 操作列表
connection.lpush('list1', 'a', 'b')  # 左插入列表
connection.rpush('list2', 'a', 'b')  # 插入列表
resList = connection.lrange('list1', 0, -1)  # 查看但不取出来
res = connection.lpop('list2')  # 拿出来一个
resLength = connection.llen('list1')  # 查看长度

# 操作散列
connection.hset('stu:001', 'name', 'Jack')
isExists = connection.hexists('stu:001', 'name')  # 是否存在
connection.hsetnx('stu:002', 'name', 'Jack')  # 不存在就插入
connection.hset('stu:003', mapping=want)  # 批量插入
connection.hdel('stu:002', 'name')  # 删除
keys = connection.hkeys('stu:001')  # 要所有的keys
value = connection.hvals('stu:001') # 要所有的value

# 操作集合
connection.sadd('zoo', 'dog', 'cat')  # 添加到集合中
members = connection.smembers('zoo')  # 查看成员
isMembers = connection.sismember('zoo', 'monkey')  # 是否是
connection.srem('zoo', 'dog')  # 删除某个成员
connection.sadd('anotherZoo', 'dog', 'monkey')
diff = connection.sdiff('zoo', 'anotherZoo')  # 以第一个为准与后面的差别
inter = connection.sinter('zoo', 'anotherZoo')  # 交集
union = connection.sunion('zoo', 'anotherZoo')  # 并集

# 操作有序集合
rank = {
    'member1': 2,
    'member2': 4,
    'member3': 3,
    'member4': 5,
    'member5': 1,
}
connection.zadd('ranking', rank)  # 自动排序
count = connection.zcount('ranking', 0, 100)  # 最小到最大之间
connection.zrem('ranking', 'member3')  # 移除
score = connection.zscore('ranking', 'member2')  # 拿到分数
memberList = connection.zrange('ranking', 0, 100)  # 0~100之间的成员
rankList = connection.zrank('ranking', 'member2')  # 查看排名
 类似资料: