当前位置: 首页 > 工具软件 > OpenEdge > 使用案例 >

openedge-function模块进行函数运行源码浅析——百度BIE边缘侧openedge项目源码阅读(4)

姚昊焱
2023-12-01

前言

之前的一篇文章介绍了openedge-function模块(下面简称function模块)进行初始化的部分,下面整体过一下启动一个函数的流程。

接收到消息

之前介绍过,ruler是整个function模块的核心,其负责进行接收启动函数的命令以及向master模块请求启动模块,并将函数运行后的结果发送至hub模块中。
下面看一下接收到消息的代码:

func (rr *ruler) start() error {
	rr.fd.SetCallback(func(pkt *packet.Publish) {
		···
		if pkt.Message.Payload != nil {
			···
			err := rr.md.Send(pkt)
			···
		}
		···
	})
	h := mqtt.Handler{}
	h.ProcessPublish = func(p *packet.Publish) error {
		return rr.fd.Invoke(p)
	}
	···
	return rr.md.Start(h)
}

里面由两个重要点,一个是callback方法,另一个是ProcessPublish方法。其中,ProcessPublish就是接收到Publish的消息后的处理方法,我们接着看rr.fd.Invoke(p)方法:(fd我们就暂时称为Function Dispatcher好了)

// Invoke invokes a function
func (d *Dispatcher) Invoke(pkt *packet.Publish) error {
	select {
	case d.buffer <- struct{}{}:
	case <-d.tomb.Dying():
		return ErrDispatcherClosed
	}
	go func(pub *packet.Publish) {
		msg := &runtime.Message{
			QOS:     uint32(pub.Message.QOS),
			Topic:   pub.Message.Topic,
			Payload: pub.Message.Payload,
		}
		msg, err := d.function.Invoke(msg)
		if err != nil {
			pub.Message.Payload = utils.MakeErrorPayload(pub, err)
		} else {
			pub.Message.Payload = msg.Payload
		}
		if d.callback != nil {
			d.callback(pub)
		}
		<-d.buffer
	}(pkt)
	return nil
}

首先其使用function.Invoke(msg)方法,然后调用了callback方法,这个callback是函数模块执行完毕后返回的值,在讲返回的时候再讲这里。下面看看function.Invoke(msg)方法:

// Invoke call funtion to handle message and return result message
func (f *Function) Invoke(msg *runtime.Message) (*runtime.Message, error) {
	item, err := f.pool.BorrowObject(context.Background())
	···
	fl := item.(*funclet)
	res, err := fl.handle(msg)
	···
	f.pool.ReturnObject(context.Background(), item)
	return res, nil

}

这里面调用了BorrowObject方法,这个是一个go的对象池的一个方法,从其源码中看,最终调用的是pool_factory的MakeObject方法:

func (f *functionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
	//Blank:通知master,开启一个module
	fl, err := f.newFunclet()
	if err != nil {
		return nil, err
	}
	return pool.NewPooledObject(fl), nil
}

func (f *Function) newFunclet() (*funclet, error) {
	···
	fl := &funclet{
		···
	}
	···
	err := fl.start()
	return fl, nil
}
func (fl *funclet) start() error {
	host, port := fl.id, 50051
	···
	rc := config.Runtime{}
	rc.Name = fl.id
	rc.Function = fl.cfg
	rc.Server.Address = fmt.Sprintf("%s:%d", host, port)
	···
	err = fl.man.cli.StartModule(&mc)
	···
	cc := config.NewRuntimeClient(fmt.Sprintf("%s:%d", host, port))
	cc.RuntimeServer = rc.Server
	cc.Backoff.Max = rc.Server.Timeout
	fl.rtc, err = runtime.NewClient(cc)
	if err != nil {
		fl.log.WithError(err).Errorf("failed to create runtime client")
	}
	return err
}

这里做了两个分支,一个是fl.man.cli.StartModule方法,是通过master API发送开启模块的请求,然后master启动了一个函数模块,并返回一个funclet对象,但是函数模块现在还没有进行函数调用,这也是function.handle(msg)模块剩下的一部分,通过funclet模块,执行handle函数:

// Invoke call funtion to handle message and return result message
func (f *Function) Invoke(msg *runtime.Message) (*runtime.Message, error) {
	item, err := f.pool.BorrowObject(context.Background())
	···
	fl := item.(*funclet)
	res, err := fl.handle(msg)
	···
	f.pool.ReturnObject(context.Background(), item)
	return res, nil
}

func (fl *funclet) handle(msg *runtime.Message) (*runtime.Message, error) {
	msg.FunctionName = fl.cfg.Name
	if msg.FunctionInvokeID == "" {
		msg.FunctionInvokeID = uuid.Generate().String()
	}
	return fl.rtc.Handle(msg)
}

// Handle sends request to function server
func (c *Client) Handle(in *Message) (*Message, error) {
	···
	return c.cli.Handle(ctx, in, callopt)
}

在funclet方法中,设置了函数模块进行方法调用的时候的参数,到最后的Client的c.cli.Handle(ctx, in, callopt),通过GRPC的方式发送到了函数模块中。
函数模块入口如下:

if __name__ == '__main__':
  	···
    m = mo()
    m.Load(args.c)
    m.Start()
	···

Load函数如下:

def Load(self, conf):
        ···
        sys.path.append(self.config['function']['codedir'])
        module_handler = self.config['function']['handler'].split('.')
        handler_name = module_handler.pop()
        module = importlib.import_module('.'.join(module_handler))
        self.function = getattr(module, handler_name)
		···
        self.server = grpc.server(thread_pool=futures.ThreadPoolExecutor(),
                                  options=[('grpc.max_send_message_length', max_message_size),
                                           ('grpc.max_receive_message_length', max_message_size)])
        openedge_function_runtime_pb2_grpc.add_RuntimeServicer_to_server(
            self, self.server)
        ···
        self.server.add_insecure_port(self.config['server']['address'])

这里可以看到,在函数模块被master启动后,就已经初始化好了需要调用的方法(在self.function = getattr(module, handler_name))以及开启了grpc server,这里也可以看到,之前在funclet的start方法中设置的address和port就是为了开启grpc server的(self.server.add_insecure_port(self.config['server']['address']))。这里需要注意一行代码openedge_function_runtime_pb2_grpc.add_RuntimeServicer_to_server(self, self.server),进入到这个方法中:

def add_RuntimeServicer_to_server(servicer, server):
  rpc_method_handlers = {
      'Handle': grpc.unary_unary_rpc_method_handler(
          servicer.Handle,
          request_deserializer=openedge__function__runtime__pb2.Message.FromString,
          response_serializer=openedge__function__runtime__pb2.Message.SerializeToString,
      ),
  }
  generic_handler = grpc.method_handlers_generic_handler(
      'runtime.Runtime', rpc_method_handlers)
  server.add_generic_rpc_handlers((generic_handler,))

因为我对python也不是很熟悉,这里根据我的理解是把Handle方法进行绑定,如果有请求调用Handle方法的话,那么就调用这个servicer.Handle方法,接着我们看看这个Handle方法是什么:

def Handle(self, request, context):
        """
        handle request
        """
        ctx = {}
        ctx['messageQOS'] = request.QOS
        ctx['messageTopic'] = request.Topic
        ctx['functionName'] = request.FunctionName
        ctx['functionInvokeID'] = request.FunctionInvokeID
        ctx['invokeid'] = request.FunctionInvokeID
        if request.Payload:
            try:
                msg = json.loads(request.Payload)
            except ValueError:
                msg = request.Payload  # raw data, not json format
        msg = self.function(msg, ctx)
        if msg is None:
            request.Payload = b''
        else:
            request.Payload = json.dumps(msg)
        return request

可以看到这个Handle方法就是把发送过来的数据发送给要调用的方法msg = self.function(msg, ctx),然后把返回值返回到哪里呢?
通过rpc调用,返回值又从函数模块返回了function模块中,我们看看发送grpc的最后一部分:

// Handle sends request to function server
func (c *Client) Handle(in *Message) (*Message, error) {
	ctx, cancel := context.WithTimeout(context.TODO(), c.conf.Timeout)
	defer cancel()
	return c.cli.Handle(ctx, in, callopt)
}

这里的return c.cli.Handle(ctx, in, callopt)返回的就是函数模块返回的消息(RPC就是远程过程调用,调用远程接口就像调用本地方法一样),然后追着调用链逐步向上寻找,找到了function.Invoke(msg)方法:

// Invoke call funtion to handle message and return result message
func (f *Function) Invoke(msg *runtime.Message) (*runtime.Message, error) {
	item, err := f.pool.BorrowObject(context.Background())
	···
	fl := item.(*funclet)
	res, err := fl.handle(msg)
	···
	f.pool.ReturnObject(context.Background(), item)
	return res, nil

}

这里又把这个response返回,看看返回到哪里:

// Invoke invokes a function
func (d *Dispatcher) Invoke(pkt *packet.Publish) error {
	select {
	case d.buffer <- struct{}{}:
	case <-d.tomb.Dying():
		return ErrDispatcherClosed
	}
	go func(pub *packet.Publish) {
		msg := &runtime.Message{
			QOS:     uint32(pub.Message.QOS),
			Topic:   pub.Message.Topic,
			Payload: pub.Message.Payload,
		}
		msg, err := d.function.Invoke(msg)
		if err != nil {
			pub.Message.Payload = utils.MakeErrorPayload(pub, err)
		} else {
			pub.Message.Payload = msg.Payload
		}
		if d.callback != nil {
			d.callback(pub)
		}
		<-d.buffer
	}(pkt)
	return nil
}

返回到了dispatcher的Invoke方法,在这里有一行代码d.callback(pub),这里将返回值(把response的内容放到了msg中)放入了callback的参数中,这时候我们再看看这个callback方法:

代码原本在ruler的start方法中
rr.fd.SetCallback(func(pkt *packet.Publish) {
		subqos := pkt.Message.QOS
		if pkt.Message.Payload != nil {
			if pkt.Message.QOS > rr.r.Publish.QOS {
				pkt.Message.QOS = rr.r.Publish.QOS
			}
			pkt.Message.Topic = rr.r.Publish.Topic
			err := rr.md.Send(pkt)
			if err != nil {
				return
			}
		}
		if subqos == 1 && (rr.r.Publish.QOS == 0 || pkt.Message.Payload == nil) {
			puback := packet.NewPuback()
			puback.ID = pkt.ID
			rr.md.Send(puback)
		}
	})

这里把response进行发送,这里有一句代码pkt.Message.Topic = rr.r.Publish.Topic,这里把消息的topic设置为在定义module.yml中的publish属性,也就是示例中的t/hi中,这也正是function模块接收主题为“t”的消息,然后把执行后的结果发送至“t/hi”的原因。然后调用err := rr.md.Send(pkt),通过MQTT Dispatcher将消息发送至hub中。

 类似资料: