当前位置: 首页 > 工具软件 > rpcx > 使用案例 >

RPCX框架

汤博
2023-12-01
RPCX
  • RPCX是Golang借鉴java生态圈中的流行框架构建的功能丰富的微服务平台,实现了一个高性能的、可容错的,插件式的RPC框架。
  • RPCX的目标:
    简单: 易于学习、易于开发、易于集成和易于发布
    高性能:远远高于grpc-go, 更不用说dubbo和motan
    服务发现和服务治理:方便开发大规模的微服务集群
    跨平台: rpcx 3.0底层不再使用标准rpc库,而是采用跨平台的二进制协议,高效但是方便多语言开发
  • 它的特点包括:
        1、指出纯的go方法, 不需要额外的定义
        2、可插拔的设计,可以方便扩展服务发现插件、tracing等
        3、支持 TCP、HTTP、QUIC、KCP等协议
        4、支持多种编码方式, 比如JSON、Protobuf、MessagePack 和 原始字节数据
        5、服务发现支持 单机对单机、单机对多机、zookeeper、etcd、consul、mDNS等多种发现方式
        6、容错支持 Failover、Failfast、Failtry等多种模式
        7、负载均衡支持随机选取、顺序选取、一致性哈希、基于权重的选取、基于网络质量的选取和就近选取等多种均衡方式
        8、支持压缩
        9、支持扩展信息传递(元数据)
        10、支持身份验证
        11、支持自动heartbeat和单向请求
        12、支持metrics、log、timeout、别名、断路器、TLS等特性
  • RPCX规范:
    服务方法的要求:
        1、可见性
        2、接收三个参数(context.Context,args ,args2),且最后一个参数应当是指针类型
        3、返回一个错误类型
    *   服务可以是一个基本类型,也可以是一个自定义的类型这不影响服务

    *   服务监听初始化:服务需要有一个TCP/UDP的服务进程来监听,需要实例化监听注册服务
        *   func NewServer(options ...OptionFn) *Server {
                s := &Server{
                    Plugins: &pluginContainer{},
                    options: make(map[string]interface{}),
                }

                for _, op := range options {
                    op(s)
                }
                return s
            }//由此,也可以直接使用Server{}构造自己需要的服务监视器
                type Server struct {
                    Plugins PluginContainer//包含服务端所有的插件
                    // AuthFunc 可以用来鉴权,检测客户端的权限
                    AuthFunc func(ctx context.Context, req *protocol.Message, token string) error
                    // 包含过滤后或者不可导出的字段
                    }
            //rpcx提供了三个可选的OptionFn启动选项:
                func WithReadTimeout(readTimeout time.Duration) OptionFn//读超时
                func WithTLSConfig(cfg *tls.Config) OptionFn//tls证书
                func WithWriteTimeout(writeTimeout time.Duration) OptionFn//写超时
            *   config := &tls.Config{Certificates: []tls.Certificate{cert}}
                 s := server.NewServer(server.WithTLSConfig(config))//TLS加密TCP流
  • 服务注册:Server.Register()与Server.RegisterName()

    • Rpcx 也支持将纯函数注册为服务,函数必须满足以下的要求:
      函数可以是可导出的或者不可导出的
      接受3个参数,第一个是 context.Context类型,其他2个都是可导出(或内置)的类型。
      第3个参数是一个指针
      有一个 error 类型的返回值
      RegisterFunction(“service”, function, “”)//将纯函数注册为服务
    • 服务启动:
           func (s *Server) Close() error
           func (s *Server) RegisterOnShutdown(f func())
           func (s *Server) Serve(network, address string) (err error)//通过TCP/UDP的方式通信
           func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)//以HTTP的方式暴露
           服务
          
       * rpcx支持的网络类型
     
           tcp: 推荐使用
           http: 通过劫持http连接实现
           unix: unix domain sockets
           reuseport: 要求 SO_REUSEPORT socket 选项, 仅支持 Linux kernel 3.9+
           quic: support quic protocol
           kcp: sopport kcp protocol

   *   客户端调用服务:
       d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
       xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
       defer xclient.Close()
       xclient.Call()
       func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error
   *   相关方法:
           
           func (client *Client) Close() error
           func (c *Client) Connect(network, address string) error
           func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
           func (client *Client) IsClosing() bool
           func (client *Client) IsShutdown() bool
       *   XClient是对Client的封装,其中
       Broadcast 表示向所有服务器发送请求,只有所有服务器正确返回时才会成功。此时FailMode 和 SelectMode的设置是无效的。请设置超时来避免阻塞。
       Fork 表示向所有服务器发送请求,只要任意一台服务器正确返回就成功。此时FailMode 和 SelectMode的设置是无效的。
*   rpcx使用network @ Host: port格式表示一项服务。在network 可以 tcp , http ,unix ,quic或kcp。该Host可以所主机名或IP地址。

####### 服务注册中心
1、Peer2Peer
2、MultiServer:
   多个服务器负载分担:
   d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
   xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
   defer xclient.Close()
   MultipleServersDiscovery.Update()用于服务的更新
3、ZooKeeper
.....
####### 默认rpcx支持4种编解码方式:
   *   Json
   *   Protobuffer//google
   *   MsgPack//跨语言的编解码工具
   *   SerializeNone//不对数据编解码,要求数据是[]byte
   服务端和客户端始终保持同样的编解码方式,客户端使用json请求则服务端返回值为json
####### 自定义编解码方式:
```bash
官方MsgPack
type MsgpackCodec struct{}//定义自定义的编解码类型

// Encode encodes an object into slice of bytes.
func (c MsgpackCodec) Encode(i interface{}) ([]byte, error) {
   var buf bytes.Buffer
   enc := msgpack.NewEncoder(&buf)
   //enc.UseJSONTag(true)
   err := enc.Encode(i)
   return buf.Bytes(), err
}

// Decode decodes an object from slice of bytes.
func (c MsgpackCodec) Decode(data []byte, i interface{}) error {
   dec := msgpack.NewDecoder(bytes.NewReader(data))
   //dec.UseJSONTag(true)
   err := dec.Decode(i)
   return err
}
//自定义的编解码与给定格式都是一致的,只需要将对应的数据使用自定义的格式编解码返回即可

服务端添加Gob: 

func main() {
   flag.Parse()

   share.Codecs[protocol.SerializeType(4)] = &GobCodec{}//指定序列化类型
   s := server.NewServer()
   //s.RegisterName("Arith", new(example.Arith), "")
   s.Register(new(example.Arith), "")
   s.Serve("tcp", *addr)
}

type GobCodec struct {
}

func (c *GobCodec) Decode(data []byte, i interface{}) error {
   enc := gob.NewDecoder(bytes.NewBuffer(data))
   err := enc.Decode(i)
   return err
}

func (c *GobCodec) Encode(i interface{}) ([]byte, error) {
   var buf bytes.Buffer
   enc := gob.NewEncoder(&buf)
   err := enc.Encode(i)
   return buf.Bytes(), err
}
客户端也是一致的
认证

服务端:需要定义一个认证函数用于token的校验,server.AuthFunc接收校验函数

nc auth(ctx context.Context, req *protocol.Message, token string) error {
    if token == "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwYXNzd29yZCI6IjE5OTgzNyIsInVzZXJuYW1lIjoiaGFkZXMifQ.zg1dDuhn393DVyIH_1s7eSj74JsBw_vLYbUiq72tAm8" {
        return nil
    }
 return errors.New("wrong token")
}
s.AuthFunc=auth
客户端:
client.Auth(token)//只有通过Auth()认证通信才能正常进行

错误模式
  • RPCX中一共包含4种错误模式:
    1、FailFast一旦一个节点访问错误立即返回错误
    2、Failtry重复访问一个节点直到访问成功或者达到最大访问次数
    3、FailBackup:这种情况下如果访问的节点在一定时间内不返回结果那么它会访问另一个节点,注意该模式只针对两个节点
    4、Failover当访问一个节点产生错误时会继续访问其它打节点直到节点访问成功或者达到失败的最大阈值
    • 失败模式设定在Xclient创建时设定,但是仅仅针对同步模式,异步模式下该参数是无意义的
    • 访问时间和重试次数在Option的相关选项中设置
Fork模式
    • 该模式类似于FailBackup但是它针对拥有该服务的所有节点,只要有节点成功返回就返回给客户端而不仅限于两个,使用xclient.Fork()直接调用参数同Call
广播模式
  • 该模式要求所有的服务节点都成功返回才返回,有任意节点出错都会返回错误
路由
  • 当服务规模较大时,通常有微服务集群来负载均衡,这时就出现了一个服务部署在多个节点上,客户端选择访问服务的节点的过程就是RPCX的路由
    • 分类:
      • Random_selector:随机选择节点访问(RandomSelect)
      • Roundrobin_selector 轮询方式访问,保证每个节点都能被访问到(RoundRobin)
      • Weighted_selector :和Nginx一样的平滑的基于权重的轮询(WeightedRoundRobin)
      • Ping_selector:网络质量优先,依次ping选择网络质量最好的(WeightedICMP)
      • hash_selector:一致性哈希,使用 JumpConsistentHash 选择节点, 相同的servicePath, serviceMethod 和 参数会路由到同一个节点上。 JumpConsistentHash 是一个快速计算一致性哈希的算法,但是有一个缺陷是它不能删除节点,如果删除节点,路由就不准确了,所以在节点有变动的时候它会重新计算一致性哈希。(ConsistentHash)
      • geo_selector:地理位置优先:服务注册时需要指定所在的经纬度,根据经纬度的相对位置来选择最佳的
      • user_selecor:自定义路由规则( SelectByUser)
    • 路由设置在Xclient创建时指定
元数据(metadata)
  • 元数据不是通信过程中的请求或响应数据,而是辅助的标签之类的信息,是一个字符串键值对
    • 客户端读取发送元数据需要在上下文中设置share.ReqMetaDataKey,接收需要设置share.ResMetaDataKey
      ctx := context.WithValue(context.Background(), share.ReqMetaDataKey, map[string]string{“test”: “test”})
      ctx = context.WithValue(ctx, share.ResMetaDataKey, make(map[string]string))
    • 服务端:
      reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
      resMeta := ctx.Value(share.ResMetaDataKey).(map[string]string)
Option
type Option struct {
    // group选项为空会被忽略
    Group string
    /*
     option.Group可以设置客户端所属分组,分组属于一个元数据,服务端可以在注册服务时指定分组如server.Register("Arith","group=test")
        只有属于同一分组的客户端才能访问对应的服务客户端option.Group="test"
    */

    // 重试次数
    Retries int

    // tcp和quic的tls配置
    TLSConfig *tls.Config
    // kcp.BlockCrypt
    Block interface{}
    // RPCPath for http connection
    RPCPath string
    /连接超时时间
    ConnectTimeout time.Duration
    //读超时
    ReadTimeout time.Duration
    // 写超时设置
    WriteTimeout time.Duration

    // BackupLatency is used for Failbackup mode. rpcx will sends another request if the first response doesn't return in BackupLatency time.//设置FailBackup时间
    BackupLatency time.Duration

    //生成一个默认的断路器
    GenBreaker func() Breaker
    /*
    Breaker
    断路器,当一个服务节点连续产生一定次数的错误后断路器会断开该节点连接以保护服务访问,一段时间后又恢复,可以自定义断路器
    */

    SerializeType protocol.SerializeType//序列化类型
    CompressType  protocol.CompressType

    Heartbeat         bool
    HeartbeatInterval time.Duration
    /*
     *  用于客户端与服务器连接的保持时间:
        option := client.DefaultOption
        option.Heartbeat = true
        option.HeartbeatInterval = time.Second
    */
}
 类似资料: