创建项目成功后,进入api目录下可以看到api.proto文件:
option go_package = "api";
option (gogoproto.goproto_getters_all) = false;
service RPCDemo {
rpc Ping(.google.protobuf.Empty) returns (.google.protobuf.Empty);
rpc SayHello(HelloReq) returns (.google.protobuf.Empty);
rpc SayHelloURL(HelloReq) returns (HelloResp) {
option (google.api.http) = {
get: "/kratos-demo/say_hello"
};
};
}
message HelloReq {
string name = 1 [(gogoproto.moretags) = 'form:"name" validate:"required"'];
}
message HelloResp {
string Content = 1 [(gogoproto.jsontag) = 'content'];
}
运行:
kratos tool protoc --grpc --bm api.proto
命令可以得到api.pb.go 和 api.bm.go
api.proto是gRPC server的描述文件
api.pb.go是基于api.proto生成的代码文件,用于rpc调用,具体逻辑可在internal/service/serevice.go 内实现
api.bm.go是基于api.proto生成的代码文件,用于http调用,将参数绑定后,调用serevice.go中方法,并返回json结果。
进入internal/server/grpc目录打开server.go文件,可以看到以下代码,只需要替换以下注释内容就可以启动一个gRPC服务。
package grpc
import (
"github.com/luslin/tools/kratos-demo/api"
"github.com/go-kratos/kratos/pkg/conf/paladin"
"github.com/go-kratos/kratos/pkg/net/rpc/warden"
)
// New new a grpc server.
func New(svc api.RPCDemoServer) (ws *warden.Server, err error) {
var (
cfg warden.ServerConfig
ct paladin.TOML
)
if err = paladin.Get("grpc.toml").Unmarshal(&ct); err != nil {
return
}
if err = ct.Get("Server").UnmarshalTOML(&cfg); err != nil {
return
}
ws = warden.NewServer(&cfg)
// 替换这里 RegisterRPCDemoServer 在 api.pb.go 中
api.RegisterRPCDemoServer(ws.Server(), svc)
ws, err = ws.Start()
return
}
注册方法 internal/service/service.go
var Provider = wire.NewSet(New, wire.Bind(new(api.RPCDemoServer), new(*Service)))
// Service service.
type Service struct {
ac *paladin.Map
dao dao.Dao
}
// New new a service and return.
func New(d dao.Dao) (s *Service, cf func(), err error) {
s = &Service{
ac: &paladin.TOML{},
dao: d,
}
cf = s.Close
err = paladin.Watch("application.toml", s.ac)
return
}
// SayHello grpc demo func.
func (s *Service) SayHello(ctx context.Context, req *api.HelloReq) (reply *empty.Empty, err error) {
reply = new(empty.Empty)
fmt.Printf("hello %s", req.Name)
return
}
// SayHelloURL bm demo func.
func (s *Service) SayHelloURL(ctx context.Context, req *api.HelloReq) (reply *api.HelloResp, err error) {
reply = &api.HelloResp{
Content: "hello " + req.Name,
}
fmt.Printf("hello url %s", req.Name)
return
}
// Ping ping the resource.
func (s *Service) Ping(ctx context.Context, e *empty.Empty) (*empty.Empty, error) {
return &empty.Empty{}, s.dao.Ping(ctx)
}
// Close close the resource.
func (s *Service) Close() {
}
请进入internal/service内找到SayHello方法,注意方法的入参和出参,都是按照gRPC的方法声明对应的:
第一个参数必须是context.Context,第二个必须是proto内定义的message对应生成的结构体
第一个返回值必须是proto内定义的message对应生成的结构体,第二个参数必须是error
在http框架bm中,如果共用proto文件生成bm代码,那么也可以直接使用该service方法
建议service严格按照此格式声明方法使其能够在bm和warden内共用。
请进入internal/dao方法内,一般对资源的处理都会在这一层封装。
对于client端,前提必须有对应proto文件生成的代码,那么有两种选择:
拷贝proto文件到自己项目下并且执行代码生成
直接import服务端的api package
这也是业务代码我们加了一层internal的关系,服务对外暴露的只有接口
不管哪一种方式,以下初始化gRPC client的代码建议伴随生成的代码存放在统一目录下:
api/client_test.go
package api
import (
"context"
"fmt"
"github.com/go-kratos/kratos/pkg/net/rpc/warden"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"
"log"
"testing"
)
const (
address = "localhost:9000"
)
// 传统rpc调用
func TestNewClient(t *testing.T) {
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
c := NewRPCDemoClient(conn)
r, err := c.Ping(context.TODO(), &empty.Empty{})
if err != nil {
log.Fatal( err)
}
fmt.Println(r)
rep, err := c.SayHelloURL(context.TODO(),&HelloReq{Name: "lin"})
if err != nil {
log.Fatal( err)
}
fmt.Println(rep.Content)
}
// kratos 封装的rpc调用
func TestClient2(t *testing.T) {
client := warden.NewClient(&warden.ClientConfig{})
cc, err := client.Dial(context.Background(), fmt.Sprintf("direct://default/%s", address))
if err != nil {
panic(err)
}
rpc_cli := NewRPCDemoClient(cc)
rep, err := rpc_cli.SayHelloURL(context.TODO(),&HelloReq{Name: "lin"})
if err != nil {
log.Fatal( err)
}
fmt.Println(rep.Content)
}
服务注册与发现最简单的就是direct固定服务端地址的直连方式。也就是服务端正常监听端口启动不进行额外操作,客户端使用如下target:
direct://default/127.0.0.1:9000,127.0.0.1:9091
其中direct为协议类型,此处表示直接使用该URL内提供的地址127.0.0.1:9000,127.0.0.1:9091进行连接,而default在此处无意义仅当做占位符。
要将本项目注册到discovery中:
func DiscoveryRegister() func(){
hn, _ := os.Hostname()
dis := discovery.New(nil)
ins := &naming.Instance{
Zone: "sh001",
Env: "dev",
AppID: "kratos_grpc",
Hostname: hn,
Addrs: []string{
"grpc://192.168.1.88:9000",
"http://192.168.1.88:9056",
},
}
cancel, err := dis.Register(context.Background(), ins)
if err != nil {
panic(err)
}
// 省略...
// 特别注意!!!
// cancel必须在进程退出时执行!!!
return cancel
}
在服务退出时,调用cancel从discovery中去掉本服务
可以在discovery中看到本服务:
"kratos_grpc":[
{
"region":"sh",
"zone":"sh001",
"env":"dev",
"appid":"kratos_grpc",
"hostname":"local",
"addrs":[
"grpc://192.168.1.88:9000"
],
"version":"",
"metadata":null,
"status":1,
"reg_timestamp":1589962982365217546,
"up_timestamp":1589962982365217546,
"renew_timestamp":1589962982365217546,
"dirty_timestamp":1589962982358288851,
"latest_timestamp":1589962982365217546
}
要使用discovery需要在业务的NewClient前进行注册,代码如下:
package api
import (
"context"
"fmt"
"github.com/go-kratos/kratos/pkg/naming/discovery"
"github.com/go-kratos/kratos/pkg/net/rpc/warden/resolver"
"github.com/go-kratos/kratos/pkg/net/rpc/warden"
"google.golang.org/grpc"
)
// AppID .
const AppID = "kratos_grpc"
func init() {
// NOTE: 注意这段代码,表示要使用discovery进行服务发现
// NOTE: 还需注意的是,resolver.Register是全局生效的,所以建议该代码放在进程初始化的时候执行
// NOTE: !!!切记不要在一个进程内进行多个不同中间件的Register!!!
// NOTE: 在启动应用时,可以通过flag(-discovery.nodes) 或者 环境配置(DISCOVERY_NODES)指定discovery节点
resolver.Register(discovery.Builder())
}
// NewClient new grpc client
func NewClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (RPCDemoClient, error) {
client := warden.NewClient(cfg, opts...)
cc, err := client.Dial(context.Background(), fmt.Sprintf("discovery://default/%s", AppID))
if err != nil {
return nil, err
}
// 注意替换这里:
// NewDemoClient方法是在"api"目录下代码生成的
// 对应proto文件内自定义的service名字,请使用正确方法名替换
return NewRPCDemoClient(cc), nil
}
测试
func TestClient2(t *testing.T) {
client := warden.NewClient(&warden.ClientConfig{})
cc, err := client.Dial(context.Background(), fmt.Sprintf("discovery://default/%s", "kratos_grpc"))
if err != nil {
panic(err)
}
rpc_cli := NewRPCDemoClient(cc)
rep, err := rpc_cli.SayHelloURL(context.TODO(),&HelloReq{Name: "lin"})
if err != nil {
log.Fatal( err)
}
fmt.Println(rep.Content)
}
结果:
INFO 05/20-16:28:56.596 grpc-access-log ts=0.002694286 path=/demo.service.v1.RPCDemo/SayHelloURL args=name:"lin" ret=0 ip=192.168.1.88:9000
hello lin
target是discovery://default/${appid},当gRPC内进行解析后会得到scheme=discovery和appid,然后进行以下逻辑:
和使用discovery类似,只需要在注册时使用etcd naming即可
func init(){
// NOTE: 注意这段代码,表示要使用etcd进行服务发现 ,其他事项参考discovery的说明
// NOTE: 在启动应用时,可以通过flag(-etcd.endpoints) 或者 环境配置(ETCD_ENDPOINTS)指定etcd节点
// NOTE: 如果需要自己指定配置时 需要同时设置DialTimeout 与 DialOptions: []grpc.DialOption{grpc.WithBlock()}
resolver.Register(etcd.Builder(nil))
}
etcd的服务注册与discovery基本相同,可以传入详细的etcd配置项, 或者传入nil后通过flag(-etcd.endpoints)/环境配置(ETCD_ENDPOINTS)来指定etcd节点。
grpc-go内置了round-robin轮询,但由于自带的轮询算法不支持权重,也不支持color筛选等需求,故需要重新实现一个负载均衡算法。
该算法在加权轮询法基础上增加了动态调节权重值,用户可以在为每一个节点先配置一个初始的权重分,之后算法会根据节点cpu、延迟、服务端错误率、客户端错误率动态打分,在将打分乘用户自定义的初始权重分得到最后的权重值。
本算法通过随机选择两个node选择优胜者来避免羊群效应,并通过ewma尽量获取服务端的实时状态。
服务端: 服务端获取最近500ms内的CPU使用率(需要将cgroup设置的限制考虑进去,并除于CPU核心数),并将CPU使用率乘与1000后塞入每次grpc请求中的的Trailer中夹带返回: cpu_usage uint64 encoded with string cpu_usage : 1000
客户端: 主要参数:
// NewClient returns a new blank Client instance with a default client interceptor.
// opt can be used to add grpc dial options.
func NewClient(conf *ClientConfig, opt ...grpc.DialOption) *Client {
c := new(Client)
if err := c.SetConfig(conf); err != nil {
panic(err)
}
c.UseOpt(grpc.WithBalancerName(p2c.Name))
c.UseOpt(opt...)
c.Use(c.recovery(), clientLogging(), c.handle())
return c
}