go-zero 1—rpc服务的创建和rpc服务之间的调用,并介绍go-zero 服务启动的流程

莫英喆
2023-12-01

前期准备

如果对 grpc 还不太了解的,可以看看我的这栏文章https://blog.csdn.net/wanmei002/category_11067794.html

因为 服务发现和服务注册用到了 etcd , 但是最新的 grpc 跟 etcd 不兼容,

所以 protoc-gen-go 跟 grpc 的版本要降级

go get -u github.com/golang/protobuf/protoc-gen-go@v1.3.2

go get google.golang.org/grpc@v1.29.1

先贴以下我的 go.mod

require (
	github.com/golang/protobuf v1.4.2
	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
	github.com/tal-tech/go-zero v1.1.4
	google.golang.org/grpc v1.29.1
)

简单初始化 一个 grpc server

以下代码来自 go-zero 文档, 地址: https://github.com/tal-tech/zero-doc/blob/main/doc/shorturl.md

  • 建一个文件,进入 初始化: go mod init *****
  • goctl rpc template -o transform.proto
  • goctl rpc proto -src transform.proto -dir .
  • go get github.com/golang/protobuf@v1.4.2
  • go get google.golang.org/grpc@v1.29.1
  • go get 获得所有的依赖
  • 查看下 etc/transform.yaml 文件里的etcd 地址:端口 是否正确
  • go run transform.go 运行起来程序

rpc 服务之间的调用

修改配置文件

比如我们又建了一个服务 AAA,要调用 transform 服务

修改 AAA 服务的配置文件

  • 修改在 etc/ 目录下的 yaml 文件,在其中添加 transform 服务的 etcd 信息,添加内容如下:
Transform: // 名字可以自己取
  Etcd:
    Hosts:
      - 127.0.0.1:2379  // transform 服务注册的 etcd
    Key: transform.rpc  // transform 服务注册的 服务名
  • 在 AAA 服务目录下的 internal/svc/servicecontext.go 文件中添加 transform 客户端连接
    type ServiceContext struct {
    	Config config.Config
    	Transform transformer.Transformer // Transformer服务 客户端接口 位置一般是: transform/transformer/transformer.go
    }

创建 transform 服务连接,也是在 servicecontext.go 文件中

func NewServiceContext(c config.Config) *ServiceContext {
	return &ServiceContext{
		Config: c,
		// 创建 transform 客户端连接
		Transform: transformer.NewTransformer(zrpc.MustNewClient(c.Transform)),// c.Transform 是在 yaml 中配置的 transform 服务的配置信息
	}
}

逻辑一般都在项目下的 internal/logic 文件夹下, 在 rpc 方法中,在其中调用 其它 gRPC 项目的方法可以
用 l.svcCtx.Transform 对象里的方法

现在我们来了解下 go-zero 服务是怎么运行了

新建的实现 proto 中声明的 rpc 方法

// 第二个参数的值 会赋值给 s 的register属性,然后 调用 s.Start() 方法, 注册服务
s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
		transform.RegisterTransformerServer(grpcServer, srv)
	})

让我们看看新建的时候都做了什么

第一步

func MustNewServer(c RpcServerConf, register internal.RegisterFn) *RpcServer {
	server, err := NewServer(c, register)// 这个是第二步
	if err != nil {
		log.Fatal(err)
	}

	return server
}

第二步

func NewServer(c RpcServerConf, register internal.RegisterFn) (*RpcServer, error) {
	var err error
    // 如果要 Auth 认证,需要配置 redis ,这里检查是否配置了 redis
	if err = c.Validate(); err != nil {
		return nil, err
	}

	var server internal.Server
	metrics := stat.NewMetrics(c.ListenOn)
	if c.HasEtcd() {
        // 检查 配置的 ip
		listenOn := figureOutListenOn(c.ListenOn)
		// 重要的一步 连接etcd 并注册服务,并保持续期  这是第三步
        server, err = internal.NewRpcPubServer(c.Etcd.Hosts, c.Etcd.Key, listenOn, internal.WithMetrics(metrics))
		if err != nil {
			return nil, err
		}
	} else {
		server = internal.NewRpcServer(c.ListenOn, internal.WithMetrics(metrics))
	}

	server.SetName(c.Name)
	if err = setupInterceptors(server, c, metrics); err != nil {
		return nil, err
	}

	rpcServer := &RpcServer{
		server:   server,
		register: register,
	}
	if err = c.SetUp(); err != nil {
		return nil, err
	}

	return rpcServer, nil
}

第三步

server, err = internal.NewRpcPubServer(c.Etcd.Hosts, c.Etcd.Key, listenOn, internal.WithMetrics(metrics))

上面的 NewRpcPubServer 方法里的代码

func NewRpcPubServer(etcdEndpoints []string, etcdKey, listenOn string, opts ...ServerOption) (Server, error) {
	registerEtcd := func() error {
        // 连接 etcd 并设置租约
		pubClient := discov.NewPublisher(etcdEndpoints, etcdKey, listenOn)
		return pubClient.KeepAlive()
	}
	server := keepAliveServer{
		registerEtcd: registerEtcd,
                      // 这是第四步
		Server:       NewRpcServer(listenOn, opts...),
	}

	return server, nil
}

第四步

func NewRpcServer(address string, opts ...ServerOption) Server {
	var options rpcServerOptions
	for _, opt := range opts {
		opt(&options)
	}
	if options.metrics == nil {
		options.metrics = stat.NewMetrics(address)
	}
    // 这个 server 是比较重要的 他有个方法
	return &rpcServer{
		baseRpcServer: newBaseRpcServer(address, options.metrics),
	}
}

rpcServer 的 Start 方法, 这个方法就是注册grpc 服务 和 注册拦截器

func (s *rpcServer) Start(register RegisterFn) error {
	lis, err := net.Listen("tcp", s.address)
	if err != nil {
		return err
	}

	unaryInterceptors := []grpc.UnaryServerInterceptor{
		serverinterceptors.UnaryTracingInterceptor(s.name),
		serverinterceptors.UnaryCrashInterceptor(),
		serverinterceptors.UnaryStatInterceptor(s.metrics),
		serverinterceptors.UnaryPrometheusInterceptor(),
	}
	unaryInterceptors = append(unaryInterceptors, s.unaryInterceptors...)
	streamInterceptors := []grpc.StreamServerInterceptor{
		serverinterceptors.StreamCrashInterceptor,
	}
	streamInterceptors = append(streamInterceptors, s.streamInterceptors...)
	options := append(s.options, WithUnaryServerInterceptors(unaryInterceptors...),
		WithStreamServerInterceptors(streamInterceptors...))
	server := grpc.NewServer(options...)
	register(server)
	// we need to make sure all others are wrapped up
	// so we do graceful stop at shutdown phase instead of wrap up phase
	waitForCalled := proc.AddWrapUpListener(func() {
		server.GracefulStop()
	})
	defer waitForCalled()

	return server.Serve(lis)
}

第五步 启动起来

func (rs *RpcServer) Start() {
	if err := rs.server.Start(rs.register); err != nil {
		logx.Error(err)
		panic(err)
	}
}

就这样 grpc 服务就跑起来了

 类似资料: