当前位置: 首页 > 工具软件 > OpenEdge > 使用案例 >

openedge-function模块浅析——百度BIE边缘侧openedge项目源码阅读(3)

王高超
2023-12-01

前言

中断了一段时间,发现前面分析的hub模块的源码拉错分枝了(对,我就是个菜鸡),不过大致流程差不多,有时间改一下。这次分析openedge-function模块,openedge-function模块比较简单,很多核心功能其实还是依赖于master和hub的。

openedge-function启动

下面将openedge-function模块简称为function模块

入口函数是openedge-0.1.1/openedge-function/main.go,其核心代码如下:

func main(){
 	···
 	m, err := New(f.Config)
 	···
 	err = m.Start()
 	···
 }

也就是分为两个部分,一个是New一个mo(也就是上面的m),之后启动这个mo。

New mo

核心代码如下:

// New creates a new module
func New(confDate string) (module.Module, error) {
	var cfg Config
	err := module.Load(&cfg, confDate)
	···
	defaults(&cfg)
	···
	man, err := NewManager(cfg)
	···
	mo{
		cfg: cfg,
		man: man,
		rrs: []*ruler{},
		log: logger.WithFields(),
	}
	for _, r := range cfg.Rules {
		f, err := man.Get(r.Compute.Function)
		if err != nil {
			m.Close()
			return nil, err
		}
		rr, err := create(r, cfg.Hub, f)
		···
		m.rrs = append(m.rrs, rr)
	}
	return m, nil
}

前面几行就是为了加载配置,这里面提一下default函数:

func defaults(cfg *Config) {
	if cfg.API.Address == "" {
		cfg.API.Address = utils.GetEnv(module.EnvOpenEdgeMasterAPI)
	}
	cfg.API.Username = cfg.UniqueName()
	cfg.API.Password = utils.GetEnv(module.EnvOpenEdgeModuleToken)
}

这里满就是为了设置master API的地址,这个就是master模块中创建API的缘故,也是启动、关闭函数模块的关键。
配置设置好后,开始创建Manager:

// NewManager loads all functions and return
func NewManager(c Config) (*Manager, error) {
	cli, err := master.NewClient(c.API)
	···
	m := &Manager{
		cfg: c,
		cli: cli,
		fcs: make(map[string]*Function),
		log: logger.WithFields("manager", "function"),
	}
	for _, fc := range c.Functions {
		m.fcs[fc.Name] = newFunction(m, fc)
	}
	return m, nil
}

cli是为了与master节点(API)通信的,fcs中存储了所有我们定义的函数,其核心就在于newFunction方法:

func newFunction(m *Manager, c config.Function) *Function {
	···
	f.pool = pool.NewObjectPool(context.Background(), newFuncionFactory(f), pc)
	return f
}

这里面创建一个对象池,这个对象池里面用于对我们刚刚定义的Function进行管理。
Manager创建过后,回到New方法,接下来用于创建mo,其中最重要的是如下部分:

// New creates a new module
func New(confDate string) (module.Module, error) {
	···
	for _, r := range cfg.Rules {
		···
		rr, err := create(r, cfg.Hub, f)
		···
		m.rrs = append(m.rrs, rr)
	}
	return m, nil
}

循环里面的f就是我们配置文件定义的函数,然后使用create方法创建ruler,并添加至mo的rrs中(rrs就是用来存放ruler的)。

func create(r Rule, cc config.MQTTClient, f *Function) (*ruler, error) {
	···
	cc.Subscriptions = []config.Subscription{config.Subscription{Topic: r.Subscribe.Topic, QOS: r.Subscribe.QOS}}
	fd, err := NewDispatcher(f)
	···
	return &ruler{
		r:  &r,
		fd: fd,
		md: mqtt.NewDispatcher(cc),
	}, nil
}

create方法创建了ruler,其中有两个属性最为重要,一个是fd,另一个是md,fd根据用途我猜应该是Function Dispatcher,md是MQTT Dispatcher,前者主要用于通过使用master API启动一个函数运算模块(后面会提到),后者主要进行MQTT消息转发处理,里面存放了每个函数订阅的主题信息,两者是互相配合工作的(后面会提到)。

mo start

核心代码如下:

// Start starts module
func (m *mo) Start() error {
	for _, rr := range m.rrs {
		err := rr.start()
		···
	}
	return nil
}

其核心就是启动ruler:

func (rr *ruler) start() error {
	···
	h := mqtt.Handler{}
	h.ProcessPublish = func(p *packet.Publish) error {
		return rr.fd.Invoke(p)
	}
	···
	return rr.md.Start(h)
}

这个方法先是设置了MQTT消息类型为publish的方法,也就是调用Function Dispatcher的invoke方法(这个方法会讲到),然后启动MQTT,MQTT启动方法如下:

// Start starts dispatcher
func (d *Dispatcher) Start(h Handler) error {
	return d.tomb.Go(func() error {
		return d.supervisor(h)
	})
}

// Supervisor the supervised reconnect loop
func (d *Dispatcher) supervisor(handler Handler) error {
	···
	client, err := NewClient(d.config, handler)
	···
	// run dispatcher on client
	current, dying = d.dispatcher(client, current)
	···
}

// NewClient returns a new client
func NewClient(cc config.MQTTClient, handler Handler) (*Client, error) {
	···
	c := &Client{
		···
	}
	err = c.connect()
	···
	return c, nil
}

首先调用supervisor方法循环启动(启动失败了再重启),然后调用NewClient方法创建一个新的MQTT连接,这里面c.connect()中涉及了一个connect方法,这个后面会用到:

func (c *Client) connect() (err error) {
	// allocate packet
	connect := packet.NewConnect()
	···
	// send connect packet
	err = c.send(connect, false)
	···
	// start process routine
	c.tomb.Go(c.processor)
	···
	// allocate subscribe packet
	subscribe := packet.NewSubscribe()
	subscribe.ID = 1
	subscribe.Subscriptions = c.config.GetSubscriptions()
	···
	// send packet
	err = c.send(subscribe, true)
	if err != nil {
		return c.die(err)
	}
	return nil
}

// processes incoming packets
func (c *Client) processor() error {
	···
	for {
		// get next packet from connection
		pkt, err := c.conn.Receive()
		···
		switch p := pkt.(type) {
		case *packet.Publish:
				err = c.handler.ProcessPublish(p)
		case *packet.Puback:
				err = c.handler.ProcessPuback(p)	
		···
	}
}

首先创建一个真正的MQTT连接——connect(真正是表示这个是由其他库完成的),然后发送连接包,并注册相关MQTT包的处理方法(processor方法),然后发送进行相关主题的订阅。这里面processor方法里面,处理Publish(ProcessPublish)、Puback(ProcessPuback)方法不就是在ruler的start方法中定义的ProcessPublish吗!

func (rr *ruler) start() error {
	···
	h := mqtt.Handler{}
	h.ProcessPublish = func(p *packet.Publish) error {
		//TODO
		//Blank:开启方法的开端
		return rr.fd.Invoke(p)
	}
	···
	return rr.md.Start(h)
}

重新回到supervisor方法,里面剩下一个dispatch方法没有解释:

// reads from the queues and calls the current client
func (d *Dispatcher) dispatcher(client *Client, current packet.Generic) (packet.Generic, bool) {
	···
	if current != nil {
		err := client.Send(current)
		···
	}

	for {
		select {
		case pkt := <-d.channel:
			err := client.Send(pkt)
			if err != nil {
				return pkt, false
			}
		case <-client.Dying():
			return nil, false
		case <-d.tomb.Dying():
			return nil, true
		}
	}
}

这个方法其实就是读取dispatcher中的消息,然后进行发送至hub模块,也就是真正进行与hub模块的通信。
这样,function模块的启动就介绍完了,主要是为每一个函数封装为一个ruler,ruler中有一个Funtion Dispatcher和一个MQTT Dispatcher,后者用于订阅、发布消息,当MQTT Dispatcher收到消息后由Function Dispatcher找到对应的function,然后向master API发送创建一个函数模块的请求,由master API负责启动模块
另外master启动函数模块后,函数模块会启动一个小的grpc服务器,由function通过rpc调用函数模块的Handle方法,Handle方法会打开我们定义的python文件,然后运行,并将运行结果返回给function模块,function模块获得返回值后调用在ruler.fd(Function Dispatcher)中设置的callback,将结果发送至hub模块,由hub模块负责进行转发消息。

 类似资料: