Sparrow
分布式数据库中间件
简介
在类似订单的业务环境中,订单的记录会随着时间不断的增长。而走完整个周期的订单,时间越久,被访问到的机率就越小。
但越新的订单,访问频率越高。基于这么一个前提,做出一个简单的预测,如果经过一段时间后,订单量积累到一个非常可观的值,
影响到热数据的CURD效率,那么就必须制定一个可行的解决方案。该中间件就是为此而生。其通过对业务数据进行轻、重、热、冷横竖切分,
让每个块都保持一个轻量级状态。从而达到保证效率的目的。
安装依赖库
# 环境基于python2.7
pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com --allow-external mysql-connector-python
配置文件
# 配置环境 [debug|sandbox|production(默认)] (可以写入具体执行用户的.bashrc文件中)
export JI_ENVIRONMENT=debug
# 转存周期 亦即多久从热库转存一次数据到冷库 d以日为周期, w以周为周期, m以月为周期
DUMP_CYCLE
# 数据转存所需时间 单位秒
DATA_FLY_TIME
# 数据库默认名称 详细配置中可做差异配置
DATABASE
# 数据库默认端口 后面详细配置中可做差异配置
DB_PORT
# 每次最多返回的记录数
DB_RESULT_LIMIT
# 热\冷库的时间线字段
TIME_LINE_FIELD
# 时间线字段的倍率 为了支持时间单位毫秒\纳秒所设, 仅用在dump脚本中, 1为妙, 1000为毫秒...
TIME_X
# 日志路径 注意读写权限
LOG_FILE_BASE
# 记录ID刻度的Key
IDsKeeper
# Redis相关配置
REDIS
DB_S
# 具体的域配置 [states.DBDomain.light.value|states.DBDomain.hot.value|states.DBDomain.cold.value], 对应轻\热\冷
domain
# 读写模式 [states.RWMode.SW_SR.value|states.RWMode.BW_AR.value], 对应单例写\单例读, 均衡写\聚合读, 一般轻库\冷库单写单读, 热库均衡写\聚合读
rw_mode
# 是否禁用该域 [True|False] 被禁用的域,配置信息不被解析
disable
# 连接池大小 注意: 所有的连接池加起来不能超过数据库环境变量max_connections, (参考命令: SHOW VARIABLES LIKE 'max_connections';)
# 包含转存脚本的连接数
pool_size
冷库格式
冷库需提前创建好,按月份,每月一个库,格式如sparrow__2016__1, 数据库名称\年\月之间用双下划线间隔.
每个冷库里面的数据表,及表的结构,索引都要与热库一致.不然转存数据会失败.
转存的时间
如果转存周期为'w',那么在进入当前周期时,就可以执行转存脚本.
crontab格式如 '* * * * 1 cd ~/sparrowt/task_scheduler && /usr/bin/python dump.py', 注意用户权限.
在周期末时,可以执行clear_expire_in_hot.py来清理热库中已被转存的数据.
crontab格式如 '* * * * 7 cd ~/sparrowt/task_scheduler && /usr/bin/python clear_expire_in_hot.py', 注意用户权限.
删除的内容会记录在当前目录的yyyymm_log_record.record文件中,当做后悔药.
启动
# HTTP API
python sparrow.py
# zmq API
python zmq_scheduler.py &
python zmq_sparrow.py &
数据库设计限制
每个表必须有id字段,且为主键,CURD操作都基于此来识别对象;
冷热库中的表都要有时间线字段,该字段具体名称可通过config.py文件设置(默认为create_time);
未建索引的字段将不能成为条件过滤字段;
组合索引,多个字段间用'__'(双下划线(如: ALTER TABLE user_info ADD INDEX first_name__last_name (first_name, last_name);))来间隔;
查询时,轻库无需指定时间区间. 冷热裤必须指定时间区间, 若不指定,只返回当天的区间结果.
进入冷库的数据即为历史记录. 不支持删\改操作.
HTTP API
插入
curl -X "POST" "http://localhost:5000/oo/login_auth" \
-H "Content-Type: application/json" \
-d "{\"id\":1,\"login_name\":\"fanliang@iqusong.com\",\"password\":\"000000.com\",\"create_time\":0}"
更新
curl -X "PATCH" "http://localhost:5000/oo/login_auth/1" \
-H "Content-Type: application/json" \
-d "{\"password\":\"newpswd\"}"
获取
curl -X "GET" "http://localhost:5000/oo/order_form?filter_str=__order_by__id,__limit__1000,__range__1451197800~1451518200"
删除
curl -X "DELETE" "http://localhost:5000/oo/login_auth/1"
zmq API
RPC
message = dict()
message['action'] = 'RPC'
message['object_name'] = 'order_form'
message['params'] = {
'function': 'generate_id_by', # 支持的方法 'generate_id_by | get_id_of_max_by'
}
socket.send_json(message)
result = socket.recv_json()
result:
{
state: {
code: "200",
zh-cn: "成功",
en-us: "OK"
}
id: 1
}
插入
message = dict()
message['action'] = 'POST'
message['object_name'] = 'order_form'
message['params'] = {
'id': 1,
'create_time': 0,
'finished_time': 0
}
socket.send_json(message)
socket.recv_json()
result:
{
state: {
code: "200",
zh-cn: "成功",
en-us: "OK"
}
}
更新
message = dict()
message['action'] = 'PATCH'
message['object_name'] = 'order_form'
message['params'] = {
'id': 1,
'create_time': 0,
'finished_time': 0
}
socket.send_json(message)
socket.recv_json()
result:
{
state: {
code: "200",
zh-cn: "成功",
en-us: "OK"
}
}
获取
message = dict()
message['action'] = 'GET'
message['object_name'] = 'order_form'
message['params'] = {
'filter_str': '__order_by__id,__limit__1000,__range__1451197800~1451518200'
}
socket.send_json(message)
socket.recv_json()
result:
{
state: {
code: "200",
zh-cn: "成功",
en-us: "OK"
},
list: [
{}
]
}
删除
message = dict()
message['action'] = 'DELETE'
message['object_name'] = 'order_form'
message['params'] = {
'id': 1
}
socket.send_json(message)
socket.recv_json()
result:
{
state: {
code: "200",
zh-cn: "成功",
en-us: "OK"
}
}
filter_str语法
eq
id__eq__100
id等于100的记录
gt
id__gt__100
id大于100的记录
lt
id__lt__100
id小于100的记录
ne
id__ne__100
id不等于100的记录
in
id_in__100~101~103
id等于100或101或103的记录
order_by
__order_by__create_time
依据create_time字段顺序排序
order_by_desc
__order_by_desc__create_time
依据create_time字段倒序排序
limit
__limit__10
限制返回前10条记录
range
__range__1451197800~1451518200
以1451197800~1451518200时间范围限定查找空间
综合示例
__order_by_desc__id,__limit__1000,__range__1451197800~1451518200
依据id字段,倒序返回1451197800~1451518200时间空间中的前1000条记录
id__gt__100,id__lt__1000,name__eq__james
返回id大于100且小于1000的记录中,name等于james的记录
错误码
error_codes = {
'41250': {
'code': '41250',
'zh-cn': u'未支持索引的字段'
},
'50050': {
'code': '50150',
'zh-cn': u'MySQL 链接或执行出错'
},
'50051': {
'code': '50051',
'zh-cn': u'Redis 链接或执行出错'
}
}
FAQ
如何扩大数据库连接限制?
Q: 有两种方式
a. 通过修改配置文件my.cnf
[mysqld]
max_connections = 1000
b. 修改环境变量
SET GLOBAL max_connections = 1000;