本来在完成这次量化实验之前不想再改版,但想想至少还有3个ADBS要在本次中完成,算下来改所花的时间还是少于因为不改而做的额外配置,所以还是改。
本次升级要解决几个问题:
目前看来,Redis Var有这么几种角色/功能:
所以本次的改变是将所有的redis变量都集中在config文件里,有一些只和项目相关,使用上又是固定的变量,就直接用项目名称去格式化就好。
所以会涉及到所有的app和monitor,逐个扫过一遍,确保通用的redis_var会被集中管理。
Worker的分发模式(fetch)有助于各Worker进行并行执行,而顺序取(range)则可以保证每一条数据不会漏掉。
这两种模式区别非常明显,又都会用到,所以把这个参数通过执行命令的参数透到外面的命令行会比较灵活。
具体的执行Worker是逻辑性问题,不需要再修改。
configs_base.py
先把最新的配置内容粘贴过来,然后逐个查看每个app中需要使用的redis变量。
# 通用Redis变量 - 默认都是本地redis
global_redis_var = {}
global_redis_var['redis_agent_host'] = redis_agent_host
global_redis_var['redis_connection_hash'] = redis_connection_hash
global_redis_var['some_var'] = None
# 项目用的appRedis变量 - 默认都是本地redis
app_redis_var = {}
app_redis_var['redis_agent_host'] = redis_agent_host
app_redis_var['redis_connection_hash'] = redis_connection_hash
app_redis_var['app01_PullToStep1MongoIn'] = 'BUFF.%s.step1_mongo_in.pf.app.app01_PullToStep1MongoIn.af.gp.0.uf.flow_count' % project_name
app_redis_var['app03_PullToStep1MongoOut'] = 'BUFF.%s.step1_mongo_in.pf.app.app01_PullToStep1MongoIn.af.gp.0.uf.flow_count' % project_name
# 监控用的Redis变量
monitor_redis_var = {}
monitor_redis_var['redis_agent_host'] = redis_agent_host
monitor_redis_var['redis_connection_hash'] = redis_connection_hash
monitor_redis_var['monitor03_Step1TasksClaimed'] ='BUFF.%s.step1_mongo_in.pf.app.monitor03_Step1TasksClaimed.af.gp.0.uf.last_stat' % project_name
monitor_redis_var['monitor04_StreamIn_count'] = 'BUFF.%s.step1_mongo_in.pf.app.app01_PullToStep1MongoIn.af.gp.0.uf.flow_count' % project_name
# 数值型转换(如果声明的变量缺少流程就会中断)
app01_toDoubleVarList = []
app03_toDoubleVarList = []
然后是app需要使用的redis变量,这些应该都是项目相关的固定变量。目前看来,app01是需要redis变量的。
app01_PullToStep1MongoIn.py
先修改这个。需要使用redis var来记录入系统流量,同时也需要维持数值化变量的任务。如果可以全部是数值的话就不用管。
..
from configs_base import global_redis_var,app_redis_var
..
toDoubleVarList = app01_config['toDoubleVarList']
..
import time
redis_var =app_redis_var['app01_PullToStep1MongoIn']
redis_buff = req.post(redis_agent_host + 'getv/',json ={'k':redis_var}).json()['data']
app03_PullToStep1MongoOut.py
app03并不需要提供其他的统计,无需redis变量,但是要增加对于数值型变量的转换功能。
..
from configs_base import global_redis_var,app_redis_var
..
toDoubleVarList = app03_config['toDoubleVarList']
monitor01_StreamFlow.py
这个monitor主要是看项目默认的输入输出队列,不必改。
monitor02_DB_Recs.py
这个是看项目相关数据库的总条数,也不用改。
monitor03_Step1TasksClaimed.py
这个是监控入系统数据被分配的情况。为了减少Mongo的统计压力,采用了增量统计的方式,使用了一个redis变量记录上次的统计点。
这个也属于“项目间差异命名,项目内固定命名”的变量,所以也提出来,放在configs_base里定义,在这个程序体中直接引入就可以。
...
from configs_base import global_redis_var,monitor_redis_var
...
redis_var = monitor_redis_var['monitor03_Step1TasksClaimed']
monitor04_StreamIn_count.py
这是用于统计入系统流量的,同样需要修改redis变量。monitor04在上个版本已经改过,现在把redis_var的导入集中管理。
worker01_Go.py
之前会给这个worker01进行不同的命名,发现在多数情况下要定制的部分其实不多,所以可以做一个约定,将这个步骤的调度都命名为 worker01_Go.py
,而对应的Worker,不管实现什么功能,都将对应的输出命名为: worker_af, worker_Chain_session_list 以及worker_Chain_session_dict。Worker的名称叫TheWorker.py
启动时,以argparse的方式对其进行调度,默认参数可以维持现状(这样就不必动sche.py)。
argparse的示例如下:
import argparse
# ================= 解析参数
def get_arg():
parser = argparse.ArgumentParser(description='Customized Arguments')
# parser.add_argument('-p','--pkl', default='Meta')
parser.add_argument('--is_fetch_mode')
# 准备解析参数
args = parser.parse_args()
is_fetch_mode = args.is_fetch_mode
return is_fetch_mode
if __name__ =='__main__':
is_fetch_mode = get_arg() or False
if is_fetch_mode is not False:
is_fetch_mode = True
print('is_fetch_mode :', is_fetch_mode)
调用:
root@3b2281835e89:/workspace# python3 test.py
is_fetch_mode : False
root@3b2281835e89:/workspace# python3 test.py --is_fetch_mode=xx
is_fetch_mode : True
重写worker.py
import os
print('>>>work is Running ')
runcode =''
for some_app in ['worker01_Go.py']:
runcode += str(os.system('python3 %s' % some_app))
print('>>>work RunCode :%s ' % runcode)
新建worker01_Go.py
...
import argparse
# ================= 解析参数
def get_arg():
parser = argparse.ArgumentParser(description='Customized Arguments')
# parser.add_argument('-p','--pkl', default='Meta')
parser.add_argument('--is_fetch_mode')
parser.add_argument('--worker_name')
parser.add_argument('--group_name')
# 准备解析参数
args = parser.parse_args()
is_fetch_mode = args.is_fetch_mode
worker_name = args.worker_name
group_name = args.group_name
return is_fetch_mode,worker_name,group_name
is_fetch_mode,worker_name,group_name = get_arg()
# 参数映射1
if is_fetch_mode is not None:
is_fetch_mode = True
else:
is_fetch_mode = False
# 参数映射2
worker_name =worker_name or 'alice'
# 参数映射3
# 在init_projects.py定义
group_name =group_name or 'group1'
...
from TheWorker import worker_af, worker_Chain_session_list, worker_Chain_session_dict
这样worker01就可以固定下来不变了,以后每个项目中嵌入自己的worker就可以,worker中对应的实例、链和字典赋给要导入的变量。
如果worker没有定义会怎么样?
整个流是不会有影响的,使用worker是以os方式调用worker01的,如果worker01失败,主程序并不会出错。
CNT_worker.py
这类调度主要是在冷启动的时候加速处理,启动一定期限的不间断循环,工作结束时对应的容器也会自动销毁。
import os
import argparse
# ================= 解析参数
def get_arg():
parser = argparse.ArgumentParser(description='Customized Arguments')
# parser.add_argument('-p','--pkl', default='Meta')
parser.add_argument('--cnt_limit')
parser.add_argument('--is_fetch_mode')
parser.add_argument('--worker_name')
parser.add_argument('--group_name')
# 准备解析参数
args = parser.parse_args()
cnt_limit = args.cnt_limit
is_fetch_mode = args.is_fetch_mode
worker_name = args.worker_name
group_name = args.group_name
return cnt_limit,is_fetch_mode,worker_name,group_name
cnt_limit,is_fetch_mode,worker_name,group_name = get_arg()
cnt_limit = cnt_limit or 10000
cnt_limit = int(cnt_limit)
is_fetch_mode = is_fetch_mode or 'yes'
if is_fetch_mode == 'no':
is_fetch_mode = ''
worker_name = worker_name or ''
group_name = group_name or ''
aleary_run = 0
print('>>>work is Running ')
for _ in range(cnt_limit):
aleary_run+=1
print('Already Run',aleary_run)
runcode =''
for some_app in ['worker01_Go.py --is_fetch_mode=%s --worker_name=%s --group_name=%s ' % (is_fetch_mode,worker_name,group_name) ]:
runcode += str(os.system('python3 %s' % some_app))
print('>>>work RunCode :%s ' % runcode)
Band_worker.py
这类调度主要是为了增加的实时数据的处理,在时间内一直循环,时间到时对应的容器也会自动销毁。Band_worker因为会常态运行,所以加一个参数,让其稍微停顿一下,一般一秒一次。
import os
import time
def get_time_str1(ts = None,bias_hours=0):
ts = ts or time.time()
return time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(ts + bias_hours*3600))
import argparse
# ================= 解析参数
def get_arg():
parser = argparse.ArgumentParser(description='Customized Arguments')
# parser.add_argument('-p','--pkl', default='Meta')
parser.add_argument('--to_dt')
parser.add_argument('--pace')
parser.add_argument('--is_fetch_mode')
parser.add_argument('--worker_name')
parser.add_argument('--group_name')
# 准备解析参数
args = parser.parse_args()
to_dt = args.to_dt
pace = args.pace
is_fetch_mode = args.is_fetch_mode
worker_name = args.worker_name
group_name = args.group_name
return to_dt,pace,is_fetch_mode,worker_name,group_name
to_dt,pace,is_fetch_mode,worker_name,group_name = get_arg()
to_dt = to_dt or '2099-01-01 00:00:00'
pace = pace or 10000
pace = int(pace)
is_fetch_mode = is_fetch_mode or 'yes'
if is_fetch_mode == 'no':
is_fetch_mode = ''
worker_name = worker_name or ''
group_name = group_name or ''
aleary_run = 0
print('>>>work is Running ')
while True:
aleary_run+=1
print('Already Run',aleary_run)
runcode =''
for some_app in ['worker01_Go.py --is_fetch_mode=%s --worker_name=%s --group_name=%s ' % (is_fetch_mode,worker_name,group_name) ]:
runcode += str(os.system('python3 %s' % some_app))
print('>>>work RunCode :%s ' % runcode)
if get_time_str1() >= to_dt:
break
if pace > 0:
time.sleep(pace)
略