当前位置: 首页 > 工具软件 > ADBS > 使用案例 >

建模杂谈系列215 ADBS Update V3

柳涵意
2023-12-01

说明

本来在完成这次量化实验之前不想再改版,但想想至少还有3个ADBS要在本次中完成,算下来改所花的时间还是少于因为不改而做的额外配置,所以还是改。

本次升级要解决几个问题:

  • 1 Redis Var的配置化管理
  • 2 Worker 的命令行配置(分发与非分发)

内容

1 Redis Var

目前看来,Redis Var有这么几种角色/功能:

  • 1 通用依赖。例如像时间轴这种的变量,每个worker都要使用。
  • 2 项目定制变量。每个项目都有自己流程的缓存变量,目前有一些项目的缓存还是不正确的,但是只是用于监控,也就放过了。

所以本次的改变是将所有的redis变量都集中在config文件里,有一些只和项目相关,使用上又是固定的变量,就直接用项目名称去格式化就好。

所以会涉及到所有的app和monitor,逐个扫过一遍,确保通用的redis_var会被集中管理。

2 Worker

Worker的分发模式(fetch)有助于各Worker进行并行执行,而顺序取(range)则可以保证每一条数据不会漏掉。

这两种模式区别非常明显,又都会用到,所以把这个参数通过执行命令的参数透到外面的命令行会比较灵活。

具体的执行Worker是逻辑性问题,不需要再修改。

3 操作

  • 1 先创建一个新的文件夹,用于存放新的模板文件
  • 2 以docker run -it 方式打开老的镜像

3.1 configs_base.py

先把最新的配置内容粘贴过来,然后逐个查看每个app中需要使用的redis变量。

# 通用Redis变量 - 默认都是本地redis
global_redis_var = {}
global_redis_var['redis_agent_host'] = redis_agent_host
global_redis_var['redis_connection_hash'] = redis_connection_hash
global_redis_var['some_var'] = None

# 项目用的appRedis变量 - 默认都是本地redis
app_redis_var = {}
app_redis_var['redis_agent_host'] = redis_agent_host
app_redis_var['redis_connection_hash'] = redis_connection_hash
app_redis_var['app01_PullToStep1MongoIn'] = 'BUFF.%s.step1_mongo_in.pf.app.app01_PullToStep1MongoIn.af.gp.0.uf.flow_count' % project_name
app_redis_var['app03_PullToStep1MongoOut'] = 'BUFF.%s.step1_mongo_in.pf.app.app01_PullToStep1MongoIn.af.gp.0.uf.flow_count' % project_name

# 监控用的Redis变量
monitor_redis_var = {}
monitor_redis_var['redis_agent_host'] = redis_agent_host
monitor_redis_var['redis_connection_hash'] = redis_connection_hash
monitor_redis_var['monitor03_Step1TasksClaimed'] ='BUFF.%s.step1_mongo_in.pf.app.monitor03_Step1TasksClaimed.af.gp.0.uf.last_stat' % project_name
monitor_redis_var['monitor04_StreamIn_count'] = 'BUFF.%s.step1_mongo_in.pf.app.app01_PullToStep1MongoIn.af.gp.0.uf.flow_count' % project_name

# 数值型转换(如果声明的变量缺少流程就会中断)
app01_toDoubleVarList = []
app03_toDoubleVarList = []

然后是app需要使用的redis变量,这些应该都是项目相关的固定变量。目前看来,app01是需要redis变量的。

3.2 app01_PullToStep1MongoIn.py

先修改这个。需要使用redis var来记录入系统流量,同时也需要维持数值化变量的任务。如果可以全部是数值的话就不用管。

..
from configs_base import global_redis_var,app_redis_var

..
toDoubleVarList = app01_config['toDoubleVarList']

..

import time 
redis_var =app_redis_var['app01_PullToStep1MongoIn']
redis_buff = req.post(redis_agent_host + 'getv/',json ={'k':redis_var}).json()['data']

3.3 app03_PullToStep1MongoOut.py

app03并不需要提供其他的统计,无需redis变量,但是要增加对于数值型变量的转换功能。

..
from configs_base import global_redis_var,app_redis_var

..
toDoubleVarList = app03_config['toDoubleVarList']

3.4 monitor01_StreamFlow.py

这个monitor主要是看项目默认的输入输出队列,不必改。

3.5 monitor02_DB_Recs.py

这个是看项目相关数据库的总条数,也不用改。

3.6 monitor03_Step1TasksClaimed.py

这个是监控入系统数据被分配的情况。为了减少Mongo的统计压力,采用了增量统计的方式,使用了一个redis变量记录上次的统计点。

这个也属于“项目间差异命名,项目内固定命名”的变量,所以也提出来,放在configs_base里定义,在这个程序体中直接引入就可以。

...

from configs_base import global_redis_var,monitor_redis_var

...

redis_var = monitor_redis_var['monitor03_Step1TasksClaimed']

3.7 monitor04_StreamIn_count.py

这是用于统计入系统流量的,同样需要修改redis变量。monitor04在上个版本已经改过,现在把redis_var的导入集中管理。

3.8 worker01_Go.py

之前会给这个worker01进行不同的命名,发现在多数情况下要定制的部分其实不多,所以可以做一个约定,将这个步骤的调度都命名为 worker01_Go.py,而对应的Worker,不管实现什么功能,都将对应的输出命名为: worker_af, worker_Chain_session_list 以及worker_Chain_session_dict。Worker的名称叫TheWorker.py

启动时,以argparse的方式对其进行调度,默认参数可以维持现状(这样就不必动sche.py)。

argparse的示例如下:

import argparse
# =================  解析参数
def get_arg():
    parser = argparse.ArgumentParser(description='Customized Arguments')
#     parser.add_argument('-p','--pkl', default='Meta')

    parser.add_argument('--is_fetch_mode')

    # 准备解析参数
    args = parser.parse_args()

    is_fetch_mode = args.is_fetch_mode

    return is_fetch_mode

if __name__ =='__main__':
    is_fetch_mode = get_arg() or False
    if is_fetch_mode is not False:
        is_fetch_mode = True 
    print('is_fetch_mode :', is_fetch_mode)

调用:

root@3b2281835e89:/workspace# python3 test.py
is_fetch_mode : False
root@3b2281835e89:/workspace# python3 test.py --is_fetch_mode=xx
is_fetch_mode : True

重写worker.py

import os 

print('>>>work is Running  ')
runcode =''
for some_app in ['worker01_Go.py']:
    runcode += str(os.system('python3 %s' % some_app))
print('>>>work RunCode :%s ' % runcode)

新建worker01_Go.py

...
import argparse
# =================  解析参数
def get_arg():
    parser = argparse.ArgumentParser(description='Customized Arguments')
#     parser.add_argument('-p','--pkl', default='Meta')

    parser.add_argument('--is_fetch_mode')
    parser.add_argument('--worker_name')
    parser.add_argument('--group_name')

    # 准备解析参数
    args = parser.parse_args()

    is_fetch_mode = args.is_fetch_mode
    worker_name = args.worker_name
    group_name = args.group_name

    return is_fetch_mode,worker_name,group_name


is_fetch_mode,worker_name,group_name = get_arg()

# 参数映射1
if is_fetch_mode is not None:
    is_fetch_mode = True 
else:
    is_fetch_mode = False
# 参数映射2
worker_name =worker_name or 'alice'

# 参数映射3
# 在init_projects.py定义
group_name =group_name or 'group1'

...
from TheWorker import worker_af, worker_Chain_session_list, worker_Chain_session_dict

这样worker01就可以固定下来不变了,以后每个项目中嵌入自己的worker就可以,worker中对应的实例、链和字典赋给要导入的变量。

如果worker没有定义会怎么样?

整个流是不会有影响的,使用worker是以os方式调用worker01的,如果worker01失败,主程序并不会出错。

3.9 CNT_worker.py

这类调度主要是在冷启动的时候加速处理,启动一定期限的不间断循环,工作结束时对应的容器也会自动销毁。

import os 

import argparse
# =================  解析参数
def get_arg():
    parser = argparse.ArgumentParser(description='Customized Arguments')
#     parser.add_argument('-p','--pkl', default='Meta')

    parser.add_argument('--cnt_limit')
    parser.add_argument('--is_fetch_mode')
    parser.add_argument('--worker_name')
    parser.add_argument('--group_name')

    # 准备解析参数
    args = parser.parse_args()

    cnt_limit = args.cnt_limit
    is_fetch_mode = args.is_fetch_mode
    worker_name = args.worker_name
    group_name = args.group_name

    return cnt_limit,is_fetch_mode,worker_name,group_name


cnt_limit,is_fetch_mode,worker_name,group_name = get_arg()

cnt_limit = cnt_limit or 10000
cnt_limit = int(cnt_limit)

is_fetch_mode = is_fetch_mode or 'yes'
if is_fetch_mode == 'no':
    is_fetch_mode = ''

worker_name = worker_name or ''
group_name = group_name or ''


aleary_run = 0

print('>>>work is Running  ')
for _ in range(cnt_limit):
    aleary_run+=1
    print('Already Run',aleary_run)
    runcode =''
    for some_app in ['worker01_Go.py --is_fetch_mode=%s --worker_name=%s --group_name=%s ' % (is_fetch_mode,worker_name,group_name) ]:
        runcode += str(os.system('python3 %s' % some_app))
    print('>>>work RunCode :%s ' % runcode)

3.10 Band_worker.py

这类调度主要是为了增加的实时数据的处理,在时间内一直循环,时间到时对应的容器也会自动销毁。Band_worker因为会常态运行,所以加一个参数,让其稍微停顿一下,一般一秒一次。

import os 

import time
def get_time_str1(ts = None,bias_hours=0):
    ts = ts or time.time()
    return time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(ts + bias_hours*3600))

import argparse
# =================  解析参数
def get_arg():
    parser = argparse.ArgumentParser(description='Customized Arguments')
#     parser.add_argument('-p','--pkl', default='Meta')

    parser.add_argument('--to_dt')
    parser.add_argument('--pace')
    parser.add_argument('--is_fetch_mode')
    parser.add_argument('--worker_name')
    parser.add_argument('--group_name')

    # 准备解析参数
    args = parser.parse_args()

    to_dt = args.to_dt
    pace = args.pace
    is_fetch_mode = args.is_fetch_mode
    worker_name = args.worker_name
    group_name = args.group_name

    return to_dt,pace,is_fetch_mode,worker_name,group_name


to_dt,pace,is_fetch_mode,worker_name,group_name = get_arg()

to_dt = to_dt or '2099-01-01 00:00:00'

pace = pace or 10000
pace = int(pace)

is_fetch_mode = is_fetch_mode or 'yes'
if is_fetch_mode == 'no':
    is_fetch_mode = ''

worker_name = worker_name or ''
group_name = group_name or ''



aleary_run = 0

print('>>>work is Running  ')
while True:
    aleary_run+=1
    print('Already Run',aleary_run)
    runcode =''
    for some_app in ['worker01_Go.py --is_fetch_mode=%s --worker_name=%s --group_name=%s ' % (is_fetch_mode,worker_name,group_name) ]:
        runcode += str(os.system('python3 %s' % some_app))
    print('>>>work RunCode :%s ' % runcode)

    if get_time_str1() >= to_dt:
        break
    if pace > 0:
        time.sleep(pace)

4 提交更改,生成新镜像

 类似资料: