今天在回家的火车上把这篇文章写了~
上篇文章连接:https://blog.csdn.net/weixin_51485807/article/details/122568825
祝大家新年快乐吧~具体看上篇前言
首发于:https://sleepymonster.cn
$ pip install pymongo
这里的能满足基本需要,要聚合查询或者整花活记得取官方文档去看看
import pymongo
from bson import ObjectId
from pymongo import MongoClient
# 连接数据库
client = MongoClient("mongodb://localhost:27017/")
# 管理数据库
pythonDb = client.get_database("python") # 通过逻辑库的名字找到
listAll = client.list_databases() # 查看所有数据库
listNameAll = client.list_database_names() # 查看所有数据库名称
client.drop_database("temp") # 删除数据库
# 管理集合
listAllName = pythonDb.list_collection_names() # 查看集合名称
main = pythonDb.get_collection("python_main") # 拿到集合
data = main.find_one() # 拿到数据
# 新增文档
doc1 = { # 定义一个
'name': 'world1',
'age': 20,
'hobby': {
'up': 'ball',
'down': 'run'
}
}
doc2 = { # 定义一个
'name': 'world2',
'age': 20,
'hobby': {
'up': 'ball',
'down': 'run'
}
}
doc3 = { # 定义一个
'name': 'world3',
'age': 20,
'hobby': {
'up': 'ball',
'down': 'run'
}
}
res = main.insert_one(doc1) # 拿到那个集合
resId = res.inserted_id # 会返回主键id
docList = [doc2, doc3] # 准备批量插入
resList = main.insert_many(docList)
resListId = resList.inserted_ids # 列表形式
# 查询文档
findOne = main.find_one() # 查询一条数据
findByMainKey = main.find_one({'_id': ObjectId('61ea4b7af17f38a6ff270a9f')}) # 根据主键进行查询
findMany = main.find() # 查询多条
findManyByOrder1 = main.find({}, {'name': 1}) # 多条查询的条件查询(只返回name列)
findManyByOrder2 = main.find({'age': {'$gt': 19}}, {'name': 1, 'age': 1}) # 多条查询的条件查询 年龄大于19的
findPage = main.find().skip(20).limit(10) # 进行分页 跳过前2页取10条数据
findOrder1 = main.find().sort('age', pymongo.ASCENDING) # 进行排序
findOrder2 = main.find({"name": "world1"}).sort('age', pymongo.ASCENDING) # 条件排序
findOrder3 = main.find({"name": "world1"}).sort([('age', pymongo.ASCENDING), ('name', pymongo.ASCENDING)]) # 多列排序
findCount = main.count_documents({}) # 文档总数
# 更新文档
res = main.update_one({}, {'$set': {'age': 19}}) # 更新
resUpdate = main.replace_one({}, {"newName": "Hello"})
resUpdateMany = main.update_many({}, {'$set': {'age': 19}}) # 批量更新
main.find_one_and_update({}, {'$set': {'age': 21}}) # 返回的是文档
main.find_one_and_replace({}, {"newName": "Hello World"})
# 删除文档
resDel = main.delete_one({}) # 根据条件查询并且删除第一条
redDelFind = main.find_one_and_delete({}) # 如果删除且返回此文档
# resDelMany = main.delete_many({}) # 根据条件删除
$ pip install mongoengine
import re
import time
from enum import Enum
from mongoengine import connect, Document, EmbeddedDocument, ValidationError
from mongoengine.queryset.visitor import Q
from mongoengine.fields import IntField, StringField, EnumField, ListField, EmbeddedDocumentField
connect("students", host='mongodb://localhost:27017/students')
# 自定义一个验证器
def phone_required(value):
pattern = r'^1[0-9]{10}$'
if not re.search(pattern, value):
raise ValidationError('请输出手机号码')
class SexChoices(Enum):
MEN = '男'
WOMEN = '女'
class CourseGrade(EmbeddedDocument): # 被嵌套的文档
course_name = StringField(max_length=64, required=True, verbose_name='课程名称')
teacher = StringField(max_length=16, verbose_name='老师')
age = IntField(min_value=0, max_value=100, required=True, verbose_name='分数')
class Student(Document):
stu_no = IntField(required=True, unique=True, verbose_name='学号')
stu_name = StringField(required=True, max_length=16, verbose_name='姓名')
sex = EnumField(enum=SexChoices, default=SexChoices('男'), verbose_name='性别')
class_name = StringField(max_length=10, default="001", verbose_name='班级')
address = StringField(max_length=255, default="中国", verbose_name='家庭住址')
phone_no = StringField(max_length=11, default="10010000000", verbose_name='电话号码', validation=phone_required)
age = IntField(min_value=0, max_value=150, default=18, verbose_name='年龄')
# EmbeddedDocumentField为自定义
# 此处的gradeList实现了 [{dict}, {dict}] 的效果
gradeList = ListField(EmbeddedDocumentField(CourseGrade), verbose_name='成绩列表')
# grade实现了{dict}的效果
grade = EmbeddedDocumentField(CourseGrade, verbose_name='成绩文档')
# gradeArr实现了[x, x, x]的效果
gradeArr = ListField(IntField())
meta = {
'collection': 'students_main',
'ordering': ['-age']
}
# 新增数据
# todo 构造ODM对象
stu_obj = Student(stu_no=time.time(), stu_name="jack", gradeList=[{'course_name': 'math', 'age': 18}],
gradeArr=[18, 18],
grade={'course_name': 'math', 'age': 18})
# todo 验证数据
stu_obj.validate()
# todo 进行保存
res = stu_obj.save() # 可以访问对应的属性
# 或者使用create
resCreate = Student.objects.create(stu_no=time.time() + 1, stu_name="jack",
gradeList=[{'course_name': 'math', 'age': 18}],
gradeArr=[18, 18],
grade={'course_name': 'math', 'age': 18})
# 查询数据
first = Student.objects.first() # 查询结构的第一个
get = Student.objects.get(id='61ea7bc06af381879c6fa89b') # 根据id查询
# noGet = Student.objects.get(id='1') # 不存在会报错
# noGet = Student.objects.get(stu_name='jack') # 多个也会报错
# 条件查询
queryAll = Student.objects.all()
queryOrder = Student.objects.filter(age__gt=12) # 年龄大于12的 filed__symbol
nestedQueryOrder = Student.objects.filter(grade__age__gte=60) # 嵌套grade.age数据大于60
stringQueryOrder = Student.objects.filter(stu_name__startwith="j") # stu_name 开头为 j
# 多条件查询
querySet1 = Student.objects.filter(Q(age__gt=12) & Q(age__lt=21)) # 同时满足
querySet2 = Student.objects.filter(Q(age__gt=12) | Q(age__lt=21)) # 部分满足
querySet3 = Student.objects.filter(Q(age__gt=12, sex=SexChoices.MEN) | Q(age__lt=21, sex=SexChoices.WOMEN)) # 部分满足
# 聚合统计
counts = Student.objects.count()
avgAge = Student.objects.all().average('age')
sumAge = Student.objects.all().sum('age')
order = Student.objects.order_by('+age', '-class_name') # 指定排序
sliceData = Student.objects.all()[10:15] # 10,11,12,13,14
# 修改数据
Student.objects.filter(stu_no=1).update_one(age=20) # 单条修改
Student.objects.filter(stu_no=2).update_one(age=20, unset__class_name=True) # unset不要这个字段了
stuObj = Student.objects.filter(stu_no=2).first()
stuObj.stu_name = 'Rose'
stuObj.save() # 获取-》更改-》保存
Student.objects.filter(age=18).update(inc__age=1) # 批量修改每个增加一个
# 删除数据
Student.objects(age__gt=11).delete()
$ pip install redis
import redis
# 连接
# connect = redis.Redis(
# host='localhost',
# port=6379,
# db=1
# )
pool = redis.ConnectionPool(
host='localhost',
port=6379,
db=1,
max_connections=20,
decode_responses=True
) # 连接池
connection = redis.Redis(
connection_pool=pool
)
connection.set('hello', 'world') # 设置
connection.close() # 关闭连接
# 操作字符串
done = connection.set('key', '1') # True
param = connection.get('key') # decode_responses解码成功
connection.set('key2', 'value2', ex=10) # 10秒钟自动超时
noneData = connection.get('None') # None
want = {
'key3': 'value3',
'key4': 'value4'
}
connection.mset(want) # 设置多个键值对
gets = connection.mget(['key2', 'key3']) # 获取多个键值对
incr = connection.incr('age') # 原子增加
delData = connection.delete('key') # 删除0/1
# 操作列表
connection.lpush('list1', 'a', 'b') # 左插入列表
connection.rpush('list2', 'a', 'b') # 插入列表
resList = connection.lrange('list1', 0, -1) # 查看但不取出来
res = connection.lpop('list2') # 拿出来一个
resLength = connection.llen('list1') # 查看长度
# 操作散列
connection.hset('stu:001', 'name', 'Jack')
isExists = connection.hexists('stu:001', 'name') # 是否存在
connection.hsetnx('stu:002', 'name', 'Jack') # 不存在就插入
connection.hset('stu:003', mapping=want) # 批量插入
connection.hdel('stu:002', 'name') # 删除
keys = connection.hkeys('stu:001') # 要所有的keys
value = connection.hvals('stu:001') # 要所有的value
# 操作集合
connection.sadd('zoo', 'dog', 'cat') # 添加到集合中
members = connection.smembers('zoo') # 查看成员
isMembers = connection.sismember('zoo', 'monkey') # 是否是
connection.srem('zoo', 'dog') # 删除某个成员
connection.sadd('anotherZoo', 'dog', 'monkey')
diff = connection.sdiff('zoo', 'anotherZoo') # 以第一个为准与后面的差别
inter = connection.sinter('zoo', 'anotherZoo') # 交集
union = connection.sunion('zoo', 'anotherZoo') # 并集
# 操作有序集合
rank = {
'member1': 2,
'member2': 4,
'member3': 3,
'member4': 5,
'member5': 1,
}
connection.zadd('ranking', rank) # 自动排序
count = connection.zcount('ranking', 0, 100) # 最小到最大之间
connection.zrem('ranking', 'member3') # 移除
score = connection.zscore('ranking', 'member2') # 拿到分数
memberList = connection.zrange('ranking', 0, 100) # 0~100之间的成员
rankList = connection.zrank('ranking', 'member2') # 查看排名