当前位置: 首页 > 工具软件 > Sparrow > 使用案例 >

python的数据库中间件_sparrow

鲜于光辉
2023-12-01

Sparrow

分布式数据库中间件

简介

在类似订单的业务环境中,订单的记录会随着时间不断的增长。而走完整个周期的订单,时间越久,被访问到的机率就越小。

但越新的订单,访问频率越高。基于这么一个前提,做出一个简单的预测,如果经过一段时间后,订单量积累到一个非常可观的值,

影响到热数据的CURD效率,那么就必须制定一个可行的解决方案。该中间件就是为此而生。其通过对业务数据进行轻、重、热、冷横竖切分,

让每个块都保持一个轻量级状态。从而达到保证效率的目的。

安装依赖库

# 环境基于python2.7

pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com --allow-external mysql-connector-python

配置文件

# 配置环境 [debug|sandbox|production(默认)] (可以写入具体执行用户的.bashrc文件中)

export JI_ENVIRONMENT=debug

# 转存周期 亦即多久从热库转存一次数据到冷库 d以日为周期, w以周为周期, m以月为周期

DUMP_CYCLE

# 数据转存所需时间 单位秒

DATA_FLY_TIME

# 数据库默认名称 详细配置中可做差异配置

DATABASE

# 数据库默认端口 后面详细配置中可做差异配置

DB_PORT

# 每次最多返回的记录数

DB_RESULT_LIMIT

# 热\冷库的时间线字段

TIME_LINE_FIELD

# 时间线字段的倍率 为了支持时间单位毫秒\纳秒所设, 仅用在dump脚本中, 1为妙, 1000为毫秒...

TIME_X

# 日志路径 注意读写权限

LOG_FILE_BASE

# 记录ID刻度的Key

IDsKeeper

# Redis相关配置

REDIS

DB_S

# 具体的域配置 [states.DBDomain.light.value|states.DBDomain.hot.value|states.DBDomain.cold.value], 对应轻\热\冷

domain

# 读写模式 [states.RWMode.SW_SR.value|states.RWMode.BW_AR.value], 对应单例写\单例读, 均衡写\聚合读, 一般轻库\冷库单写单读, 热库均衡写\聚合读

rw_mode

# 是否禁用该域 [True|False] 被禁用的域,配置信息不被解析

disable

# 连接池大小 注意: 所有的连接池加起来不能超过数据库环境变量max_connections, (参考命令: SHOW VARIABLES LIKE 'max_connections';)

# 包含转存脚本的连接数

pool_size

冷库格式

冷库需提前创建好,按月份,每月一个库,格式如sparrow__2016__1, 数据库名称\年\月之间用双下划线间隔.

每个冷库里面的数据表,及表的结构,索引都要与热库一致.不然转存数据会失败.

转存的时间

如果转存周期为'w',那么在进入当前周期时,就可以执行转存脚本.

crontab格式如 '* * * * 1 cd ~/sparrowt/task_scheduler && /usr/bin/python dump.py', 注意用户权限.

在周期末时,可以执行clear_expire_in_hot.py来清理热库中已被转存的数据.

crontab格式如 '* * * * 7 cd ~/sparrowt/task_scheduler && /usr/bin/python clear_expire_in_hot.py', 注意用户权限.

删除的内容会记录在当前目录的yyyymm_log_record.record文件中,当做后悔药.

启动

# HTTP API

python sparrow.py

# zmq API

python zmq_scheduler.py &

python zmq_sparrow.py &

数据库设计限制

每个表必须有id字段,且为主键,CURD操作都基于此来识别对象;

冷热库中的表都要有时间线字段,该字段具体名称可通过config.py文件设置(默认为create_time);

未建索引的字段将不能成为条件过滤字段;

组合索引,多个字段间用'__'(双下划线(如: ALTER TABLE user_info ADD INDEX first_name__last_name (first_name, last_name);))来间隔;

查询时,轻库无需指定时间区间. 冷热裤必须指定时间区间, 若不指定,只返回当天的区间结果.

进入冷库的数据即为历史记录. 不支持删\改操作.

HTTP API

插入

curl -X "POST" "http://localhost:5000/oo/login_auth" \

-H "Content-Type: application/json" \

-d "{\"id\":1,\"login_name\":\"fanliang@iqusong.com\",\"password\":\"000000.com\",\"create_time\":0}"

更新

curl -X "PATCH" "http://localhost:5000/oo/login_auth/1" \

-H "Content-Type: application/json" \

-d "{\"password\":\"newpswd\"}"

获取

curl -X "GET" "http://localhost:5000/oo/order_form?filter_str=__order_by__id,__limit__1000,__range__1451197800~1451518200"

删除

curl -X "DELETE" "http://localhost:5000/oo/login_auth/1"

zmq API

RPC

message = dict()

message['action'] = 'RPC'

message['object_name'] = 'order_form'

message['params'] = {

'function': 'generate_id_by', # 支持的方法 'generate_id_by | get_id_of_max_by'

}

socket.send_json(message)

result = socket.recv_json()

result:

{

state: {

code: "200",

zh-cn: "成功",

en-us: "OK"

}

id: 1

}

插入

message = dict()

message['action'] = 'POST'

message['object_name'] = 'order_form'

message['params'] = {

'id': 1,

'create_time': 0,

'finished_time': 0

}

socket.send_json(message)

socket.recv_json()

result:

{

state: {

code: "200",

zh-cn: "成功",

en-us: "OK"

}

}

更新

message = dict()

message['action'] = 'PATCH'

message['object_name'] = 'order_form'

message['params'] = {

'id': 1,

'create_time': 0,

'finished_time': 0

}

socket.send_json(message)

socket.recv_json()

result:

{

state: {

code: "200",

zh-cn: "成功",

en-us: "OK"

}

}

获取

message = dict()

message['action'] = 'GET'

message['object_name'] = 'order_form'

message['params'] = {

'filter_str': '__order_by__id,__limit__1000,__range__1451197800~1451518200'

}

socket.send_json(message)

socket.recv_json()

result:

{

state: {

code: "200",

zh-cn: "成功",

en-us: "OK"

},

list: [

{}

]

}

删除

message = dict()

message['action'] = 'DELETE'

message['object_name'] = 'order_form'

message['params'] = {

'id': 1

}

socket.send_json(message)

socket.recv_json()

result:

{

state: {

code: "200",

zh-cn: "成功",

en-us: "OK"

}

}

filter_str语法

eq

id__eq__100

id等于100的记录

gt

id__gt__100

id大于100的记录

lt

id__lt__100

id小于100的记录

ne

id__ne__100

id不等于100的记录

in

id_in__100~101~103

id等于100或101或103的记录

order_by

__order_by__create_time

依据create_time字段顺序排序

order_by_desc

__order_by_desc__create_time

依据create_time字段倒序排序

limit

__limit__10

限制返回前10条记录

range

__range__1451197800~1451518200

以1451197800~1451518200时间范围限定查找空间

综合示例

__order_by_desc__id,__limit__1000,__range__1451197800~1451518200

依据id字段,倒序返回1451197800~1451518200时间空间中的前1000条记录

id__gt__100,id__lt__1000,name__eq__james

返回id大于100且小于1000的记录中,name等于james的记录

错误码

error_codes = {

'41250': {

'code': '41250',

'zh-cn': u'未支持索引的字段'

},

'50050': {

'code': '50150',

'zh-cn': u'MySQL 链接或执行出错'

},

'50051': {

'code': '50051',

'zh-cn': u'Redis 链接或执行出错'

}

}

FAQ

如何扩大数据库连接限制?

Q: 有两种方式

a. 通过修改配置文件my.cnf

[mysqld]

max_connections = 1000

b. 修改环境变量

SET GLOBAL max_connections = 1000;

 类似资料: