服务端
var (
port = ":50051"
crtFile = "server.crt"
keyFile = "server.key"
)
func main() {
// 读取解析公私钥,创建TLS的证书
cert, err := tls.LoadX509KeyPair(crtFile,keyFile)
if err != nil {
log.Fatalf("failed to load key pair: %s", err)
}
opts := []grpc.ServerOption{
grpc.Creds(credentials.NewServerTLSFromCert(&cert))
}
// 通过TLS证书来创建RPC服务器,
s := grpc.NewServer(opts...)
pb.RegisterProductInfoServer(s, &server{})
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
客户端
var (
address = "localhost:50051"
hostname = "localhost
crtFile = "server.crt"
)
func main() {
// 读取解析公开证书,创建TLS的证书
creds, err := credentials.NewClientTLSFromFile(crtFile, hostname)
if err != nil {
log.Fatalf("failed to load credentials: %v", err)
}
opts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
}
// 建立安全连接
conn, err := grpc.Dial(address, opts...)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewProductInfoClient(conn)
}
这个比较简单了,客户端发送的头信息中 Authorization由字符串“ basic base64(user:passwd) ”组成
type basicAuth struct {
username string
password string
}
// 编码逻辑
func (b basicAuth) GetRequestMetadata(ctx context.Context, in ...string) (map[string]string, error) {
auth := b.username + ":" + b.password
enc := base64.StdEncoding.EncodeToString([]byte(auth))
return map[string]string{
"authorization": "Basic " + enc,
}, nil
}
// 传递凭证时是否启用安全通道
func (b basicAuth) RequireTransportSecurity() bool {
return true
}
客户端
func main() {
// 读取解析公开证书,创建TLS的证书
creds, err := credentials.NewClientTLSFromFile(crtFile, hostname)
if err != nil {
log.Fatalf("failed to load credentials: %v", err)
}
auth := basicAuth{
username: "admin",
password: "admin",
}
opts := []grpc.DialOption{
// 添加认证凭证函数
grpc.WithPerRPCCredentials(auth),
grpc.WithTransportCredentials(creds),
}
// 建立安全连接
conn, err := grpc.Dial(address, opts...)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewProductInfoClient(conn)
}
服务端
func main() {
// 读取解析公私钥,创建TLS的证书
cert, err := tls.LoadX509KeyPair(crtFile,keyFile)
if err != nil {
log.Fatalf("failed to load key pair: %s", err)
}
opts := []grpc.ServerOption{
// UnaryInterceptor 添加一个拦截器函数
grpc.UnaryInterceptor(ensureValidBasicCredentials),
grpc.Creds(credentials.NewServerTLSFromCert(&cert))
}
// 通过TLS证书来创建RPC服务器,
s := grpc.NewServer(opts...)
pb.RegisterProductInfoServer(s, &server{})
lis, err := net.Listen("tcp", port)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
// 拦截器函数
func ensureValidBasicCredentials(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errMissingMetadata
}
// 检查头信息 authorization
if !valid(md["authorization"]) {
return nil, errInvalidToken
}
// 调用执行RPC
return handler(ctx, req)
}
func valid(authorization []string) bool {
if len(authorization) < 1 {
return false
}
token := strings.TrimPrefix(authorization[0], "Basic ")
return token == base64.StdEncoding.EncodeToString([]byte("admin:admin"))
}
通过令牌凭证访问服务端资源,例子中没有授权服务器,直接使用的硬编码演示
客户端
func main() {
creds, err := credentials.NewClientTLSFromFile(crtFile, hostname)
if err != nil {
log.Fatalf("failed to load credentials: %v", err)
}
// 令牌凭证
perRPC := oauth.NewOauthAccess(fetchToken())
opts := []grpc.DialOption{
// 使用连接凭证
grpc.WithPerRPCCredentials(perRPC),
grpc.WithTransportCredentials(creds),
}
conn, err := grpc.Dial(address, opts...)
defer conn.Close()
c := pb.NewProductInfoClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
}
// 授权服务器
func fetchToken() *oauth2.Token {
return &oauth2.Token{
AccessToken: "some-secret-token",
}
}
服务端
func main() {
cert, err := tls.LoadX509KeyPair(crtFile, keyFile)
opts := []grpc.ServerOption{
grpc.Creds(credentials.NewServerTLSFromCert(&cert)),
// 拦截器检验凭证
grpc.UnaryInterceptor(ensureValidToken),
}
s := grpc.NewServer(opts...)
pb.RegisterProductInfoServer(s, &server{})
lis, err := net.Listen("tcp", port)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
// 拦截器函数
func ensureValidToken(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errMissingMetadata
}
if !valid(md["authorization"]) {
return nil, errInvalidToken
}
// 调用执行RPC
return handler(ctx, req)
}
func valid(authorization []string) bool {
if len(authorization) < 1 {
return false
}
token := strings.TrimPrefix(authorization[0], "Bearer ")
return token == "some-secret-token"
}
func main() {
creds, err := credentials.NewClientTLSFromFile(crtFile, hostname)
if err != nil {
log.Fatalf("failed to load credentials: %v", err)
}
// JWT 凭证
jwtCreds, err := oauth.NewJWTAccessFromFile(“token.json”)
opts := []grpc.DialOption{
// 使用连接凭证
grpc.WithPerRPCCredentials(jwtCreds),
grpc.WithTransportCredentials(creds),
}
conn, err := grpc.Dial(address, opts...)
defer conn.Close()
c := pb.NewProductInfoClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
}
func TestServer_AddProduct(t *testing.T) {
// gRPC服务器
grpcServer := initGRPCServerHTTP2()
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
grpcServer.Stop()
t.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewProductInfoClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
name := "Sumsung S10"
description := "Samsung Galaxy S10 is the latest smart phone, launched in
February 2019"
price := float32(700.0)
r, err := c.AddProduct(ctx, &pb.Product{Name: name,Description: description, Price: price})
log.Printf("Res %s", r.Value)
grpcServer.Stop()
}
func TestAddProduct(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
// 创建mock对象,对远程方法调用
mocklProdInfoClient := NewMockProductInfoClient(ctrl)
// 对mock对象进行编码
mocklProdInfoClient.
req := &pb.Product{Name: name, Description: description, Price: price}
// 调用 AddProduct方法
EXPECT().AddProduct(gomock.Any(), &rpcMsg{msg: req},).
// 返回 Product的mock值
Return(&wrapper.StringValue{Value: "ABC123" + name}, nil)
// 调用存根的远程方法
testAddProduct(t, mocklProdInfoClient)
}
func testAddProduct(t *testing.T, client pb.ProductInfoClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := client.AddProduct(ctx, &pb.Product{Name: name,Description: description, Price: price})
}
服务端
docker image build -t productinfo-server -f server/Dockerfile
// 构建环境
FROM golang AS build
ENV location /example/go
// 添加目录
WORKDIR ${location}/server
ADD ./server ${location}/server
ADD ./proto ${location}/proto
// 下载依赖和安装所有包
RUN go get -d ./...
RUN go install ./...
// 编译构建到bin目录
RUN CGO_ENABLED=0 go build -o /bin/productinfo-server
FROM scratch
COPY --from=build /bin/productinfo-server /bin/productinfo-server
ENTRYPOINT ["/bin/productinfo-server"]
// 暴露端口
EXPOSE 50051
客户端
docker run -it --network=mynet --name=productinfo --hostname=productinfo-server -p 50051:50051 grpc-productinfo-server
docker run -it --network=mynet --hostname=productinfo-client
FROM golang AS build
ENV location /example/go
WORKDIR ${location}/client
ADD ./client ${location}/client
ADD ./proto-gen ${location}/proto
RUN go get -d ./...
RUN go install ./...
RUN CGO_ENABLED=0 go build -o /bin/productinfo-client
FROM scratch
COPY --from=build /bin/productinfo-client /bin/productinfo-client
ENTRYPOINT ["/bin/grpc-productinfo-client"]
EXPOSE 50051
使用上面的docker容器部署到k8s中,k8s不直接管理容器,而是以pod为单位包含一个或多个的容器。
同一个pod中运行的多个容器会共享资源和本地网络
kubectl apply -f server/prodinfo-server.yaml
kubectl apply -f client/prodinfo-client.yaml
服务端
apiVersion: apps/v1
kind: Deployment
metadata:
name: productinfo-server
spec:
replicas: 1 # 同时运行的gRPC服务端pod数量
selector:
matchLabels:
app: productinfo-server
template:
metadata:
labels:
app: productinfo-server
spec:
containers:
- name: productinfo-server # gRPC服务端容器的镜像和tag
image: kasunindrasiri/productinfo-server
resources:
limits:
memory: "128Mi"
cpu: "500m"
ports:
- containerPort: 50051
name: grpc
---
apiVersion: v1
kind: Service
metadata:
name: productinfo
spec:
selector:
app: productinfo-server
ports:
- port: 50051
targetPort: 50051
name: grpc
type: NodePort
客户端
apiVersion: batch/v1
kind: Job
metadata:
name: productinfo-client
spec:
completions: 1 # 在job完成之前,pod需要成功运行的次数
parallelism: 1 # 并行运行的pod数量
template:
spec:
containers:
- name: productinfo-client # gRPC客户端容器的镜像
image: kasunindrasiri/productinfo-client
restartPolicy: Never
backoffLimit: 4
负载均衡器
部署的服务端只能暴露给集群中运行的其他pod,k8s提供了ingress为外部应用访问
ingress视为k8s和外部应用的一个负载均衡器,将外部流量转发到service匹配pod转移到内部流量
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
annotations:
kubernetes.io/ingress.class: "nginx"
nginx.ingress.kubernetes.io/ssl-redirect: "false"
nginx.ingress.kubernetes.io/backend-protocol: "GRPC"
name: prodinfo-ingress
spec:
rules:
- host: productinfo # 暴露给外部的主机名
http:
paths:
- backend:
serviceName: productinfo
servicePort: grpc # 服务端口名称
允许暴露gRPC服务器状态,客户端就可以通过探查状态来判断是否健康
syntax = "proto3";
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
}
ServingStatus status = 1;
}
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
因为前面的需要自己从头去实现比较麻烦,这里就可以使用社区的一个工具 grpc_health_probe
grpc_health_probe -addr=localhost:50051
grpc_health_probe -addr=localhost:50052 -connect-timeout 600ms -rpc-timeout 300ms
打包到docker中
RUN GRPC_HEALTH_PROBE_VERSION=v0.3.0 && \
wget -qO/bin/grpc_health_probe \
https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe”