中断了一段时间,发现前面分析的hub模块的源码拉错分枝了(对,我就是个菜鸡),不过大致流程差不多,有时间改一下。这次分析openedge-function模块,openedge-function模块比较简单,很多核心功能其实还是依赖于master和hub的。
下面将openedge-function模块简称为function模块
入口函数是openedge-0.1.1/openedge-function/main.go,其核心代码如下:
func main(){
···
m, err := New(f.Config)
···
err = m.Start()
···
}
也就是分为两个部分,一个是New一个mo(也就是上面的m),之后启动这个mo。
核心代码如下:
// New creates a new module
func New(confDate string) (module.Module, error) {
var cfg Config
err := module.Load(&cfg, confDate)
···
defaults(&cfg)
···
man, err := NewManager(cfg)
···
mo{
cfg: cfg,
man: man,
rrs: []*ruler{},
log: logger.WithFields(),
}
for _, r := range cfg.Rules {
f, err := man.Get(r.Compute.Function)
if err != nil {
m.Close()
return nil, err
}
rr, err := create(r, cfg.Hub, f)
···
m.rrs = append(m.rrs, rr)
}
return m, nil
}
前面几行就是为了加载配置,这里面提一下default函数:
func defaults(cfg *Config) {
if cfg.API.Address == "" {
cfg.API.Address = utils.GetEnv(module.EnvOpenEdgeMasterAPI)
}
cfg.API.Username = cfg.UniqueName()
cfg.API.Password = utils.GetEnv(module.EnvOpenEdgeModuleToken)
}
这里满就是为了设置master API的地址,这个就是master模块中创建API的缘故,也是启动、关闭函数模块的关键。
配置设置好后,开始创建Manager:
// NewManager loads all functions and return
func NewManager(c Config) (*Manager, error) {
cli, err := master.NewClient(c.API)
···
m := &Manager{
cfg: c,
cli: cli,
fcs: make(map[string]*Function),
log: logger.WithFields("manager", "function"),
}
for _, fc := range c.Functions {
m.fcs[fc.Name] = newFunction(m, fc)
}
return m, nil
}
cli是为了与master节点(API)通信的,fcs中存储了所有我们定义的函数,其核心就在于newFunction方法:
func newFunction(m *Manager, c config.Function) *Function {
···
f.pool = pool.NewObjectPool(context.Background(), newFuncionFactory(f), pc)
return f
}
这里面创建一个对象池,这个对象池里面用于对我们刚刚定义的Function进行管理。
Manager创建过后,回到New方法,接下来用于创建mo,其中最重要的是如下部分:
// New creates a new module
func New(confDate string) (module.Module, error) {
···
for _, r := range cfg.Rules {
···
rr, err := create(r, cfg.Hub, f)
···
m.rrs = append(m.rrs, rr)
}
return m, nil
}
循环里面的f就是我们配置文件定义的函数,然后使用create方法创建ruler,并添加至mo的rrs中(rrs就是用来存放ruler的)。
func create(r Rule, cc config.MQTTClient, f *Function) (*ruler, error) {
···
cc.Subscriptions = []config.Subscription{config.Subscription{Topic: r.Subscribe.Topic, QOS: r.Subscribe.QOS}}
fd, err := NewDispatcher(f)
···
return &ruler{
r: &r,
fd: fd,
md: mqtt.NewDispatcher(cc),
}, nil
}
create方法创建了ruler,其中有两个属性最为重要,一个是fd,另一个是md,fd根据用途我猜应该是Function Dispatcher,md是MQTT Dispatcher,前者主要用于通过使用master API启动一个函数运算模块(后面会提到),后者主要进行MQTT消息转发处理,里面存放了每个函数订阅的主题信息,两者是互相配合工作的(后面会提到)。
核心代码如下:
// Start starts module
func (m *mo) Start() error {
for _, rr := range m.rrs {
err := rr.start()
···
}
return nil
}
其核心就是启动ruler:
func (rr *ruler) start() error {
···
h := mqtt.Handler{}
h.ProcessPublish = func(p *packet.Publish) error {
return rr.fd.Invoke(p)
}
···
return rr.md.Start(h)
}
这个方法先是设置了MQTT消息类型为publish的方法,也就是调用Function Dispatcher的invoke方法(这个方法会讲到),然后启动MQTT,MQTT启动方法如下:
// Start starts dispatcher
func (d *Dispatcher) Start(h Handler) error {
return d.tomb.Go(func() error {
return d.supervisor(h)
})
}
// Supervisor the supervised reconnect loop
func (d *Dispatcher) supervisor(handler Handler) error {
···
client, err := NewClient(d.config, handler)
···
// run dispatcher on client
current, dying = d.dispatcher(client, current)
···
}
// NewClient returns a new client
func NewClient(cc config.MQTTClient, handler Handler) (*Client, error) {
···
c := &Client{
···
}
err = c.connect()
···
return c, nil
}
首先调用supervisor方法循环启动(启动失败了再重启),然后调用NewClient方法创建一个新的MQTT连接,这里面c.connect()
中涉及了一个connect方法,这个后面会用到:
func (c *Client) connect() (err error) {
// allocate packet
connect := packet.NewConnect()
···
// send connect packet
err = c.send(connect, false)
···
// start process routine
c.tomb.Go(c.processor)
···
// allocate subscribe packet
subscribe := packet.NewSubscribe()
subscribe.ID = 1
subscribe.Subscriptions = c.config.GetSubscriptions()
···
// send packet
err = c.send(subscribe, true)
if err != nil {
return c.die(err)
}
return nil
}
// processes incoming packets
func (c *Client) processor() error {
···
for {
// get next packet from connection
pkt, err := c.conn.Receive()
···
switch p := pkt.(type) {
case *packet.Publish:
err = c.handler.ProcessPublish(p)
case *packet.Puback:
err = c.handler.ProcessPuback(p)
···
}
}
首先创建一个真正的MQTT连接——connect(真正是表示这个是由其他库完成的),然后发送连接包,并注册相关MQTT包的处理方法(processor方法),然后发送进行相关主题的订阅。这里面processor方法里面,处理Publish(ProcessPublish)、Puback(ProcessPuback)方法不就是在ruler的start方法中定义的ProcessPublish吗!
func (rr *ruler) start() error {
···
h := mqtt.Handler{}
h.ProcessPublish = func(p *packet.Publish) error {
//TODO
//Blank:开启方法的开端
return rr.fd.Invoke(p)
}
···
return rr.md.Start(h)
}
重新回到supervisor方法,里面剩下一个dispatch方法没有解释:
// reads from the queues and calls the current client
func (d *Dispatcher) dispatcher(client *Client, current packet.Generic) (packet.Generic, bool) {
···
if current != nil {
err := client.Send(current)
···
}
for {
select {
case pkt := <-d.channel:
err := client.Send(pkt)
if err != nil {
return pkt, false
}
case <-client.Dying():
return nil, false
case <-d.tomb.Dying():
return nil, true
}
}
}
这个方法其实就是读取dispatcher中的消息,然后进行发送至hub模块,也就是真正进行与hub模块的通信。
这样,function模块的启动就介绍完了,主要是为每一个函数封装为一个ruler,ruler中有一个Funtion Dispatcher和一个MQTT Dispatcher,后者用于订阅、发布消息,当MQTT Dispatcher收到消息后由Function Dispatcher找到对应的function,然后向master API发送创建一个函数模块的请求,由master API负责启动模块
另外master启动函数模块后,函数模块会启动一个小的grpc服务器,由function通过rpc调用函数模块的Handle方法,Handle方法会打开我们定义的python文件,然后运行,并将运行结果返回给function模块,function模块获得返回值后调用在ruler.fd(Function Dispatcher)中设置的callback,将结果发送至hub模块,由hub模块负责进行转发消息。