故障重现并找到了,我觉得可以把这个问题当成一种设计模式予以强化。
我发现在CNT_Worker运行后,WorkOut队列会有小概率出现没有衍生特征的情况,进而无法输出,产生了阻塞。
一开始以为是pickle载入失败的问题,后来觉得应该不是,应该是Worker中的逻辑问题
原来在try的处理中,当计算条件不满足时也会给数据赋值,透传。这大约是因为在将条件进行扁平化判别条件时忘记将在输出时进行判定了。
msg.msg ='ok'
msg.status = True
msg.data = res_dict
msg.rule_result = 1
修改如下:
if is_trade_slot and is_retrievable and is_recs_right:
msg.msg ='ok'
msg.status = True
msg.data = res_dict
msg.rule_result = 1
else:
msg.status = False
msg.rule_result = None
msg.data = None
我觉得比较可能的故障原因是:
在之前的方案中,采用Pickle方式维持本地的文件。后来我发现当启用多个worker的时候,这个开销就变得很大。
最初还是想节约一些内存空间,只保留最近的时隙,现在看来这个方法有点问题。因为Worker在进行实时加速时,肯定是要不断访问时间轴的,与其让worker自己去同步,不如在内存中维持一个完整的时间轴。
节约内存到浪费人的时间那就是真的不节约了,从这里也深深感觉到时间轴太重要了。
修改MyQuantBase里的sniffer,在redis中维持整个时间轴。形成「一次写入,多次读取」的实质效果。
解决了一些具体的技术问题,我更关心的是这些技术问题所对应的模式问题。毕竟“从特殊到一般,再从一般到特殊”这样的归纳演绎才是完美的螺旋上升。
AF总是在“尽力投递” 。 所以在这次Worker的疏漏上,有一部分是因为不对Worker的产出物的宽度(维度)进行任何要求。所以,可以这么记忆,AF的高可靠加工流水线,需要增加一些输出性的要求。例如,本次的处理是产生特征,相当于是“加宽操作”,当加工结束后对应的宽度没有出现就是错误的。
扁平化逻辑要进行串联 。本次更大的锅是由这个模式背的。通过逻辑的扁平化,本来使得代码可读性更好,从而提高可靠性。然而,由于在最后没有进行串联,导致无效的加工也可以输出。
并发的反复读写还是要走内存。 这次在加速计算中,每个worker如果都通过读写pickle的话,每个worker都要额外花费很多时间在磁盘IO上。会浪费很多无谓的开销。像时间轴这种元数据,就应该直接在内存中读取。
通用的共用启用专门的服务。 这次使用了redis变量来缓存时间轴,但本质上这也不是最好的方式。我看redis的列表是字符型的,操作起来并不方便。所以像这种情况,我觉得使用一个微服务。这个微服务会keep一个python列表在内存中,并且通过接口来提供快速的查询操作。(当然,使用tornado会不会比flask更合适些?)
可预计错误与不可预计错误。 有些错误是在预料内的,这些应该使用机制来包容。有些错误是为预料到的逻辑错误,这些应该是用debug来解决的。