sqlite3 — 内建关系型数据库
优质
小牛编辑
129浏览
2023-12-01
创建一个数据库
# sqlite3_createdb.py
#
# Open the to-do database and report whether the file already existed.
import os
import sqlite3

db_filename = 'todo.db'

# Record whether the file exists before calling connect(); the check is
# meaningless afterwards if connect() creates the file.
db_is_new = not os.path.exists(db_filename)

conn = sqlite3.connect(db_filename)
try:
    if db_is_new:
        print('Need to create schema')
    else:
        print('Database exists, assume schema does, too.')
finally:
    # Always release the connection, even if reporting fails.
    conn.close()
-- todo_schema.sql
-- Schema for to-do application examples.
--
-- Two tables are defined: "project" and "task". Every task row names the
-- project it belongs to through task.project -> project.name.

-- Projects are high-level activities made up of tasks
-- Rows are keyed by name; description and deadline carry no constraints.
create table project (
name text primary key,
description text,
deadline date
);
-- Tasks are steps that can be taken to complete a project
-- id is generated automatically, priority defaults to 1, and project is
-- mandatory and refers to project(name).
-- NOTE(review): SQLite enforces a "references" clause only when foreign-key
-- support is enabled on the connection (PRAGMA foreign_keys) -- confirm
-- whether the examples depend on enforcement.
create table task (
id integer primary key autoincrement not null,
priority integer default 1,
details text,
status text,
deadline date,
completed_on date,
project text not null references project(name)
);
# sqlite3_create_schema.py
#
# Create the to-do schema and load sample rows the first time the
# database file is created; otherwise leave the existing database alone.
import os
import sqlite3

db_filename = 'todo.db'
schema_filename = 'todo_schema.sql'

# Evaluate before connect() so a freshly created file still counts as new.
db_is_new = not os.path.exists(db_filename)

with sqlite3.connect(db_filename) as conn:
    if db_is_new:
        print('Creating schema')
        with open(schema_filename, 'rt') as f:
            schema = f.read()
        # executescript() runs every statement found in the schema file.
        conn.executescript(schema)

        print('Inserting initial data')

        conn.executescript("""
        insert into project (name, description, deadline)
        values ('pymotw', 'Python Module of the Week',
        '2016-11-01');
        insert into task (details, status, deadline, project)
        values ('write about select', 'done', '2016-04-25',
        'pymotw');
        insert into task (details, status, deadline, project)
        values ('write about random', 'waiting', '2016-08-22',
        'pymotw');
        insert into task (details, status, deadline, project)
        values ('write about sqlite3', 'active', '2017-07-31',
        'pymotw');
        """)
    else:
        print('Database exists, assume schema does, too.')
获取数据
# sqlite3_select_tasks.py
#
# Print every task that belongs to the 'pymotw' project.
import sqlite3

db_filename = 'todo.db'

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()

    cursor.execute("""
    select id, priority, details, status, deadline from task
    where project = 'pymotw'
    """)

    # Each row is a tuple in the same column order as the select list.
    for row in cursor.fetchall():
        task_id, priority, details, status, deadline = row
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            task_id, priority, details, status, deadline))
# sqlite3_select_variations.py
#
# Show fetchone() for a single project row and fetchmany() for a bounded
# batch of task rows.
import sqlite3

db_filename = 'todo.db'

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()

    cursor.execute("""
    select name, description, deadline from project
    where name = 'pymotw'
    """)
    # Only one row is read; it is unpacked straight into three names.
    name, description, deadline = cursor.fetchone()

    print('Project details for {} ({})\n due {}'.format(
        description, name, deadline))

    cursor.execute("""
    select id, priority, details, status, deadline from task
    where project = 'pymotw' order by deadline
    """)

    print('\nNext 5 tasks:')
    # fetchmany(5) returns at most five rows from the result set.
    for row in cursor.fetchmany(5):
        task_id, priority, details, status, deadline = row
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            task_id, priority, details, status, deadline))
查询元数据
# sqlite3_cursor_description.py
#
# Print the column metadata the cursor exposes after running a query.
import sqlite3

db_filename = 'todo.db'

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()

    cursor.execute("""
    select * from task where project = 'pymotw'
    """)

    print('Task table has these columns:')
    # cursor.description holds one entry per column of the last query.
    for colinfo in cursor.description:
        print(colinfo)
行对象
# sqlite3_row_factory.py
#
# Use sqlite3.Row so result columns can be read by name instead of by
# position.
import sqlite3

db_filename = 'todo.db'

with sqlite3.connect(db_filename) as conn:
    # Change the row factory to use Row
    conn.row_factory = sqlite3.Row

    cursor = conn.cursor()

    cursor.execute("""
    select name, description, deadline from project
    where name = 'pymotw'
    """)
    name, description, deadline = cursor.fetchone()

    print('Project details for {} ({})\n due {}'.format(
        description, name, deadline))

    cursor.execute("""
    select id, priority, status, deadline, details from task
    where project = 'pymotw' order by deadline
    """)

    print('\nNext 5 tasks:')
    # Columns are looked up by name, so the order in the select list
    # does not have to match the order used when printing.
    for row in cursor.fetchmany(5):
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            row['id'], row['priority'], row['details'],
            row['status'], row['deadline'],
        ))
使用变量查询
位置参数
# sqlite3_argument_positional.py
#
# List the tasks of the project named on the command line, passing the
# name to the query through a positional "?" placeholder.
import sqlite3
import sys

db_filename = 'todo.db'
project_name = sys.argv[1]

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()

    query = """
    select id, priority, details, status, deadline from task
    where project = ?
    """

    # The value is bound by the driver rather than pasted into the SQL
    # text; the parameters are given as a one-element tuple.
    cursor.execute(query, (project_name,))

    for row in cursor.fetchall():
        task_id, priority, details, status, deadline = row
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            task_id, priority, details, status, deadline))
命名参数
# sqlite3_argument_named.py
#
# List the tasks of the project named on the command line, passing the
# name to the query through a named ":project_name" placeholder.
import sqlite3
import sys

db_filename = 'todo.db'
project_name = sys.argv[1]

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()

    query = """
    select id, priority, details, status, deadline from task
    where project = :project_name
    order by deadline, priority
    """

    # Named placeholders are filled from a dictionary keyed by name.
    cursor.execute(query, {'project_name': project_name})

    for row in cursor.fetchall():
        task_id, priority, details, status, deadline = row
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            task_id, priority, details, status, deadline))
# sqlite3_argument_update.py
#
# Set the status of one task. Usage: sqlite3_argument_update.py ID STATUS
import sqlite3
import sys

db_filename = 'todo.db'
# Stored as task_id (not "id") so the builtin id() is not shadowed; the
# SQL placeholder is still called :id.
task_id = int(sys.argv[1])
status = sys.argv[2]

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()
    query = "update task set status = :status where id = :id"
    cursor.execute(query, {'status': status, 'id': task_id})
批量加载
# sqlite3_load_csv.py
#
# Bulk-load tasks from the CSV file named on the command line. The SQL
# reads the columns details, priority, deadline and project from each
# CSV record; status is always stored as 'active'.
import csv
import sqlite3
import sys

db_filename = 'todo.db'
data_filename = sys.argv[1]

SQL = """
insert into task (details, priority, status, deadline, project)
values (:details, :priority, 'active', :deadline, :project)
"""

# newline='' lets the csv module do its own newline handling, as the csv
# documentation requires for files passed to a reader.
with open(data_filename, 'rt', newline='') as csv_file:
    csv_reader = csv.DictReader(csv_file)

    with sqlite3.connect(db_filename) as conn:
        cursor = conn.cursor()
        # Each dict produced by DictReader supplies the named parameters
        # for one insert.
        cursor.executemany(SQL, csv_reader)
定义新的列类型
# sqlite3_date_types.py
#
# Compare the Python types returned for the same row with and without
# declared-type detection enabled on the connection.
import sqlite3
import sys

db_filename = 'todo.db'

sql = "select id, details, deadline from task"


def show_deadline(conn):
    """Print the value and Python type of each column of the first task.

    conn -- open sqlite3 connection; its row_factory is switched to
            sqlite3.Row so the columns can be read by name.
    """
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    cursor.execute(sql)
    row = cursor.fetchone()
    for col in ['id', 'details', 'deadline']:
        print(' {:<8} {!r:<26} {}'.format(
            col, row[col], type(row[col])))


print('Without type detection:')
with sqlite3.connect(db_filename) as conn:
    show_deadline(conn)

print('\nWith type detection:')
# PARSE_DECLTYPES asks sqlite3 to convert values based on the type each
# column was declared with.
# NOTE(review): the built-in date converters are documented as deprecated
# since Python 3.12 -- confirm against the targeted Python version.
with sqlite3.connect(db_filename,
                     detect_types=sqlite3.PARSE_DECLTYPES,
                     ) as conn:
    show_deadline(conn)
# sqlite3_custom_type.py
import pickle
import sqlite3
db_filename = 'todo.db'
def adapter_func(obj):
    """Convert from in-memory to storage representation.

    Prints a trace line and returns obj serialized with pickle.
    """
    print('adapter_func({})\n'.format(obj))
    return pickle.dumps(obj)
def converter_func(data):
    """Convert from storage to in-memory representation.

    Prints a trace line and returns the object unpickled from data.
    """
    print('converter_func({!r})\n'.format(data))
    # SECURITY: pickle.loads() can execute arbitrary code. This is only
    # acceptable while the database contents are fully trusted.
    return pickle.loads(data)
class MyObj:
    """Minimal value holder used to demonstrate custom column types."""

    def __init__(self, arg):
        # arg is stored unchanged and shown by __str__.
        self.arg = arg

    def __str__(self):
        return 'MyObj({!r})'.format(self.arg)
# Register the functions for manipulating the type.
sqlite3.register_adapter(MyObj, adapter_func)
sqlite3.register_converter("MyObj", converter_func)

# Create some objects to save. Use a list of tuples so
# the sequence can be passed directly to executemany().
to_save = [
    (MyObj('this is a value to save'),),
    (MyObj(42),),
]

# PARSE_DECLTYPES selects the converter from the declared column type,
# which is "MyObj" in the table created below.
with sqlite3.connect(
        db_filename,
        detect_types=sqlite3.PARSE_DECLTYPES) as conn:
    # Create a table with column of type "MyObj"
    conn.execute("""
    create table if not exists obj (
    id integer primary key autoincrement not null,
    data MyObj
    )
    """)
    cursor = conn.cursor()

    # Insert the objects into the database
    cursor.executemany("insert into obj (data) values (?)",
                       to_save)

    # Query the database for the objects just saved
    cursor.execute("select id, data from obj")
    for obj_id, obj in cursor.fetchall():
        print('Retrieved', obj_id, obj)
        print(' with type', type(obj))
        print()
确定列类型
# sqlite3_custom_type_column.py
import pickle
import sqlite3
db_filename = 'todo.db'
def adapter_func(obj):
    """Convert from in-memory to storage representation.

    Prints a trace line and returns obj serialized with pickle.
    """
    print('adapter_func({})\n'.format(obj))
    return pickle.dumps(obj)
def converter_func(data):
    """Convert from storage to in-memory representation.

    Prints a trace line and returns the object unpickled from data.
    """
    print('converter_func({!r})\n'.format(data))
    # SECURITY: pickle.loads() can execute arbitrary code. This is only
    # acceptable while the database contents are fully trusted.
    return pickle.loads(data)
class MyObj:
    """Minimal value holder used to demonstrate custom column types."""

    def __init__(self, arg):
        # arg is stored unchanged and shown by __str__.
        self.arg = arg

    def __str__(self):
        return 'MyObj({!r})'.format(self.arg)
# Register the functions for manipulating the type.
sqlite3.register_adapter(MyObj, adapter_func)
sqlite3.register_converter("MyObj", converter_func)

# Create some objects to save. Use a list of tuples so we
# can pass this sequence directly to executemany().
to_save = [
    (MyObj('this is a value to save'),),
    (MyObj(42),),
]

# PARSE_COLNAMES selects the converter from the "[MyObj]" marker in the
# column name used by the query, not from the table definition.
with sqlite3.connect(
        db_filename,
        detect_types=sqlite3.PARSE_COLNAMES) as conn:
    # Create a table with column of type "text"
    conn.execute("""
    create table if not exists obj2 (
    id integer primary key autoincrement not null,
    data text
    )
    """)
    cursor = conn.cursor()

    # Insert the objects into the database
    cursor.executemany("insert into obj2 (data) values (?)",
                       to_save)

    # Query the database for the objects just saved,
    # using a type specifier to convert the text
    # to objects.
    cursor.execute(
        'select id, data as "pickle [MyObj]" from obj2',
    )
    for obj_id, obj in cursor.fetchall():
        print('Retrieved', obj_id, obj)
        print(' with type', type(obj))
        print()
事务
保存变更
# sqlite3_transaction_commit.py
import sqlite3
db_filename = 'todo.db'
def show_projects(conn):
    """Print the name of every row in the project table.

    conn -- open sqlite3 connection to query; a new cursor is created on
            it for the lookup.
    """
    cursor = conn.cursor()
    cursor.execute('select name, description from project')
    # The description column is selected but not displayed.
    for name, _description in cursor.fetchall():
        print(' ', name)
# Insert a project through one connection and list the projects through
# other connections before and after the commit.
with sqlite3.connect(db_filename) as conn1:
    print('Before changes:')
    show_projects(conn1)

    # Insert in one cursor
    cursor1 = conn1.cursor()
    cursor1.execute("""
    insert into project (name, description, deadline)
    values ('virtualenvwrapper', 'Virtualenv Extensions',
    '2011-01-01')
    """)

    print('\nAfter changes in conn1:')
    show_projects(conn1)

    # Select from another connection, without committing first
    print('\nBefore commit:')
    with sqlite3.connect(db_filename) as conn2:
        show_projects(conn2)

    # Commit then select from another connection
    conn1.commit()
    print('\nAfter commit:')
    with sqlite3.connect(db_filename) as conn3:
        show_projects(conn3)
丢弃变更
# sqlite3_transaction_rollback.py
import sqlite3
db_filename = 'todo.db'
def show_projects(conn):
    """Print the name of every row in the project table.

    conn -- open sqlite3 connection to query; a new cursor is created on
            it for the lookup.
    """
    cursor = conn.cursor()
    cursor.execute('select name, description from project')
    # The description column is selected but not displayed.
    for name, _description in cursor.fetchall():
        print(' ', name)
# Delete a project, simulate a failure, and roll the change back so the
# project list is the same before and after.
with sqlite3.connect(db_filename) as conn:

    print('Before changes:')
    show_projects(conn)

    try:

        # Delete
        cursor = conn.cursor()
        cursor.execute("""delete from project
        where name = 'virtualenvwrapper'
        """)

        # Show the settings
        print('\nAfter delete:')
        show_projects(conn)

        # Pretend the processing caused an error
        raise RuntimeError('simulated error')

    except Exception as err:
        # Discard the changes
        print('ERROR:', err)
        conn.rollback()

    else:
        # Save the changes (not reached here, because the error above
        # is always raised; kept to show the complete pattern)
        conn.commit()

    # Show the results
    print('\nAfter rollback:')
    show_projects(conn)
隔离级别
# sqlite3_isolation_levels.py
#
# Start two reader threads and two writer threads against the same
# database, using the isolation level given on the command line, and log
# when each step happens.
import logging
import sqlite3
import sys
import threading
import time

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-10s) %(message)s',
)

db_filename = 'todo.db'
isolation_level = sys.argv[1]


def writer():
    """Update every task, wait for the start signal, pause, then commit.

    Uses the module-level ``ready`` event created in the main section.
    """
    with sqlite3.connect(
            db_filename,
            isolation_level=isolation_level) as conn:
        cursor = conn.cursor()
        cursor.execute('update task set priority = priority + 1')
        logging.debug('waiting to synchronize')
        ready.wait()  # synchronize threads
        logging.debug('PAUSING')
        # Hold the uncommitted change for a second before committing.
        time.sleep(1)
        conn.commit()
        logging.debug('CHANGES COMMITTED')


def reader():
    """Wait for the start signal, then select and fetch all tasks.

    Uses the module-level ``ready`` event created in the main section.
    """
    with sqlite3.connect(
            db_filename,
            isolation_level=isolation_level) as conn:
        cursor = conn.cursor()
        logging.debug('waiting to synchronize')
        ready.wait()  # synchronize threads
        logging.debug('wait over')
        cursor.execute('select * from task')
        logging.debug('SELECT EXECUTED')
        cursor.fetchall()
        logging.debug('results fetched')


if __name__ == '__main__':
    ready = threading.Event()

    threads = [
        threading.Thread(name='Reader 1', target=reader),
        threading.Thread(name='Reader 2', target=reader),
        threading.Thread(name='Writer 1', target=writer),
        threading.Thread(name='Writer 2', target=writer),
    ]

    # Plain loops: the calls are made for their side effects only.
    for t in threads:
        t.start()

    # Give the threads a moment to reach ready.wait() before releasing.
    time.sleep(1)
    logging.debug('setting ready')
    ready.set()

    for t in threads:
        t.join()
内存数据库
导出一个数据库的内容
# sqlite3_iterdump.py
#
# Build the to-do database in memory, load sample rows, and print the
# statements produced by Connection.iterdump().
import sqlite3

schema_filename = 'todo_schema.sql'

with sqlite3.connect(':memory:') as conn:
    conn.row_factory = sqlite3.Row

    print('Creating schema')
    with open(schema_filename, 'rt') as f:
        schema = f.read()
    conn.executescript(schema)

    print('Inserting initial data')
    conn.execute("""
    insert into project (name, description, deadline)
    values ('pymotw', 'Python Module of the Week',
    '2010-11-01')
    """)
    # One tuple per task: (details, status, deadline, project).
    data = [
        ('write about select', 'done', '2010-10-03',
         'pymotw'),
        ('write about random', 'waiting', '2010-10-10',
         'pymotw'),
        ('write about sqlite3', 'active', '2010-10-17',
         'pymotw'),
    ]
    conn.executemany("""
    insert into task (details, status, deadline, project)
    values (?, ?, ?, ?)
    """, data)

    print('Dumping:')
    for text in conn.iterdump():
        print(text)
在SQL中使用Python函数
# sqlite3_create_function.py
import codecs
import sqlite3
db_filename = 'todo.db'
def encrypt(s):
    """Return s encoded with rot-13, printing a trace line first.

    None is returned unchanged so a NULL column value stays NULL instead
    of raising inside the SQL function call.
    """
    print('Encrypting {!r}'.format(s))
    if s is None:
        return None
    return codecs.encode(s, 'rot-13')
def decrypt(s):
    """Return s decoded from rot-13, printing a trace line first.

    The same rot-13 encoding is applied as in encrypt(), because applying
    rot-13 twice restores the original text. None is returned unchanged
    so a NULL column value stays NULL.
    """
    print('Decrypting {!r}'.format(s))
    if s is None:
        return None
    return codecs.encode(s, 'rot-13')
# Register the two Python functions as SQL functions, then use them to
# encode the task details, read them back, and decode them again.
with sqlite3.connect(db_filename) as conn:

    # Each function is registered under its SQL name with one argument.
    conn.create_function('encrypt', 1, encrypt)
    conn.create_function('decrypt', 1, decrypt)
    cursor = conn.cursor()

    # Raw values
    print('Original values:')
    query = "select id, details from task"
    cursor.execute(query)
    for row in cursor.fetchall():
        print(row)

    print('\nEncrypting...')
    query = "update task set details = encrypt(details)"
    cursor.execute(query)

    print('\nRaw encrypted values:')
    query = "select id, details from task"
    cursor.execute(query)
    for row in cursor.fetchall():
        print(row)

    print('\nDecrypting in query...')
    query = "select id, decrypt(details) from task"
    cursor.execute(query)
    for row in cursor.fetchall():
        print(row)

    # Restore the stored values to their original form.
    print('\nDecrypting...')
    query = "update task set details = decrypt(details)"
    cursor.execute(query)
用正则表达式查询
# sqlite3_regex.py
import re
import sqlite3
db_filename = 'todo.db'
def regexp(pattern, input):
    """Return True when pattern matches at the start of input.

    pattern -- regular expression text
    input   -- value to test; None (a NULL column value) never matches,
               instead of raising TypeError inside re.match()
    """
    if input is None:
        return False
    return bool(re.match(pattern, input))
# Register regexp() as the two-argument SQL function "regexp" and use
# the SQL "regexp" operator to select tasks by their details text.
with sqlite3.connect(db_filename) as conn:
    conn.row_factory = sqlite3.Row
    conn.create_function('regexp', 2, regexp)
    cursor = conn.cursor()

    pattern = '.*[wW]rite [aA]bout.*'

    cursor.execute(
        """
        select id, priority, details, status, deadline from task
        where details regexp :pattern
        order by deadline, priority
        """,
        {'pattern': pattern},
    )

    for row in cursor.fetchall():
        task_id, priority, details, status, deadline = row
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            task_id, priority, details, status, deadline))
自定义聚合
# sqlite3_create_aggregate.py
import sqlite3
import collections
db_filename = 'todo.db'
class Mode:
    """Aggregate that reports the most frequently seen value.

    step() is called with each value and finalize() returns the result.
    """

    def __init__(self):
        # Number of times each value has been passed to step().
        self.counter = collections.Counter()

    def step(self, value):
        """Record one occurrence of value, printing a trace line."""
        print('step({!r})'.format(value))
        self.counter[value] += 1

    def finalize(self):
        """Return the most common value, or None if none was recorded."""
        most_common = self.counter.most_common(1)
        # most_common(1) is empty when step() was never called; indexing
        # it unconditionally would raise IndexError.
        result, count = most_common[0] if most_common else (None, 0)
        print('finalize() -> {!r} ({} times)'.format(
            result, count))
        return result
# Register Mode as the one-argument SQL aggregate "mode" and apply it to
# the deadlines of the 'pymotw' tasks.
with sqlite3.connect(db_filename) as conn:
    conn.create_aggregate('mode', 1, Mode)

    cursor = conn.cursor()
    cursor.execute("""
    select mode(deadline) from task where project = 'pymotw'
    """)
    row = cursor.fetchone()
    print('mode(deadline) is:', row[0])
线程和连接共享
# sqlite3_threading.py
import sqlite3
import sys
import threading
import time
db_filename = 'todo.db'
isolation_level = None # autocommit mode
def reader(conn):
    """Read all task rows through conn and report the outcome.

    conn -- sqlite3 connection to query. Database errors are printed
            rather than raised, so a failure to use the connection is
            visible in the output.
    """
    print('Starting thread')
    try:
        cursor = conn.cursor()
        cursor.execute('select * from task')
        cursor.fetchall()
        print('results fetched')
    except sqlite3.Error as err:
        # Only database errors are reported here; any other exception
        # indicates a programming mistake and is allowed to propagate.
        print('ERROR:', err)
if __name__ == '__main__':
    # The connection is opened in the main thread and handed to a worker
    # thread, which then tries to use it.
    with sqlite3.connect(db_filename,
                         isolation_level=isolation_level,
                         ) as conn:
        t = threading.Thread(name='Reader 1',
                             target=reader,
                             args=(conn,),
                             )
        t.start()
        t.join()
限制访问数据
# sqlite3_set_authorizer.py
import sqlite3
db_filename = 'todo.db'
def authorizer_func(action, table, column, sql_location, ignore):
    """Decide whether SQLite may perform the requested action.

    action       -- sqlite3 action code for the operation being checked
    table        -- first action-specific argument (compared and printed
                    as a table name for SQLITE_READ)
    column       -- second action-specific argument (the column name for
                    SQLITE_READ)
    sql_location -- printed as the origin of the request
    ignore       -- unused apart from the trace output

    Returns SQLITE_IGNORE for reads of the 'details' column, SQLITE_DENY
    for reads of the 'priority' column, and SQLITE_OK for everything
    else.
    """
    print('\nauthorizer_func({}, {}, {}, {}, {})'.format(
        action, table, column, sql_location, ignore))

    response = sqlite3.SQLITE_OK  # be permissive by default

    if action == sqlite3.SQLITE_SELECT:
        # Select statements are allowed; the default response applies.
        print('requesting permission to run a select statement')

    elif action == sqlite3.SQLITE_READ:
        print('requesting access to column {}.{} from {}'.format(
            table, column, sql_location))
        if column == 'details':
            print(' ignoring details column')
            response = sqlite3.SQLITE_IGNORE
        elif column == 'priority':
            print(' preventing access to priority column')
            response = sqlite3.SQLITE_DENY

    return response
# Install the authorizer and run one query per restricted column to show
# the effect of each response code.
with sqlite3.connect(db_filename) as conn:
    conn.row_factory = sqlite3.Row
    conn.set_authorizer(authorizer_func)

    print('Using SQLITE_IGNORE to mask a column value:')
    cursor = conn.cursor()
    cursor.execute("""
    select id, details from task where project = 'pymotw'
    """)
    for row in cursor.fetchall():
        print(row['id'], row['details'])

    print('\nUsing SQLITE_DENY to deny access to a column:')
    # NOTE(review): a SQLITE_DENY response is documented to abort the
    # statement with an error, so the loop below is presumably never
    # reached -- confirm.
    cursor.execute("""
    select id, priority from task where project = 'pymotw'
    """)
    for row in cursor.fetchall():
        # Print the columns this query selects; 'details' is not among
        # them and would raise if the loop ever ran.
        print(row['id'], row['priority'])