基于go-kit开发grpc接口,并使用go-kit注册到consul中。
如果不知道go-kit几个层级,那么这篇文章不适合你,先去看go-kit基础,再来看这篇文章
我们首先编写proto文件
syntax = "proto3";
package gproto;
message StringRequest {
string A = 1;
string B = 2;
}
message HealthRequest {
string A = 1;
}
message HealthResponse {
string Status = 1;
}
message StringResponse {
string Msg = 1;
}
service StringServices {
rpc Concat (StringRequest) returns (StringResponse);
rpc Diff (StringRequest) returns (StringResponse);
rpc HealtStatus (HealthRequest) returns (HealthResponse);
}
编写好proto文件之后,进行代码生成
protoc --go_out=plugins=grpc:./ ./test.proto
利用go-kit实现接口,go-kit实现的接口,只用于数据透传,并调用底层endpoint,并不做逻辑执行
要调用底层,我们就是要通过数据透传,进行调用endpoint,然后再进行调用service层代码
我们看数据透传层
package router
import (
"bkgrpc/endpoint"
."bkgrpc/proto"
"bkgrpc/services"
"bkgrpc/transport"
"context"
"github.com/go-kit/kit/transport/grpc"
)
type ServicesW interface {
Concat(context.Context, *StringRequest) (*StringResponse, error)
Diff(context.Context, *StringRequest) (*StringResponse, error)
HealtStatus(context.Context, *HealthRequest) (*HealthResponse, error)
}
type ServicesWA struct {
concat grpc.Handler
diff grpc.Handler
healtStatus grpc.Handler
}
func (s ServicesWA) Concat(ctx context.Context, request *StringRequest) (*StringResponse, error) {
_ , rep , err :=s.concat.ServeGRPC(ctx , request)
return rep.(*StringResponse) , err
}
func (s ServicesWA) Diff(ctx context.Context, request *StringRequest) (*StringResponse, error) {
_ , rep , err := s.diff.ServeGRPC(ctx , request)
return rep.(*StringResponse) , err
}
func (s ServicesWA) HealtStatus(ctx context.Context, request *HealthRequest) (*HealthResponse, error){
_ , rep , err := s.healtStatus.ServeGRPC(ctx , request)
return rep.(*HealthResponse) , err
}
注意:这个数据透传层只是“表面”上完成了接口,但是底层调用了ServeGRPC接口,如果有好奇的朋友,可以去这个接口看看,并看一下grpc.Handler是如果实现这个接口的,其实这个接口底层实现,就是,调用decode进行编码,再通过调用endpoint层进行底层调用service逻辑函数,再进行解码
好,此时,我们继续,实现底层的service的各个接口的逻辑
service.go
package services
import (
."bkgrpc/proto"
"context"
"errors"
)
type Service interface {
Concat(context.Context, *StringRequest) (*StringResponse, error)
Diff(context.Context, *StringRequest) (*StringResponse, error)
HealtStatus(context.Context, *HealthRequest) (*HealthResponse, error)
}
type ServicesA struct {}
func (s ServicesA) Concat (_ context.Context, request *StringRequest) (*StringResponse, error) {
if len(request.A + request.B) > 10 {
return nil, errors.New("too long strings")
}
return &StringResponse{Msg: request.B + request.A} , nil
}
func (s ServicesA) Diff(_ context.Context, request *StringRequest) (*StringResponse, error) {
if len(request.A + request.B) > 10{
return nil , errors.New("too long strings")
}
if request.A == request.B {
return &StringResponse{Msg: "two strings is same"} , nil
} else {
return &StringResponse{Msg: "two strings is not same"} , nil
}
}
func (s ServicesA) HealtStatus(_ context.Context, request *HealthRequest) (*HealthResponse, error) {
return &HealthResponse{Status: "the status is health"} , nil
}
这是serivce逻辑实现,由于是一个demo,就基本没有逻辑,concat是进行字符串拼接,diff是判断字符串是否相同
接下来,继续,我们实现endpoint层,endpoint层是进行业务转发,调用底层serivce函数
package endpoint
import (
"context"
"github.com/go-kit/kit/endpoint"
"bkgrpc/services"
"bkgrpc/proto"
)
type EndpointA struct {
ConcatEndpoint endpoint.Endpoint
DiffEndpoint endpoint.Endpoint
HealthEndpoint endpoint.Endpoint
}
func MakeConcatEndpoint (svc services.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(*gproto.StringRequest)
response , err =svc.Concat(ctx , req)
return
}
}
func MakeDiffEndpoint (svc services.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(*gproto.StringRequest)
response , err = svc.Diff(ctx , req)
return
}
}
func MakeHealthEndpoint (svc services.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(*gproto.HealthRequest)
response , err = svc.HealtStatus(ctx , req)
return
}
}
这就不多讲了,如果看不懂,需要先去看go-kit基础。
接下来,进行transport层,主要进行编码和解码
package transport
import (
gproto "bkgrpc/proto"
"context"
)
func DecodString (ctx context.Context, req interface{}) (request interface{}, err error) {
request = req.(*gproto.StringRequest)
return
}
func DecodeHealth (ctx context.Context, req interface{}) (request interface{}, err error) {
request = req.(*gproto.HealthRequest)
return
}
func HealthEncode (ctx context.Context, rep interface{}) (response interface{}, err error){
response = rep.(*gproto.HealthResponse)
return
}
func EncodString (ctx context.Context, rep interface{}) (response interface{}, err error){
response = rep.(*gproto.StringResponse)
return
}
下面重要的是,如果将这写接口整合到一起,进行一个router的编写
package router
import (
"bkgrpc/endpoint"
."bkgrpc/proto"
"bkgrpc/services"
"bkgrpc/transport"
"context"
"github.com/go-kit/kit/transport/grpc"
)
type ServicesW interface {
Concat(context.Context, *StringRequest) (*StringResponse, error)
Diff(context.Context, *StringRequest) (*StringResponse, error)
HealtStatus(context.Context, *HealthRequest) (*HealthResponse, error)
}
type ServicesWA struct {
concat grpc.Handler
diff grpc.Handler
healtStatus grpc.Handler
}
func (s ServicesWA) Concat(ctx context.Context, request *StringRequest) (*StringResponse, error) {
_ , rep , err :=s.concat.ServeGRPC(ctx , request)
return rep.(*StringResponse) , err
}
func (s ServicesWA) Diff(ctx context.Context, request *StringRequest) (*StringResponse, error) {
_ , rep , err := s.diff.ServeGRPC(ctx , request)
return rep.(*StringResponse) , err
}
func (s ServicesWA) HealtStatus(ctx context.Context, request *HealthRequest) (*HealthResponse, error){
_ , rep , err := s.healtStatus.ServeGRPC(ctx , request)
return rep.(*HealthResponse) , err
}
func NewRouter (endpoints endpoint.EndpointA) services.Service {
return &ServicesWA{healtStatus: grpc.NewServer(endpoints.HealthEndpoint , transport.DecodeHealth , transport.HealthEncode),
diff: grpc.NewServer(endpoints.DiffEndpoint , transport.DecodString , transport.EncodString),
concat: grpc.NewServer(endpoints.ConcatEndpoint , transport.DecodString , transport.EncodString),
}
}
我将router和数据穿透的接口写到一个文件中了,可能会有一些重复,但是主要讲的是router的实现
grpc.NewServer实现了ServeGRPC接口,所以,我们使用grpc.NewServer实现。
并将endpoint层transport层整合到一起
之前使用过http web进行服务注册,其实大同小异,都差不多。
package register
import (
"github.com/go-kit/kit/sd/consul"
"github.com/hashicorp/consul/api"
"log"
"strconv"
)
type ConsulServer struct {
Adress string
Port int
Client consul.Client
}
func NewRegister(adress string , port int) *ConsulServer {
config := api.DefaultConfig()
config.Address = "127.0.0.1:8500"
apiserver , err := api.NewClient(config)
if err != nil {
log.Println(err)
return nil
}
client := consul.NewClient(apiserver)
return &ConsulServer{Client: client , Adress: adress , Port: port}
}
func (c ConsulServer) Register (router ,name , id string) bool {
err := c.Client.Register(&api.AgentServiceRegistration{
Port: c.Port,
ID: id,
Address: c.Adress,
Name: name,
Check: &api.AgentServiceCheck{GRPC: c.Adress + ":" + strconv.Itoa(c.Port) + "/" + router,
Interval: "5s",
Timeout: "5s",
},
})
if err != nil {
log.Println(err)
return false
}
return true
}
和http注册不同的是再check中不同http是一个restful接口,可以通过url进行访问,但是grpc不行,这也就是两者区别。
所以在进行服务健康检查时,就有所不同。
下面进行服务启动main
package main
import (
"bkgrpc/endpoint"
"bkgrpc/proto"
"bkgrpc/register"
"bkgrpc/router"
"bkgrpc/services"
"google.golang.org/grpc"
"log"
"net"
)
func main () {
svc := services.ServicesA{}
endpoints := endpoint.EndpointA{
ConcatEndpoint: endpoint.MakeConcatEndpoint(svc),
DiffEndpoint: endpoint.MakeDiffEndpoint(svc),
HealthEndpoint: endpoint.MakeHealthEndpoint(svc),
}
r := router.NewRouter(endpoints)
lis , err := net.Listen("tcp" , ":8085")
if err != nil {
log.Println(err)
return
}
grpcserver := grpc.NewServer()
gproto.RegisterStringServicesServer(grpcserver , r)
//register into consul
client := register.NewRegister("127.0.0.1" , 8085)
client.Register("health" , "testname" , "testid")
grpcserver.Serve(lis)
}
使用创建service实例,进行服务整合。
所有源代码都在gitee上,(github上传太慢)
源码地址go-kit+grpc+consul源码地址