当前位置: 首页 > 工具软件 > grpc-go > 使用案例 >

gRPC学习Go版(三)

贺卜霸
2023-12-01

gRPC安全连接

单向TLS安全连接

服务端

// Settings used by the TLS-enabled server snippet.
var (
	// port is the TCP address handed to net.Listen.
	port = ":50051"
	// crtFile and keyFile are the certificate and private-key paths handed
	// to tls.LoadX509KeyPair.
	crtFile, keyFile = "server.crt", "server.key"
)

func main() {
  // 读取解析公私钥,创建TLS的证书
  cert, err := tls.LoadX509KeyPair(crtFile,keyFile) 
  if err != nil {
     log.Fatalf("failed to load key pair: %s", err)
  }
  
  opts := []grpc.ServerOption{
     grpc.Creds(credentials.NewServerTLSFromCert(&cert)) 
  }
	// 通过TLS证书来创建RPC服务器,
  s := grpc.NewServer(opts...) 
  pb.RegisterProductInfoServer(s, &server{}) 

  lis, err := net.Listen("tcp", port) 
  if err != nil {
     log.Fatalf("failed to listen: %v", err)
  }
  if err := s.Serve(lis); err != nil { 
     log.Fatalf("failed to serve: %v", err)
  }
}

客户端

// Settings used by the TLS-enabled client snippet.
var (
	// address is the target handed to grpc.Dial.
	address = "localhost:50051"
	// hostname is passed to credentials.NewClientTLSFromFile together with
	// crtFile.
	hostname = "localhost"
	// crtFile is the path of the server's public certificate.
	crtFile = "server.crt"
)

// main opens a TLS-secured connection to the gRPC server and creates a
// ProductInfo client stub on top of it.
func main() {
	// Read and parse the public certificate to build the client-side TLS
	// credentials.
	creds, err := credentials.NewClientTLSFromFile(crtFile, hostname)
	if err != nil {
		log.Fatalf("failed to load credentials: %v", err)
	}

	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(creds),
	}
	// Establish the secure connection.
	conn, err := grpc.Dial(address, opts...)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	c := pb.NewProductInfoClient(conn)
	// The snippet stops before issuing any RPC. Go rejects locals that are
	// declared but never used, so the stub is explicitly discarded here;
	// replace this line with real calls on c.
	_ = c
}

basic 认证

这个比较简单:客户端发送的头信息中,authorization 的值由字符串 "Basic " 加上 base64(user:passwd) 组成,即 "Basic base64(user:passwd)"。

// basicAuth carries the user name and password that are sent with each call
// as Basic credentials.
type basicAuth struct {
	username, password string
}

// GetRequestMetadata returns the metadata to attach to a call: a single
// "authorization" entry whose value is "Basic " followed by the base64
// encoding of "username:password". The returned error is always nil.
func (b basicAuth) GetRequestMetadata(ctx context.Context, in ...string) (map[string]string, error) {
	raw := []byte(b.username + ":" + b.password)
	md := make(map[string]string, 1)
	md["authorization"] = "Basic " + base64.StdEncoding.EncodeToString(raw)
	return md, nil
}

// RequireTransportSecurity reports whether a secure channel is required to
// transmit these credentials; it is always true.
func (b basicAuth) RequireTransportSecurity() bool {
	return true
}

客户端

// main opens a TLS-secured connection that additionally attaches Basic
// credentials to every RPC, then creates a ProductInfo client stub.
func main() {
	// Read and parse the public certificate to build the client-side TLS
	// credentials.
	creds, err := credentials.NewClientTLSFromFile(crtFile, hostname)
	if err != nil {
		log.Fatalf("failed to load credentials: %v", err)
	}

	auth := basicAuth{
		username: "admin",
		password: "admin",
	}
	opts := []grpc.DialOption{
		// Attach the Basic credentials to every call.
		grpc.WithPerRPCCredentials(auth),
		grpc.WithTransportCredentials(creds),
	}
	// Establish the secure connection.
	conn, err := grpc.Dial(address, opts...)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	c := pb.NewProductInfoClient(conn)
	// The snippet stops before issuing any RPC. Go rejects locals that are
	// declared but never used, so the stub is explicitly discarded here;
	// replace this line with real calls on c.
	_ = c
}

服务端

func main() {
  // 读取解析公私钥,创建TLS的证书
  cert, err := tls.LoadX509KeyPair(crtFile,keyFile) 
  if err != nil {
     log.Fatalf("failed to load key pair: %s", err)
  }
  
  opts := []grpc.ServerOption{
     // UnaryInterceptor 添加一个拦截器函数
     grpc.UnaryInterceptor(ensureValidBasicCredentials),
     grpc.Creds(credentials.NewServerTLSFromCert(&cert)) 
  }
	// 通过TLS证书来创建RPC服务器,
  s := grpc.NewServer(opts...) 
  pb.RegisterProductInfoServer(s, &server{}) 

  lis, err := net.Listen("tcp", port) 
  if err := s.Serve(lis); err != nil { 
     log.Fatalf("failed to serve: %v", err)
  }
}
// ensureValidBasicCredentials is a unary server interceptor that checks the
// caller's credentials before the RPC runs. It returns errMissingMetadata
// when the incoming context carries no metadata, errInvalidToken when valid
// rejects the "authorization" values, and otherwise whatever handler returns.
func ensureValidBasicCredentials(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler) (interface{}, error) {
	headers, found := metadata.FromIncomingContext(ctx)
	switch {
	case !found:
		return nil, errMissingMetadata
	case !valid(headers["authorization"]):
		// The "authorization" header is absent or does not match.
		return nil, errInvalidToken
	}
	// Credentials accepted: execute the actual RPC.
	return handler(ctx, req)
}

// valid reports whether the first "authorization" value is exactly
// "Basic " followed by the base64 encoding of the hard-coded demo
// credentials "admin:admin". An empty or nil slice is rejected.
func valid(authorization []string) bool {
	if len(authorization) < 1 {
		return false
	}
	const scheme = "Basic "
	header := authorization[0]
	// Require the scheme. strings.TrimPrefix alone would also accept a bare
	// token sent without "Basic ", because it returns its input unchanged
	// when the prefix is absent.
	if !strings.HasPrefix(header, scheme) {
		return false
	}
	return header[len(scheme):] == base64.StdEncoding.EncodeToString([]byte("admin:admin"))
}

OAuth 2.0认证

通过令牌凭证访问服务端资源,例子中没有授权服务器,直接使用的硬编码演示

客户端

// main opens a TLS-secured connection that attaches an OAuth 2.0 access
// token to every RPC, then creates a ProductInfo client stub and a
// one-second call context.
func main() {
	creds, err := credentials.NewClientTLSFromFile(crtFile, hostname)
	if err != nil {
		log.Fatalf("failed to load credentials: %v", err)
	}
	// Per-RPC credentials built from the access token.
	perRPC := oauth.NewOauthAccess(fetchToken())

	opts := []grpc.DialOption{
		// Attach the token to every call on this connection.
		grpc.WithPerRPCCredentials(perRPC),
		grpc.WithTransportCredentials(creds),
	}

	conn, err := grpc.Dial(address, opts...)
	if err != nil {
		// Check before deferring Close so a failed Dial is reported instead
		// of being silently ignored.
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewProductInfoClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// The snippet stops before issuing any RPC. Go rejects locals that are
	// declared but never used, so both are explicitly discarded here;
	// replace this line with real calls such as c.AddProduct(ctx, ...).
	_, _ = c, ctx
}
// fetchToken stands in for an authorization server: it returns a token whose
// AccessToken is the hard-coded value "some-secret-token".
func fetchToken() *oauth2.Token {
	tok := new(oauth2.Token)
	tok.AccessToken = "some-secret-token"
	return tok
}

服务端

// main starts a TLS-secured gRPC server that validates the bearer token of
// every unary call through an interceptor.
func main() {
	cert, err := tls.LoadX509KeyPair(crtFile, keyFile)
	if err != nil {
		// The original never read err, which Go rejects as "declared and
		// not used" and which would have hidden a missing key pair.
		log.Fatalf("failed to load key pair: %s", err)
	}
	opts := []grpc.ServerOption{
		grpc.Creds(credentials.NewServerTLSFromCert(&cert)),
		// The interceptor validates the credentials of each call.
		grpc.UnaryInterceptor(ensureValidToken),
	}

	s := grpc.NewServer(opts...)
	pb.RegisterProductInfoServer(s, &server{})
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
// ensureValidToken is a unary server interceptor. It rejects the call with
// errMissingMetadata when the incoming context has no metadata and with
// errInvalidToken when valid does not accept the "authorization" values;
// otherwise it forwards the call to handler and returns its result.
func ensureValidToken(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	incoming, present := metadata.FromIncomingContext(ctx)
	if !present {
		return nil, errMissingMetadata
	}
	if tokenOK := valid(incoming["authorization"]); !tokenOK {
		return nil, errInvalidToken
	}
	// Token accepted: execute the actual RPC.
	return handler(ctx, req)
}

// valid reports whether the first "authorization" value is exactly
// "Bearer " followed by the hard-coded demo token "some-secret-token".
// An empty or nil slice is rejected.
func valid(authorization []string) bool {
	if len(authorization) < 1 {
		return false
	}
	const scheme = "Bearer "
	header := authorization[0]
	// Require the scheme. strings.TrimPrefix alone would also accept the
	// bare token sent without "Bearer ", because it returns its input
	// unchanged when the prefix is absent.
	if !strings.HasPrefix(header, scheme) {
		return false
	}
	return header[len(scheme):] == "some-secret-token"
}

JWT认证

func main() {
	creds, err := credentials.NewClientTLSFromFile(crtFile, hostname)
	if err != nil {
		log.Fatalf("failed to load credentials: %v", err)
	}
  // JWT 凭证
  jwtCreds, err := oauth.NewJWTAccessFromFile(“token.json”) 

	opts := []grpc.DialOption{
    // 使用连接凭证
		grpc.WithPerRPCCredentials(jwtCreds),
		grpc.WithTransportCredentials(creds),
	}

	conn, err := grpc.Dial(address, opts...)
	defer conn.Close()
	c := pb.NewProductInfoClient(conn)
  
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
}

gRPC测试

服务端测试

func TestServer_AddProduct(t *testing.T) { 
  // gRPC服务器
	grpcServer := initGRPCServerHTTP2() 
	conn, err := grpc.Dial(address, grpc.WithInsecure()) 
	if err != nil {
    grpcServer.Stop()
    t.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewProductInfoClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
  
  name := "Sumsung S10"
	description := "Samsung Galaxy S10 is the latest smart phone, launched in
	February 2019"
	price := float32(700.0)
	r, err := c.AddProduct(ctx, &pb.Product{Name: name,Description: description, Price: price}) 
	log.Printf("Res %s", r.Value)
  
  grpcServer.Stop()
}

客户端测试

// TestAddProduct exercises the client-side logic against a gomock-generated
// ProductInfo client instead of a real server.
// NOTE(review): name, description and price are not declared in this
// snippet; presumably they are package-level test fixtures. Confirm.
func TestAddProduct(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()
	// Create the mock object that stands in for the remote client.
	mockProdInfoClient := NewMockProductInfoClient(ctrl)

	// The expected request must be built before the expectation is recorded;
	// a statement cannot be placed in the middle of a method-call chain.
	req := &pb.Product{Name: name, Description: description, Price: price}

	// Program the mock: expect one AddProduct call matching req and return a
	// canned product ID.
	mockProdInfoClient.
		EXPECT().
		AddProduct(gomock.Any(), &rpcMsg{msg: req}).
		Return(&wrapper.StringValue{Value: "ABC123" + name}, nil)

	// Invoke the remote method through the stub.
	testAddProduct(t, mockProdInfoClient)
}

// testAddProduct calls AddProduct on client with a one-second timeout and
// fails the test when the call returns an error or no response.
// NOTE(review): name, description and price are not declared in this
// snippet; presumably they are package-level test fixtures. Confirm.
func testAddProduct(t *testing.T, client pb.ProductInfoClient) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	r, err := client.AddProduct(ctx, &pb.Product{Name: name, Description: description, Price: price})
	// The original left r and err unused, which Go rejects at compile time
	// and which meant the test verified nothing.
	if err != nil {
		t.Fatalf("AddProduct failed: %v", err)
	}
	if r == nil {
		t.Fatal("AddProduct returned a nil response")
	}
	t.Logf("Res %s", r.Value)
}

gRPC部署

部署docker

服务端

# The trailing "." is the build context; docker build requires it.
docker image build -t productinfo-server -f server/Dockerfile .
# Build stage. Dockerfile comments start with "#"; "//" is not comment syntax.
FROM golang AS build
ENV location=/example/go
# Copy the server sources and the proto package into the build tree.
WORKDIR ${location}/server
COPY ./server ${location}/server
COPY ./proto ${location}/proto
# Download the dependencies and install all packages.
RUN go get -d ./...
RUN go install ./...
# Build the binary into /bin with cgo disabled.
RUN CGO_ENABLED=0 go build -o /bin/productinfo-server

# Runtime stage: an empty image that contains only the binary.
FROM scratch
COPY --from=build /bin/productinfo-server /bin/productinfo-server

ENTRYPOINT ["/bin/productinfo-server"]
# Port the server listens on.
EXPOSE 50051

客户端

# Run the server container; the image name matches the tag used by "docker image build" above.
docker run -it --network=mynet --name=productinfo --hostname=productinfo-server -p 50051:50051 productinfo-server

# docker run needs an image argument. "productinfo-client" is assumed here;
# use whatever tag the client image was built with.
docker run -it --network=mynet --hostname=productinfo-client productinfo-client
# Build stage.
FROM golang AS build
ENV location=/example/go

# Copy the client sources and the generated proto package into the build tree.
WORKDIR ${location}/client
COPY ./client ${location}/client
COPY ./proto-gen ${location}/proto

# Download the dependencies and install all packages.
RUN go get -d ./...
RUN go install ./...

# Build the binary into /bin with cgo disabled.
RUN CGO_ENABLED=0 go build -o /bin/productinfo-client

# Runtime stage: an empty image that contains only the binary.
FROM scratch
COPY --from=build /bin/productinfo-client /bin/productinfo-client

# The entrypoint must be the path the binary was copied to above.
ENTRYPOINT ["/bin/productinfo-client"]
EXPOSE 50051

部署k8s

使用上面的docker容器部署到k8s中,k8s不直接管理容器,而是以pod为单位包含一个或多个的容器。

同一个pod中运行的多个容器会共享资源和本地网络

kubectl apply -f server/prodinfo-server.yaml

kubectl apply -f client/prodinfo-client.yaml

服务端

# Deployment that runs the gRPC server container in pods labelled
# app: productinfo-server.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: productinfo-server
spec:
  replicas: 1		# number of gRPC server pods running at the same time
  selector:
    matchLabels:
      app: productinfo-server
  template:
    metadata:
      labels:
        app: productinfo-server
    spec:
      containers:
      - name: productinfo-server		# gRPC server container; its image is given on the next line
        image: kasunindrasiri/productinfo-server
        resources:
          limits:
            memory: "128Mi"
            cpu: "500m"
        ports:
        - containerPort: 50051
          name: grpc
---
# Service named "productinfo" that selects the pods labelled
# app: productinfo-server and maps port 50051 (named "grpc") to target
# port 50051.
apiVersion: v1
kind: Service
metadata:
  name: productinfo
spec:
  selector:
    app: productinfo-server
  ports:
  - port: 50051
    targetPort: 50051
    name: grpc
  type: NodePort

客户端

# Job that runs the gRPC client container.
apiVersion: batch/v1
kind: Job
metadata:
  name: productinfo-client
spec:
  completions: 1		# number of times the pod must run successfully before the job is complete
  parallelism: 1		# number of pods running in parallel
  template:
    spec:
      containers:
      - name: productinfo-client		# gRPC client container; its image is given on the next line
        image: kasunindrasiri/productinfo-client
      restartPolicy: Never
  backoffLimit: 4

负载均衡器

部署的服务端默认只暴露给集群中运行的其他pod,k8s提供了ingress供集群外部的应用访问

ingress可以视为k8s服务与外部应用之间的一个负载均衡器:它把外部流量转发给service,再由service把流量路由到匹配的pod

# Ingress that forwards external gRPC traffic for host "productinfo" to the
# "grpc" port of the productinfo Service.
# The extensions/v1beta1 Ingress API is no longer served since Kubernetes
# 1.22, so this manifest uses the networking.k8s.io/v1 schema.
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
    nginx.ingress.kubernetes.io/ssl-redirect: "false"
    nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
  name: prodinfo-ingress
spec:
  # Replaces the deprecated kubernetes.io/ingress.class annotation; expects
  # an IngressClass named "nginx" to exist in the cluster.
  ingressClassName: nginx
  rules:
  - host: productinfo		# host name exposed to external clients
    http:
      paths:
      - path: /		# v1 requires an explicit path type; "/" with Prefix matches every request path
        pathType: Prefix
        backend:
          service:
            name: productinfo
            port:
              name: grpc		# name of the Service port

健康检查

允许暴露gRPC服务器状态,客户端就可以通过探查状态来判断是否健康

syntax = "proto3";

// The package is part of the fully-qualified service name. Standard health
// checking clients call grpc.health.v1.Health, so it must be declared.
package grpc.health.v1;

// Request naming the service whose health is being queried.
message HealthCheckRequest {
  string service = 1;
}

// Response carrying the serving status of the requested service.
message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    // Used only by the Watch method.
    SERVICE_UNKNOWN = 3;
  }
  ServingStatus status = 1;
}

service Health {
  // Check returns the current status in a single response.
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
  // Watch returns a stream of status responses.
  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

健康探针

因为前面的需要自己从头去实现比较麻烦,这里就可以使用社区的一个工具 grpc_health_probe

grpc_health_probe -addr=localhost:50051

grpc_health_probe -addr=localhost:50052 -connect-timeout 600ms -rpc-timeout 300ms

打包到docker中

# Download the grpc_health_probe release binary into /bin and make it executable.
RUN GRPC_HEALTH_PROBE_VERSION=v0.3.0 && \
    wget -qO/bin/grpc_health_probe \
    https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
    chmod +x /bin/grpc_health_probe
 类似资料: