个人觉得其实可以大概看一下文档就跳过官方的stringsvc例子了,实际实操可以从addsvc开始。
addsvc
的pkg分层如下
|- addendpoint
|- middleware.go
|- set.go
|- addservice
|- middleware.go
|- service.go
|- addtransport
|- grpc.go
|- http.go
|- jsonrpc.go
|- thrift.go
因此可看出,项目结构分层是:service -> endpoint -> transport(各种传输协议)
这里是存放主要的核心业务代码,可以独立于任何框架使用。
package addservice
import (
"context"
"github.com/go-kit/kit/log"
)
// 1. 定义一个service的interface规范
type AddService interface {
Sum(ctx context.Context, a, b int) (int, error)
}
// 2. 定义一个service class
type basicService struct{}
func NewBasicService() AddService {
return basicService{}
}
const (
intMax = 1<<31 - 1
intMin = -(intMax + 1)
maxLen = 10
)
func (s basicService) Sum(_ context.Context, a, b int) (int, error) {
if a == 0 && b == 0 {
return 0, ErrTwoZeroes
}
if (b > 0 && a > (intMax-b)) || (b < 0 && a < (intMin-b)) {
return 0, ErrIntOverflow
}
return a + b, nil
}
// 3. 构造service类对象
func New(logger log.Logger, ints, chars metrics.Counter) AddService {
var svc AddService
{
// 4. 可以在这里加入需要的midllewares
svc = NewBasicService()
}
return svc
}
endpoint是一个go-kit的抽象类,作用是将service封装一层,使其更好地适配go-kit的其他功能,本质上就是封装一下输入输出,提供一个RPC服务
核心代码如下:
package addendpoint
import (
"context"
"time"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/ratelimit"
"golang.org/x/time/rate"
"github.com/pascallin/go-micro-services/pkg/addsvc/addservice"
)
// 1. 定义一个enpoint set类
type Set struct {
SumEndpoint endpoint.Endpoint
}
// 2. 定义输入输出结构
type SumRequest struct {
A, B int
}
type SumResponse struct {
V int `json:"v"`
Err error `json:"-"`
}
// 3. 初始化一个endpoint
func MakeSumEndpoint(s addservice.AddService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(SumRequest)
v, err := s.Sum(ctx, req.A, req.B)
return SumResponse{V: v, Err: err}, nil
}
}
// 4. 定义方法的调用逻辑
func (s Set) Sum(ctx context.Context, a, b int) (int, error) {
resp, err := s.SumEndpoint(ctx, SumRequest{A: a, B: b})
if err != nil {
return 0, err
}
response := resp.(SumResponse)
return response.V, response.Err
}
// 5. 创建一个endpoint set对象
func New(svc addservice.AddService) Set {
var sumEndpoint endpoint.Endpoint
{
sumEndpoint = MakeSumEndpoint(svc)
sumEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 1))(sumEndpoint)
// 6. 这里可以使用针对endpoint的中间件
}
return Set{
SumEndpoint: sumEndpoint
}
}
endpoint还只是一个RPC调用,想要暴露服务给外部使用,还需要用到transport。
go-kit内置了各种各样的transport供我们使用,下面以HTTP为例。核心代码如下。
package addtransport
import (
"bytes"
"context"
"encoding/json"
"errors"
"net/http"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/transport"
httptransport "github.com/go-kit/kit/transport/http"
"github.com/pascallin/go-micro-services/pkg/addsvc/addendpoint"
"github.com/pascallin/go-micro-services/pkg/addsvc/addservice"
)
// 1. 定义方法的输入输出的编解码逻辑
func DecodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) {
var req addendpoint.SumRequest
err := json.NewDecoder(r.Body).Decode(&req)
return req, err
}
func errorEncoder(_ context.Context, err error, w http.ResponseWriter) {
w.WriteHeader(err2code(err))
json.NewEncoder(w).Encode(errorWrapper{Error: err.Error()})
}
func err2code(err error) int {
switch err {
case addservice.ErrTwoZeroes, addservice.ErrMaxSizeExceeded, addservice.ErrIntOverflow:
return http.StatusBadRequest
}
return http.StatusInternalServerError
}
func EncodeHTTPGenericResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
if f, ok := response.(endpoint.Failer); ok && f.Failed() != nil {
errorEncoder(ctx, f.Failed(), w)
return nil
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
return json.NewEncoder(w).Encode(response)
}
// 2. 基于gorilla/mux创建HTTP Handler,可以使用其他http server framework
func NewHTTPHandler(endpoints addendpoint.Set, logger log.Logger) http.Handler {
options := []httptransport.ServerOption{
httptransport.ServerErrorEncoder(errorEncoder),
httptransport.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
}
r := mux.NewRouter()
r.Methods("POST").Path("/sum").Handler(httptransport.NewServer(
endpoints.SumEndpoint,
DecodeHTTPSumRequest,
EncodeHTTPGenericResponse,
options,
))
return r
}
下面在入口文件只需要初始化service、endpoint、http transport handler就可以运行起来
package addsvc
import (
"flag"
"net"
"net/http"
"github.com/go-kit/kit/log"
"github.com/oklog/oklog/pkg/group"
"github.com/pascallin/go-micro-services/pkg/addsvc/addendpoint"
"github.com/pascallin/go-micro-services/pkg/addsvc/addservice"
"github.com/pascallin/go-micro-services/pkg/addsvc/addtransport"
)
func main() {
// 1. 定义命令行参数
var (
httpAddr = flag.String("http-addr", ":8080", "HTTP listen address")
)
flag.Parse()
// 2. 定义一个全局用的logger
var logger log.Logger
{
logger = log.NewLogfmtLogger(os.Stderr)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
}
// 3. 初始化service、endpoint、httpHandler
var (
service = addservice.New(logger, ints, chars)
endpoints = addendpoint.New(service)
httpHandler = addtransport.NewHTTPHandler(endpoints, logger)
)
// 4. 定义一个全局的server并绑定httpHadnler
var g group.Group
{
httpListener, err := net.Listen("tcp", *httpAddr)
if err != nil {
logger.Log("transport", "HTTP", "during", "Listen", "err", err)
os.Exit(1)
}
g.Add(func() error {
logger.Log("transport", "HTTP", "addr", *httpAddr)
return http.Serve(httpListener, httpHandler)
}, func(error) {
httpListener.Close()
})
}
// 5. 定义一个中断事件监听
{
// This function just sits and waits for ctrl-C.
cancelInterrupt := make(chan struct{})
g.Add(func() error {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
select {
case sig := <-c:
return fmt.Errorf("received signal %s", sig)
case <-cancelInterrupt:
return nil
}
}, func(error) {
close(cancelInterrupt)
})
}
// 6. 运行服务
logger.Log("exit", g.Run())
}
go-kit还有一些其他的功能,如中间件的使用、加入zipkin分布式日志跟踪和服务发现等,下次再加以说明
github repository:https://github.com/pascallin/go-micro-services