MOSN 源码解析 - XDS

优质
小牛编辑
138浏览
2023-12-01

本文的内容基于 MOSN v0.9.0。

XDS用来与pilot-discovery通讯做服务发现功能。

XDS是一类发现服务的总称,包含LDS, RDS, CDS, EDS以及SDS。

MOSN通过XDS API可以动态获取Listener(监听器),Route(路由), Cluster(集群), Endpoint(集群成员)以及Secret(证书)配置。

XDS的基本流程:Pilot-Discovery的Model -> XDS.pb -> GRPC -> XDS.pb -> MOSN的Model (GRPC包括序列化和网络传输)。

配置文件&解析

if len(DynamicResources) > 0 && len(StaticResources) > 0 进入XDS模式。

XDS模式下的MOSN配置文件mosn_config.json:

{
  "dynamic_resources": {
    "lds_config": {
      "ads": {}
    },
    "cds_config": {
      "ads": {}
    },
    "ads_config": {
      "api_type": "GRPC",
      "cluster_names": ["xxx"],
      "grpc_services": [
        {
          "envoy_grpc": {
            "cluster_name": "xds-grpc"
          }
        }
      ]
    }
  },
  "static_resources": {
    "clusters": [
      {
        "name": "xds-grpc",
        "type": "STRICT_DNS",
        "lb_policy": "RANDOM",
        "hosts": [
          {
            "socket_address": {"address": "istio-pilot.istio-system.svc.boss.twl", "port_value": 15010}
          }
        ],
        "http2_protocol_options": { }
      }
    ]
  }
}

解析配置文件构建XDSConfig(XDS客户端的配置)。

构建adsClient(XDS客户端)。

func (c *Client) Start(config *config.MOSNConfig) error {
	log.DefaultLogger.Infof("xds client start")

    //解析配置文件
	dynamicResources, staticResources, err := UnmarshalResources(config)
	if err != nil {
		log.DefaultLogger.Warnf("fail to unmarshal xds resources, skip xds: %v", err)
		return errors.New("fail to unmarshal xds resources")
	}
    
    //构建xdsConfig
	xdsConfig := v2.XDSConfig{}
	err = xdsConfig.Init(dynamicResources, staticResources)
	if err != nil {
		log.DefaultLogger.Warnf("fail to init xds config, skip xds: %v", err)
		return errors.New("fail to init xds config")
	}

    //构建adsCLient
	stopChan := make(chan int)
	sendControlChan := make(chan int)
	recvControlChan := make(chan int)
	adsClient := &v2.ADSClient{
		AdsConfig:         xdsConfig.ADSConfig,
		StreamClientMutex: sync.RWMutex{},
		StreamClient:      nil,
		MosnConfig:        config,
		SendControlChan:   sendControlChan,
		RecvControlChan:   recvControlChan,
		StopChan:          stopChan,
	}
	adsClient.Start()
	c.adsClient = adsClient
	return nil
}

初始化和启动xds连接

adsClient.start()

func (adsClient *ADSClient) Start() {
    //构建grpc双向流连接。
	adsClient.StreamClient = adsClient.AdsConfig.GetStreamClient()
	utils.GoWithRecover(func() {
        //认证和开启xds传输,并且设置定时重传
		adsClient.sendThread()
	}, nil)
	utils.GoWithRecover(func() {
        //接受下发数据,并根据类型选择不同的handler处理
		adsClient.receiveThread()
	}, nil)
}

函数细节: https://github.com/mosn/mosn/blob/master/pkg/xds/v2/adssubscriber.go

XDS消息处理和发送

四种类型处理器注册。

func init() {
	RegisterTypeURLHandleFunc(EnvoyListener, HandleEnvoyListener)
	RegisterTypeURLHandleFunc(EnvoyCluster, HandleEnvoyCluster)
	RegisterTypeURLHandleFunc(EnvoyClusterLoadAssignment, HandleEnvoyClusterLoadAssignment)
	RegisterTypeURLHandleFunc(EnvoyRouteConfiguration, HandleEnvoyRouteConfiguration)
}

接受数据类型,将XDS类型转换成MOSN数据类型,并且加入对应的manager。

以HandlerListener为例:

func HandleEnvoyListener(client *ADSClient, resp *envoy_api_v2.DiscoveryResponse) {
	log.DefaultLogger.Tracef("get lds resp,handle it")
    //解析返回消息,生成envoy_listener
	listeners := client.handleListenersResp(resp)
	log.DefaultLogger.Infof("get %d listeners from LDS", len(listeners))
    //转换envoy_listener为mosn_listener,并且加入ListenerAdapter
	conv.ConvertAddOrUpdateListeners(listeners)
	if err := client.reqRoutes(client.StreamClient); err != nil {
		log.DefaultLogger.Warnf("send thread request rds fail!auto retry next period")
	}
}

函数细节: https://github.com/mosn/mosn/blob/master/pkg/xds/v2/default_handler.go

请求与处理流程简单所示,也可接受pilot-discovery的推送:

switch(type):
  case: cluster:
    接收cluster返回,根据clusterName请求Endpoints
  case: endpoint:
    接收Endpoints,请求Listener
  case: listener
    接受Listener,请求Routes
  case: router
    接受Routes
       

XDS类型转换

XDS.pd数据结构类型在: https://github.com/envoyproxy/go-control-plane

收到数据并反序列化为XDS的Model之后,进行结构体转换。

类型转换代码如下: https://github.com/mosn/mosn/blob/master/pkg/xds/conv/convertxds.go