当前位置: 首页 > 工具软件 > Log Kit By Go > 使用案例 >

go-kit微服务学习-官方示例stringsvc学习

佴博实
2023-12-01

kit库

该库详细的文档可以参考官方文档,本文只是针对kit官网给出的stringsvc相关例子示例的学习。

示例代码stringsvc1

package main

import (
	"context"
	"encoding/json"
	"errors"
	"log"
	"net/http"
	"strings"

	"github.com/go-kit/kit/endpoint"
	httptransport "github.com/go-kit/kit/transport/http"
)

// StringService provides operations on strings.
type StringService interface {
	Uppercase(string) (string, error)
	Count(string) int
}

// stringService is a concrete implementation of StringService
type stringService struct{}

func (stringService) Uppercase(s string) (string, error) {
	if s == "" {
		return "", ErrEmpty
	}
	return strings.ToUpper(s), nil
}

func (stringService) Count(s string) int {
	return len(s)
}

// ErrEmpty is returned when an input string is empty.
var ErrEmpty = errors.New("empty string")

// For each method, we define request and response structs
type uppercaseRequest struct {
	S string `json:"s"`
}

type uppercaseResponse struct {
	V   string `json:"v"`
	Err string `json:"err,omitempty"` // errors don't define JSON marshaling
}

type countRequest struct {
	S string `json:"s"`
}

type countResponse struct {
	V int `json:"v"`
}

// Endpoints are a primary abstraction in go-kit. An endpoint represents a single RPC (method in our service interface)
func makeUppercaseEndpoint(svc StringService) endpoint.Endpoint {
	return func(_ context.Context, request interface{}) (interface{}, error) {
		req := request.(uppercaseRequest)
		v, err := svc.Uppercase(req.S)
		if err != nil {
			return uppercaseResponse{v, err.Error()}, nil
		}
		return uppercaseResponse{v, ""}, nil
	}
}

func makeCountEndpoint(svc StringService) endpoint.Endpoint {
	return func(_ context.Context, request interface{}) (interface{}, error) {
		req := request.(countRequest)
		v := svc.Count(req.S)
		return countResponse{v}, nil
	}
}

// Transports expose the service to the network. In this first example we utilize JSON over HTTP.
func main() {
	svc := stringService{}

	uppercaseHandler := httptransport.NewServer(
		makeUppercaseEndpoint(svc),
		decodeUppercaseRequest,
		encodeResponse,
	)

	countHandler := httptransport.NewServer(
		makeCountEndpoint(svc),
		decodeCountRequest,
		encodeResponse,
	)

	http.Handle("/uppercase", uppercaseHandler)
	http.Handle("/count", countHandler)
	log.Fatal(http.ListenAndServe(":7000", nil))
}

func decodeUppercaseRequest(_ context.Context, r *http.Request) (interface{}, error) {
	var request uppercaseRequest
	if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
		return nil, err
	}
	return request, nil
}

func decodeCountRequest(_ context.Context, r *http.Request) (interface{}, error) {
	var request countRequest
	if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
		return nil, err
	}
	return request, nil
}

func encodeResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
	return json.NewEncoder(w).Encode(response)
}

第一个例子就简单的一百余行代码,这段代码就可以运行出基础的逻辑流程。在终端中运行该脚本。

wuzi$ go run stringsvc1.go

在终端中输入如下访问;

 wuzi$ curl -XPOST -d'{"s":"hello, world"}' localhost:7000/uppercase
{"v":"HELLO, WORLD"}
 wuzi$ curl -XPOST -d'{"s":"hello, world"}' localhost:7000/count
{"v":12}

这就是官方给出的最简单的示例,我们大致来分析一下这个示例的执行的逻辑流程。

在进行深入分析的开始,我们可以通过文档的描述可知,kit抽象了三个比较重要的概念

  1. Service,服务是定义的一组接口,该接口贯穿到整个中间件的调用,是串联起来所有流程的一个接口。
  2. Endpoint,可以抽象的定义为任何一个服务的请求,如rpc、http等,可当做流程中返回值的类型。
  3. Transports,通信的方式选择,可选择http,rpc和amqp等通信方式。

有了如上的三个基本概念之后,我们开始查看一个流程的过程。

流程分析
	svc := stringService{}

	uppercaseHandler := httptransport.NewServer(
		makeUppercaseEndpoint(svc),
		decodeUppercaseRequest,
		encodeResponse,
	)

	countHandler := httptransport.NewServer(
		makeCountEndpoint(svc),
		decodeCountRequest,
		encodeResponse,
	)

	http.Handle("/uppercase", uppercaseHandler)
	http.Handle("/count", countHandler)

众所周知,http.Handle中传入的都是一个handler类型的数据,即uppercaseHandler和countHandler都实现了handler的方法。在本例中httptransport是github.com/go-kit/kit/transport/http,即指定了http的方式,查看该对应的NewServer方法。

func NewServer(
	e endpoint.Endpoint,
	dec DecodeRequestFunc,
	enc EncodeResponseFunc,
	options ...ServerOption,
) *Server {
	s := &Server{
		e:            e,
		dec:          dec,
		enc:          enc,
		errorEncoder: DefaultErrorEncoder,
		errorHandler: transport.NewLogErrorHandler(log.NewNopLogger()),
	}
	for _, option := range options {
		option(s)
	}
	return s
}

从本例中可看出最终返回了Server,即Server实现了ServeHTTP方法。

func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	if len(s.finalizer) > 0 {
		iw := &interceptingWriter{w, http.StatusOK, 0}
		defer func() {
			ctx = context.WithValue(ctx, ContextKeyResponseHeaders, iw.Header())
			ctx = context.WithValue(ctx, ContextKeyResponseSize, iw.written)
			for _, f := range s.finalizer {  // 是否注册了请求处理完成之后的回调方法如果有则遍历执行
				f(ctx, iw.code, r)
			}
		}()
		w = iw
	}

	for _, f := range s.before {    // 检查是否有再请求执行之前的注册回调函数有则执行
		ctx = f(ctx, r)
	}

	request, err := s.dec(ctx, r)    // 解析传入的参数并生成request
	if err != nil {
		s.errorHandler.Handle(ctx, err)
		s.errorEncoder(ctx, err, w)
		return
	}

	response, err := s.e(ctx, request)   // 调用service中注册的处理业务的函数处理
	if err != nil {
		s.errorHandler.Handle(ctx, err)
		s.errorEncoder(ctx, err, w)
		return
	}

	for _, f := range s.after {   			// 是否有在处理完成之后执行的回调函数如果有则执行
		ctx = f(ctx, w)
	}

	if err := s.enc(ctx, w, response); err != nil {   // 调用解析返回的数据将解析的数据返回
		s.errorHandler.Handle(ctx, err)
		s.errorEncoder(ctx, err, w)
		return
	}
}

在本例中真正执行的方法其实是如下两个函数;

func makeUppercaseEndpoint(svc StringService) endpoint.Endpoint {
	return func(_ context.Context, request interface{}) (interface{}, error) {
		req := request.(uppercaseRequest)
		v, err := svc.Uppercase(req.S)
		if err != nil {
			return uppercaseResponse{v, err.Error()}, nil
		}
		return uppercaseResponse{v, ""}, nil
	}
}

func makeCountEndpoint(svc StringService) endpoint.Endpoint {
	return func(_ context.Context, request interface{}) (interface{}, error) {
		req := request.(countRequest)
		v := svc.Count(req.S)
		return countResponse{v}, nil
	}
}

通过传入的svc来调用真正执行业务的Uppercase方法和Count方法,而这些方法的传入参数都是通过解析函数decodeUppercaseRequest和decodeCountRequest来解析并通过encodeResponse将数据序列化返回。

看完了第一个stringsvc1,发现好像跟普通的web框架好像流程上也没有太多不同。接着我们再直接查看stringsvc3的示例代码。

示例代码stringsvc3

示例代码就不贴了,stringsvc3代码自行查看

项目的结构就划分成了如下几个文件,

  • instrumenting.go,该文件主要就是进行监控数据的中间件。
  • logging.go,主要就是打印日志的中间件。
  • proxying.go,代理服务的中间件,将服务代理到后端。
  • service.go,编写服务的文件。
  • transport.go,通信编码解码,编写传入数据与解析数据服务调用的文件。
  • main.go,程序执行的入口文件。

根据官方提供的启动方式;

MacBook-Pro:stringsvc3 wuzi$ stringsvc3 -listen=:8001 &
[1] 55441
listen=:8001 caller=proxying.go:27 proxy_to=none
listen=:8001 caller=main.go:68 msg=HTTP addr=:8001
MacBook-Pro:stringsvc3 wuzi$ stringsvc3 -listen=:8002 &
[2] 55473
listen=:8002 caller=proxying.go:27 proxy_to=none
listen=:8002 caller=main.go:68 msg=HTTP addr=:8002
MacBook-Pro:stringsvc3 wuzi$ stringsvc3 -listen=:8003 &
[3] 55506
listen=:8003 caller=proxying.go:27 proxy_to=none
listen=:8003 caller=main.go:68 msg=HTTP addr=:8003
MacBook-Pro:stringsvc3 wuzi$ stringsvc3 -listen=:7000 -proxy=localhost:8001,localhost:8002,localhost:8003
listen=:7000 caller=proxying.go:46 proxy_to="[localhost:8001 localhost:8002 localhost:8003]"
listen=:7000 caller=main.go:68 msg=HTTP addr=:7000

此时打开新的终端访问;

MacBook-Pro:microt wuzi$ for s in foo bar baz ; do curl -d"{\"s\":\"$s\"}" localhost:7000/uppercase ; done

运行服务的终端就会输出;

listen=:8001 caller=logging.go:22 method=uppercase input=foo output=FOO err=null took=2.593µs
listen=:7000 caller=logging.go:22 method=uppercase input=foo output=FOO err=null took=4.498745ms
listen=:8002 caller=logging.go:22 method=uppercase input=bar output=BAR err=null took=3.469µs
listen=:7000 caller=logging.go:22 method=uppercase input=bar output=BAR err=null took=4.661837ms
listen=:8003 caller=logging.go:22 method=uppercase input=baz output=BAZ err=null took=2.66µs
listen=:7000 caller=logging.go:22 method=uppercase input=baz output=BAZ err=null took=2.090753ms

从输出可以看出,将三个请求分别代理到了不同的后端服务上去了。

反向代理流程

在main.go函数中;

package main

import (
	"context"
	"flag"
	"net/http"
	"os"

	stdprometheus "github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	"github.com/go-kit/kit/log"
	kitprometheus "github.com/go-kit/kit/metrics/prometheus"
	httptransport "github.com/go-kit/kit/transport/http"
)

func main() {
	var (
		listen = flag.String("listen", ":8080", "HTTP listen address")
		proxy  = flag.String("proxy", "", "Optional comma-separated list of URLs to proxy uppercase requests")
	)
	flag.Parse()

	var logger log.Logger
	logger = log.NewLogfmtLogger(os.Stderr)
	logger = log.With(logger, "listen", *listen, "caller", log.DefaultCaller)

	fieldKeys := []string{"method", "error"}
	requestCount := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
		Namespace: "my_group",
		Subsystem: "string_service",
		Name:      "request_count",
		Help:      "Number of requests received.",
	}, fieldKeys)
	requestLatency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
		Namespace: "my_group",
		Subsystem: "string_service",
		Name:      "request_latency_microseconds",
		Help:      "Total duration of requests in microseconds.",
	}, fieldKeys)
	countResult := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
		Namespace: "my_group",
		Subsystem: "string_service",
		Name:      "count_result",
		Help:      "The result of each count method.",
	}, []string{})

	var svc StringService
	svc = stringService{}
	svc = proxyingMiddleware(context.Background(), *proxy, logger)(svc) // 注册反向代理的中间件,如果proxy有数据则是反向代理开启
	svc = loggingMiddleware(logger)(svc)
	svc = instrumentingMiddleware(requestCount, requestLatency, countResult)(svc)

	uppercaseHandler := httptransport.NewServer(
		makeUppercaseEndpoint(svc),
		decodeUppercaseRequest,
		encodeResponse,
	)
	countHandler := httptransport.NewServer(
		makeCountEndpoint(svc),
		decodeCountRequest,
		encodeResponse,
	)

	http.Handle("/uppercase", uppercaseHandler)
	http.Handle("/count", countHandler)
	http.Handle("/metrics", promhttp.Handler())
	logger.Log("msg", "HTTP", "addr", *listen)
	logger.Log("err", http.ListenAndServe(*listen, nil))
}

其中主要起作用的就是proxyingMiddleware,因为该中间件将决定启动的实例是不是执行反向代理的服务。

package main

import (
	"context"
	"errors"
	"fmt"
	"net/url"
	"strings"
	"time"

	"golang.org/x/time/rate"

	"github.com/sony/gobreaker"

	"github.com/go-kit/kit/circuitbreaker"
	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/ratelimit"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/lb"
	httptransport "github.com/go-kit/kit/transport/http"
)

func proxyingMiddleware(ctx context.Context, instances string, logger log.Logger) ServiceMiddleware {
	// If instances is empty, don't proxy.
	if instances == "" {
		logger.Log("proxy_to", "none")
		return func(next StringService) StringService { return next }  // 如果不是代理服务则直接就行下一个中间件处理
	}

	// Set some parameters for our client.
	var (
		qps         = 100                    // beyond which we will return an error
		maxAttempts = 3                      // per request, before giving up
		maxTime     = 250 * time.Millisecond // wallclock time, before giving up
	)

	// Otherwise, construct an endpoint for each instance in the list, and add
	// it to a fixed set of endpoints. In a real service, rather than doing this
	// by hand, you'd probably use package sd's support for your service
	// discovery system.
	var (
		instanceList = split(instances)
		endpointer   sd.FixedEndpointer
	)
	logger.Log("proxy_to", fmt.Sprint(instanceList))   // 获取代理的监听列表
	for _, instance := range instanceList {            // 遍历每一个监控列表
		var e endpoint.Endpoint
		e = makeUppercaseProxy(ctx, instance)            // 创建一个连接实例当前基于http
		e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)  // 设置熔断机制
		e = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), qps))(e)  // 设置限速机制
		endpointer = append(endpointer, e)
	}

	// Now, build a single, retrying, load-balancing endpoint out of all of
	// those individual endpoints.
	balancer := lb.NewRoundRobin(endpointer)    // 使用轮训的负载均衡方式
	retry := lb.Retry(maxAttempts, maxTime, balancer)   // 设置重试参数

	// And finally, return the ServiceMiddleware, implemented by proxymw.
	return func(next StringService) StringService {
		return proxymw{ctx, next, retry}         // 调用proxymw的服务处理方式
	}
}

// proxymw implements StringService, forwarding Uppercase requests to the
// provided endpoint, and serving all other (i.e. Count) requests via the
// next StringService.
type proxymw struct {
	ctx       context.Context
	next      StringService     // Serve most requests via this service...
	uppercase endpoint.Endpoint // ...except Uppercase, which gets served by this endpoint
}

func (mw proxymw) Count(s string) int {
	return mw.next.Count(s)
}

func (mw proxymw) Uppercase(s string) (string, error) {
	response, err := mw.uppercase(mw.ctx, uppercaseRequest{S: s})  // 调用后端服务来处理清楚
	if err != nil {
		return "", err
	}

	resp := response.(uppercaseResponse)   // 将处理之后的后端请求返回
	if resp.Err != "" {
		return resp.V, errors.New(resp.Err)
	}
	return resp.V, nil
}

func makeUppercaseProxy(ctx context.Context, instance string) endpoint.Endpoint {
	if !strings.HasPrefix(instance, "http") {
		instance = "http://" + instance
	}
	u, err := url.Parse(instance)
	if err != nil {
		panic(err)
	}
	if u.Path == "" {
		u.Path = "/uppercase"
	}
	return httptransport.NewClient(
		"GET",
		u,
		encodeRequest,
		decodeUppercaseResponse,
	).Endpoint()
}

func split(s string) []string {
	a := strings.Split(s, ",")
	for i := range a {
		a[i] = strings.TrimSpace(a[i])
	}
	return a
}

当请求进来的时候第一个接入的就是该中间件,如果是代理的实例则执行代理服务将请求通过轮训的方式发到后端中,然后将处理完成的请求再返回给客户端。每次请求都选择一个后端实例将请求发送出去。

总结

通过两个示例的学习与总结,kit提供的更多的是一个标准的开箱即用的一些方案和流程,如果把单独的方向代理,服务注册发现,服务降级熔断等内容来实现的话,不同的团队可能会开发出不同的标准来,使用kit库可以提供更标准的解决方案。由于本人才疏学浅,如有错误请批评指正。

 类似资料: