之前的一篇文章介绍了openedge-function模块(下面简称function模块)进行初始化的部分,下面整体过一下启动一个函数的流程。
之前介绍过,ruler是整个function模块的核心,其负责进行接收启动函数的命令以及向master模块请求启动模块,并将函数运行后的结果发送至hub模块中。
下面看一下接收到消息的代码:
func (rr *ruler) start() error {
rr.fd.SetCallback(func(pkt *packet.Publish) {
···
if pkt.Message.Payload != nil {
···
err := rr.md.Send(pkt)
···
}
···
})
h := mqtt.Handler{}
h.ProcessPublish = func(p *packet.Publish) error {
return rr.fd.Invoke(p)
}
···
return rr.md.Start(h)
}
里面由两个重要点,一个是callback方法,另一个是ProcessPublish方法。其中,ProcessPublish就是接收到Publish的消息后的处理方法,我们接着看rr.fd.Invoke(p)
方法:(fd我们就暂时称为Function Dispatcher好了)
// Invoke invokes a function
func (d *Dispatcher) Invoke(pkt *packet.Publish) error {
select {
case d.buffer <- struct{}{}:
case <-d.tomb.Dying():
return ErrDispatcherClosed
}
go func(pub *packet.Publish) {
msg := &runtime.Message{
QOS: uint32(pub.Message.QOS),
Topic: pub.Message.Topic,
Payload: pub.Message.Payload,
}
msg, err := d.function.Invoke(msg)
if err != nil {
pub.Message.Payload = utils.MakeErrorPayload(pub, err)
} else {
pub.Message.Payload = msg.Payload
}
if d.callback != nil {
d.callback(pub)
}
<-d.buffer
}(pkt)
return nil
}
首先其使用function.Invoke(msg)方法,然后调用了callback方法,这个callback是函数模块执行完毕后返回的值,在讲返回的时候再讲这里。下面看看function.Invoke(msg)方法:
// Invoke call funtion to handle message and return result message
func (f *Function) Invoke(msg *runtime.Message) (*runtime.Message, error) {
item, err := f.pool.BorrowObject(context.Background())
···
fl := item.(*funclet)
res, err := fl.handle(msg)
···
f.pool.ReturnObject(context.Background(), item)
return res, nil
}
这里面调用了BorrowObject方法,这个是一个go的对象池的一个方法,从其源码中看,最终调用的是pool_factory的MakeObject方法:
func (f *functionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
//Blank:通知master,开启一个module
fl, err := f.newFunclet()
if err != nil {
return nil, err
}
return pool.NewPooledObject(fl), nil
}
func (f *Function) newFunclet() (*funclet, error) {
···
fl := &funclet{
···
}
···
err := fl.start()
return fl, nil
}
func (fl *funclet) start() error {
host, port := fl.id, 50051
···
rc := config.Runtime{}
rc.Name = fl.id
rc.Function = fl.cfg
rc.Server.Address = fmt.Sprintf("%s:%d", host, port)
···
err = fl.man.cli.StartModule(&mc)
···
cc := config.NewRuntimeClient(fmt.Sprintf("%s:%d", host, port))
cc.RuntimeServer = rc.Server
cc.Backoff.Max = rc.Server.Timeout
fl.rtc, err = runtime.NewClient(cc)
if err != nil {
fl.log.WithError(err).Errorf("failed to create runtime client")
}
return err
}
这里做了两个分支,一个是fl.man.cli.StartModule方法,是通过master API发送开启模块的请求,然后master启动了一个函数模块,并返回一个funclet对象,但是函数模块现在还没有进行函数调用,这也是function.handle(msg)模块剩下的一部分,通过funclet模块,执行handle函数:
// Invoke call funtion to handle message and return result message
func (f *Function) Invoke(msg *runtime.Message) (*runtime.Message, error) {
item, err := f.pool.BorrowObject(context.Background())
···
fl := item.(*funclet)
res, err := fl.handle(msg)
···
f.pool.ReturnObject(context.Background(), item)
return res, nil
}
func (fl *funclet) handle(msg *runtime.Message) (*runtime.Message, error) {
msg.FunctionName = fl.cfg.Name
if msg.FunctionInvokeID == "" {
msg.FunctionInvokeID = uuid.Generate().String()
}
return fl.rtc.Handle(msg)
}
// Handle sends request to function server
func (c *Client) Handle(in *Message) (*Message, error) {
···
return c.cli.Handle(ctx, in, callopt)
}
在funclet方法中,设置了函数模块进行方法调用的时候的参数,到最后的Client的c.cli.Handle(ctx, in, callopt)
,通过GRPC的方式发送到了函数模块中。
函数模块入口如下:
if __name__ == '__main__':
···
m = mo()
m.Load(args.c)
m.Start()
···
Load函数如下:
def Load(self, conf):
···
sys.path.append(self.config['function']['codedir'])
module_handler = self.config['function']['handler'].split('.')
handler_name = module_handler.pop()
module = importlib.import_module('.'.join(module_handler))
self.function = getattr(module, handler_name)
···
self.server = grpc.server(thread_pool=futures.ThreadPoolExecutor(),
options=[('grpc.max_send_message_length', max_message_size),
('grpc.max_receive_message_length', max_message_size)])
openedge_function_runtime_pb2_grpc.add_RuntimeServicer_to_server(
self, self.server)
···
self.server.add_insecure_port(self.config['server']['address'])
这里可以看到,在函数模块被master启动后,就已经初始化好了需要调用的方法(在self.function = getattr(module, handler_name)
)以及开启了grpc server,这里也可以看到,之前在funclet的start方法中设置的address和port就是为了开启grpc server的(self.server.add_insecure_port(self.config['server']['address'])
)。这里需要注意一行代码openedge_function_runtime_pb2_grpc.add_RuntimeServicer_to_server(self, self.server)
,进入到这个方法中:
def add_RuntimeServicer_to_server(servicer, server):
rpc_method_handlers = {
'Handle': grpc.unary_unary_rpc_method_handler(
servicer.Handle,
request_deserializer=openedge__function__runtime__pb2.Message.FromString,
response_serializer=openedge__function__runtime__pb2.Message.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'runtime.Runtime', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
因为我对python也不是很熟悉,这里根据我的理解是把Handle方法进行绑定,如果有请求调用Handle方法的话,那么就调用这个servicer.Handle方法,接着我们看看这个Handle方法是什么:
def Handle(self, request, context):
"""
handle request
"""
ctx = {}
ctx['messageQOS'] = request.QOS
ctx['messageTopic'] = request.Topic
ctx['functionName'] = request.FunctionName
ctx['functionInvokeID'] = request.FunctionInvokeID
ctx['invokeid'] = request.FunctionInvokeID
if request.Payload:
try:
msg = json.loads(request.Payload)
except ValueError:
msg = request.Payload # raw data, not json format
msg = self.function(msg, ctx)
if msg is None:
request.Payload = b''
else:
request.Payload = json.dumps(msg)
return request
可以看到这个Handle方法就是把发送过来的数据发送给要调用的方法msg = self.function(msg, ctx)
,然后把返回值返回到哪里呢?
通过rpc调用,返回值又从函数模块返回了function模块中,我们看看发送grpc的最后一部分:
// Handle sends request to function server
func (c *Client) Handle(in *Message) (*Message, error) {
ctx, cancel := context.WithTimeout(context.TODO(), c.conf.Timeout)
defer cancel()
return c.cli.Handle(ctx, in, callopt)
}
这里的return c.cli.Handle(ctx, in, callopt)
返回的就是函数模块返回的消息(RPC就是远程过程调用,调用远程接口就像调用本地方法一样),然后追着调用链逐步向上寻找,找到了function.Invoke(msg)方法:
// Invoke call funtion to handle message and return result message
func (f *Function) Invoke(msg *runtime.Message) (*runtime.Message, error) {
item, err := f.pool.BorrowObject(context.Background())
···
fl := item.(*funclet)
res, err := fl.handle(msg)
···
f.pool.ReturnObject(context.Background(), item)
return res, nil
}
这里又把这个response返回,看看返回到哪里:
// Invoke invokes a function
func (d *Dispatcher) Invoke(pkt *packet.Publish) error {
select {
case d.buffer <- struct{}{}:
case <-d.tomb.Dying():
return ErrDispatcherClosed
}
go func(pub *packet.Publish) {
msg := &runtime.Message{
QOS: uint32(pub.Message.QOS),
Topic: pub.Message.Topic,
Payload: pub.Message.Payload,
}
msg, err := d.function.Invoke(msg)
if err != nil {
pub.Message.Payload = utils.MakeErrorPayload(pub, err)
} else {
pub.Message.Payload = msg.Payload
}
if d.callback != nil {
d.callback(pub)
}
<-d.buffer
}(pkt)
return nil
}
返回到了dispatcher的Invoke方法,在这里有一行代码d.callback(pub)
,这里将返回值(把response的内容放到了msg中)放入了callback的参数中,这时候我们再看看这个callback方法:
代码原本在ruler的start方法中
rr.fd.SetCallback(func(pkt *packet.Publish) {
subqos := pkt.Message.QOS
if pkt.Message.Payload != nil {
if pkt.Message.QOS > rr.r.Publish.QOS {
pkt.Message.QOS = rr.r.Publish.QOS
}
pkt.Message.Topic = rr.r.Publish.Topic
err := rr.md.Send(pkt)
if err != nil {
return
}
}
if subqos == 1 && (rr.r.Publish.QOS == 0 || pkt.Message.Payload == nil) {
puback := packet.NewPuback()
puback.ID = pkt.ID
rr.md.Send(puback)
}
})
这里把response进行发送,这里有一句代码pkt.Message.Topic = rr.r.Publish.Topic
,这里把消息的topic设置为在定义module.yml中的publish属性,也就是示例中的t/hi中,这也正是function模块接收主题为“t”的消息,然后把执行后的结果发送至“t/hi”的原因。然后调用err := rr.md.Send(pkt)
,通过MQTT Dispatcher将消息发送至hub中。