一年时间转瞬即逝,16年11月份因为业务需要,开始构建actor工具库,因为cluster下grain实现太困难,17年4月切换到具有该特征的protoactor,到现在,使用这个框架快一年。一路踩坑无数,趁这几天规划下阶段业务,就抽空聊聊。
Actor框架的提出基本上与CSP差不多处于同一个年代,相对于后者,过去10年Actor还是要稍微火一些,毕竟目前都强调快速开发,绝大部分的开发人员更关注业务实现,这符合Actor侧重于接收消息的对象(CSP更强调传输通道)一致。ProtoActor的设计与实现,绝大部分和AKKA很相似,只是在序列化、服务发现与注册、生命周期几个地方略有差异。框架使用上整体与AKKA相同,但受限于go语言,在使用细节上差别有点大。
消息传递
ProtoActor通过MPSC(multi produce single consumer)结构,来实现系统级消息的传递;通过RingBuffer来实现业务消息传递,默认可以缓冲10条业务消息。
跨服传递方面,每一个服务节点即使rpc的客户端,也是rpc的服务端。节点以rpc请求打包发送消息,以rpc服务接收可能的消息回复。
基于gPRC
相对于go标准库的rpc包,gRPC具有更多的优点,包括但不限于:
序列化
ProtoActor跨服数据默认采用的Protobuf3序列化协议定义了跨服消息的封装,不支持msgpack、thrift等其他协议。
cluster
ProtoActor每个节点允许通过consul进行服务注册,但cluster下的name必须相同。目前小问题还比较多,尤其是跨节点的watch有设计上bug,暂时不建议用于生产环境。
ProtoActor可以算是目前能够早到的最成熟的异步队列封装的框架,基本上可以实现绝大部分业务,屏蔽了底层rpc、序列化的具体实现。
屏蔽了协程的坑
尽管go的一大亮点就是goroutinue,但其中很多坑,就一个很简单的代码在客户端正常,在网上play.golang.org
绝对跑不动:
func main() {
var ok = true
//截止到go1.9.2,如果服务器只有1cpu,则没法调度。
go func() {
time.Sleep(time.Second)
ok = false
}()
for ok {
}
fmt.Println("程序运行结束!")
}
框架中,通过atomic
等偏底层的接口,基本没使用goroutinue和chan,就实现了异步队列的消息传递和函数调用,并且通过runtime.Goschedule
避免了业务繁忙情况下,其他任务等待的问题。因此,很多时候,将其作为异步消息队列提高服务任性,是很好的选择,不用自己每次去写CSP。
在actor.Started消息响应完成初始化
尽管我们会写Actor
接口的实现,但这个实现代码(暂且叫做MyActor)不要在外部完成业务数据的初始化。涉及到数据库读取,最多就提供一个UID,然后再接收到*actor.Starting
消息后进行处理。比如这样定义:
type MyActor struct{
UID string
myContext MyActorContext //业务上下文结构定义
//...
}
func (my *MyActor)Receive(ctx actor.Context){
switch msg:=ctx.Message().(type){
case *actor.Started:
//执行数据加载,初始化myContext
//响应其他事件
}
}
这样做,有三个好处:
system
或parent actor
执行,类似于数据库加载这类IO操作相对耗时,会导致其阻塞。Props(actor)
操作,更强调模板不可变,而MyActor的UID是一直不变的。禁止使用FromInstance
官方例子文件,使用了不少FromInstance
来构建Actor,由于go语言不是面向对象语言,该方法实际上是反复使用同一个实例,在业务异常崩溃恢复后,实例并没有重新设置。我们会发现之前的某些变量,并没有因此重置(FromInstance
以后或许会被废除)
利用鸭子接口进行消息归类
最初,我们的业务就一个节点。后来需要将该节点的业务分拆为【前端接入】->【若干个后端不同场景服务】,由于消息定义并不支持继承等高级特性,就采用增加同名属性定义,并提供默认值的方式:
message Request{
required int64 Router = 1[default 10];
optional int64 UserId = 2;
}
我们的客户端只能用protobuf2,正好使用了default特性。
由于protobuf生成的Request
代码肯定会生成GetRouter
,我们就可以定义这样的接口,来对具有同样属性的消息进行归类:
type Router interface{
func GetRouter()int64
}
采用这种手段,我们在不用列举所有消息、不用reflect
包的情况下,就可以方便的将前端消息转发给对应的后端服务器。
同样,scala的case对象在编译后有Product接口实现,只要属性顺序相同,可以在对应序号的属性强制转化,进行分类。kotlin暂时没看到类似特性。
在使用过程中,享受了框架带来的便利,但也有不少出乎预料的问题。相对于AKKA,确实不够成熟,提交反馈之后通常一周会有答复,大概有一半得到修复,其他的就#¥#@%#。比较严重的,大概还有几点:
状态切换没有实际意义
actor非常强调状态,在不同状态下处理业务逻辑非常便于对业务的梳理。protoactor没有将系统消息的响应交给postRestart
、postStop
、preStart
三个独立的方法,而是通过唯一的Receive
作为所有消息的响应入口:
// Actor is the interface that defines the Receive method.
//
// Receive is sent messages to be processed from the mailbox associated with the instance of the actor
type Actor interface {
Receive(c Context) //<-只有这一个方法实现
}
这导致,不论业务Actor在何种状态,都必须在Receive处理系统消息,代码会比较臃肿:
func (actor *MyActor)Receive1(ctx actor.Context){
handleSystemMsg(ctx.Message)
//...
}
func (actor *MyActor)Receive2(ctx actor.Context){
actor.handleSystemMsg(ctx.Message) //<-通常每个状态都要响应系统消息,避免业务崩溃导致数据没有保存等情况的发生
//...
}
//响应系统消息
func (actor *MyActor)handleSystemMsg(msg interface{}){
switch msg.(type){
case *actor.Started:
case *actor.Stopping:
case *actor.Restart:
case *actor.Restarting:
}
}
导致状态切换到别的消息入口时,不得不又要写一遍对系统消息的接收,这导致提供的Setbehavior
(AKKA叫做become
)失去了实际意义。
在实践上,考虑在actor实例中增加一个属性作为状态值,采用默认Receive作为统一入口,再路由其他分支更现实:
type MyActor struct{
state int32 //actor当前状态
//其他属性
}
//默认的Receive入口
func (actor *MyActor)Receive(ctx actor.Context){
switch msg.(type){
case *actor.Started:
case *actor.Stopping:
case *actor.Restart:
case *actor.Restarting:
}
switch actor.state{
case 1:
receive_state1(ctx)
case 2:
receive_state2(ctx)
}
}
//响应state1下的消息
func (actor *MyActor)receive_state1(ctx actor.Context){
//...
}
//响应state2下的消息
func (actor *MyActor)receive_state2(ctx actor.Context){
//...
}
生命周期略有不同
框架的生命周期与Akka的略有不同,正常情况下:Started->Stopping->Stopped。其中,Stopping是停止actor之前执行,而Stopped是注销actor之后。这是要非常小心,如果Stopping出现崩溃,actor对象将不会释放。
如果在业务运行过程中崩溃,框架会发送消息恢复:Restarting->Started。这个流程与akka的恢复过程非常不同(Stopped->Restarted).
不要跨服Watch
熟悉AKKA的开发人员比较清楚,child actor是允许跨进程、跨服务节点创建的。但框架的remote包在设计上存在不足,一旦节点失效,即使恢复,两个节点之间也无法建立连接,导致无法接受消息。而框架本来强调的grain
,必须在cluster模式下,基于watch
、unwatch
,所以没法正常使用。
最初在remote模式下发现这问题,原因是:
无法监控
异步框架中,如果没有监控,相当于开飞机盲降,问题诊断会非常困难。
protoactor在不同actor之间的消息传递,是通过定义MessageEnvelop
来实现的,三个属性都有着非常重要的作用:
//actor之间传递消息时的信封
type MessageEnvelope struct {
Header messageHeader //业务之外扩展信息的头,通常用于统计、监控,更多在拦截器中使用
Message interface{} //具体de业务消息
Sender *PID //消息发送者,但发送者调用Tell方法,则Sender为空
}
不过,无解的是Header在拦截器中使用之后,默认用的全局Header信息。Request代码如下:
func (ctx *localContext) Request(pid *PID, message interface{}) {
env := &MessageEnvelope{
Header: nil, //<--如果Actor接受了之前消息,需要传递Header是没办法的,这里重置为nil
Message: message,
Sender: ctx.Self(),
}
ctx.sendUserMessage(pid, env)
}
很明显可以看出,当actor向另一个actor发送消息的时候,Header被重置为nil
,类似的Tell
方法同样如此。在github上在沟通后,说是考虑到性能。要是增加一个能够传递Header的函数Forward
多好:
func (ctx *localContext) Forward(pid *PID, message interface{}) {
env := &MessageEnvelope{
Header: ctx.Header, //<--假定有这个方法能够获取当前ctx的Header信息,如果为nil,则获取全局Header
Message: message,
Sender: ctx.Self(),
}
ctx.sendUserMessage(pid, env)
}
此外,在不同节点(或不同进程)传输时,最初Header没有定义,但17年年底增加了这个属性,意味着在第一个actor接收消息的时候,能够拦截到Header信息。
没有原生schedule
actor的定时状态通常都是利用定时消息提供,尽管go原生的timer很好用,但开销不小,并且actor退出时,没法即使撤销对其引用。不过,通常业务的actor规模并不大,并且自己实现也比较简单。
没有原生eventbus
框架的消息并不提供分类、主题的投递,只有一个eventstream
提供所有actor的广播。在尝试抄AKKA的eventbus
确实复杂,发现go语言实现确实非常复杂,最终放弃。
尽管很多年前因为折腾AKKA,就理解了异步队列、Actor的实现方式,但都没有在代码规模中如此大规模的应用。在深入应用了Actor之后,不得不说与CSP有巨大的差异。相对而言两者应用场景很大不同:
- CSP 关注队列,偏底层,更轻,简单异步处理
- Actor 关注实例状态,重,默认在实例外加了个壳子(pid+context,AKKA的是actorRef+Context),封装两个队列(MSPC、RingBuffer)三个集合(children、watchers、watchees),更能处理复杂业务逻辑。
客观的说,ProtoActor的开源创造者考虑还是非常全,在序列化方式、消息格式确定之后,先后构造了go、c#、kotlin(这个是个demo的demo),不同节点可以用这几种语言分别实现。源代码也很值得学习(建议使用能够追踪接口定义与实现代码的IDE)。从更新上看,c#更快(没明白为什么不用微软官方的orlean),go语言这边更多是跟随c#的版本迭代,似乎没什么话语权。
整体而言,ProtoActor无论是自身代码设计(actor的接口定义、remote包中的配置文件命名等)、还是受制于go语言的限制(可见性太差、无泛型、无抽象类),确实有待于继续改进。但对于中小规模项目,整天还因为业务中Mutex
带来的死锁,急需寻找出路,不妨换种思维试试。自己过去一年,尽管使用Mutex
的能力急剧下降,但业务开发上确实easy了很多,实现服务端的AOI以前非常费神的架构快速了不少。而且,相对于scala,启动速度、内存占用确实好得太多太多(我没说go绝对比scala跑得快,别喷我)^_^。