1、指出纯的go方法, 不需要额外的定义
2、可插拔的设计,可以方便扩展服务发现插件、tracing等
3、支持 TCP、HTTP、QUIC、KCP等协议
4、支持多种编码方式, 比如JSON、Protobuf、MessagePack 和 原始字节数据
5、服务发现支持 单机对单机、单机对多机、zookeeper、etcd、consul、mDNS等多种发现方式
6、容错支持 Failover、Failfast、Failtry等多种模式
7、负载均衡支持随机选取、顺序选取、一致性哈希、基于权重的选取、基于网络质量的选取和就近选取等多种均衡方式
8、支持压缩
9、支持扩展信息传递(元数据)
10、支持身份验证
11、支持自动heartbeat和单向请求
12、支持metrics、log、timeout、别名、断路器、TLS等特性
服务方法的要求:
1、可见性
2、接收三个参数(context.Context,args ,args2),且最后一个参数应当是指针类型
3、返回一个错误类型
* 服务可以是一个基本类型,也可以是一个自定义的类型这不影响服务
* 服务监听初始化:服务需要有一个TCP/UDP的服务进程来监听,需要实例化监听注册服务
* func NewServer(options ...OptionFn) *Server {
s := &Server{
Plugins: &pluginContainer{},
options: make(map[string]interface{}),
}
for _, op := range options {
op(s)
}
return s
}//由此,也可以直接使用Server{}构造自己需要的服务监视器
type Server struct {
Plugins PluginContainer//包含服务端所有的插件
// AuthFunc 可以用来鉴权,检测客户端的权限
AuthFunc func(ctx context.Context, req *protocol.Message, token string) error
// 包含过滤后或者不可导出的字段
}
//rpcx提供了三个可选的OptionFn启动选项:
func WithReadTimeout(readTimeout time.Duration) OptionFn//读超时
func WithTLSConfig(cfg *tls.Config) OptionFn//tls证书
func WithWriteTimeout(writeTimeout time.Duration) OptionFn//写超时
* config := &tls.Config{Certificates: []tls.Certificate{cert}}
s := server.NewServer(server.WithTLSConfig(config))//TLS加密TCP流
服务注册:Server.Register()与Server.RegisterName()
func (s *Server) Close() error
func (s *Server) RegisterOnShutdown(f func())
func (s *Server) Serve(network, address string) (err error)//通过TCP/UDP的方式通信
func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)//以HTTP的方式暴露
服务
* rpcx支持的网络类型
tcp: 推荐使用
http: 通过劫持http连接实现
unix: unix domain sockets
reuseport: 要求 SO_REUSEPORT socket 选项, 仅支持 Linux kernel 3.9+
quic: support quic protocol
kcp: sopport kcp protocol
* 客户端调用服务:
d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
xclient.Call()
func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error
* 相关方法:
func (client *Client) Close() error
func (c *Client) Connect(network, address string) error
func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
func (client *Client) IsClosing() bool
func (client *Client) IsShutdown() bool
* XClient是对Client的封装,其中
Broadcast 表示向所有服务器发送请求,只有所有服务器正确返回时才会成功。此时FailMode 和 SelectMode的设置是无效的。请设置超时来避免阻塞。
Fork 表示向所有服务器发送请求,只要任意一台服务器正确返回就成功。此时FailMode 和 SelectMode的设置是无效的。
* rpcx使用network @ Host: port格式表示一项服务。在network 可以 tcp , http ,unix ,quic或kcp。该Host可以所主机名或IP地址。
####### 服务注册中心
1、Peer2Peer
2、MultiServer:
多个服务器负载分担:
d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
MultipleServersDiscovery.Update()用于服务的更新
3、ZooKeeper
.....
####### 默认rpcx支持4种编解码方式:
* Json
* Protobuffer//google
* MsgPack//跨语言的编解码工具
* SerializeNone//不对数据编解码,要求数据是[]byte
服务端和客户端始终保持同样的编解码方式,客户端使用json请求则服务端返回值为json
####### 自定义编解码方式:
```bash
官方MsgPack
type MsgpackCodec struct{}//定义自定义的编解码类型
// Encode encodes an object into slice of bytes.
func (c MsgpackCodec) Encode(i interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := msgpack.NewEncoder(&buf)
//enc.UseJSONTag(true)
err := enc.Encode(i)
return buf.Bytes(), err
}
// Decode decodes an object from slice of bytes.
func (c MsgpackCodec) Decode(data []byte, i interface{}) error {
dec := msgpack.NewDecoder(bytes.NewReader(data))
//dec.UseJSONTag(true)
err := dec.Decode(i)
return err
}
//自定义的编解码与给定格式都是一致的,只需要将对应的数据使用自定义的格式编解码返回即可
服务端添加Gob:
func main() {
flag.Parse()
share.Codecs[protocol.SerializeType(4)] = &GobCodec{}//指定序列化类型
s := server.NewServer()
//s.RegisterName("Arith", new(example.Arith), "")
s.Register(new(example.Arith), "")
s.Serve("tcp", *addr)
}
type GobCodec struct {
}
func (c *GobCodec) Decode(data []byte, i interface{}) error {
enc := gob.NewDecoder(bytes.NewBuffer(data))
err := enc.Decode(i)
return err
}
func (c *GobCodec) Encode(i interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(i)
return buf.Bytes(), err
}
客户端也是一致的
服务端:需要定义一个认证函数用于token的校验,server.AuthFunc接收校验函数
nc auth(ctx context.Context, req *protocol.Message, token string) error {
if token == "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwYXNzd29yZCI6IjE5OTgzNyIsInVzZXJuYW1lIjoiaGFkZXMifQ.zg1dDuhn393DVyIH_1s7eSj74JsBw_vLYbUiq72tAm8" {
return nil
}
return errors.New("wrong token")
}
s.AuthFunc=auth
客户端:
client.Auth(token)//只有通过Auth()认证通信才能正常进行
type Option struct {
// group选项为空会被忽略
Group string
/*
option.Group可以设置客户端所属分组,分组属于一个元数据,服务端可以在注册服务时指定分组如server.Register("Arith","group=test")
只有属于同一分组的客户端才能访问对应的服务客户端option.Group="test"
*/
// 重试次数
Retries int
// tcp和quic的tls配置
TLSConfig *tls.Config
// kcp.BlockCrypt
Block interface{}
// RPCPath for http connection
RPCPath string
/连接超时时间
ConnectTimeout time.Duration
//读超时
ReadTimeout time.Duration
// 写超时设置
WriteTimeout time.Duration
// BackupLatency is used for Failbackup mode. rpcx will sends another request if the first response doesn't return in BackupLatency time.//设置FailBackup时间
BackupLatency time.Duration
//生成一个默认的断路器
GenBreaker func() Breaker
/*
Breaker
断路器,当一个服务节点连续产生一定次数的错误后断路器会断开该节点连接以保护服务访问,一段时间后又恢复,可以自定义断路器
*/
SerializeType protocol.SerializeType//序列化类型
CompressType protocol.CompressType
Heartbeat bool
HeartbeatInterval time.Duration
/*
* 用于客户端与服务器连接的保持时间:
option := client.DefaultOption
option.Heartbeat = true
option.HeartbeatInterval = time.Second
*/
}