Go语言实现Etcd服务发现(Etcd & Service Discovery & Go)

胡俊美
2023-12-01

package etcd

client.go

package etcd

import (
	"context"
	"time"

	"go.etcd.io/etcd/api/v3/mvccpb"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// Client holds the settings used to talk to an etcd cluster and the teardown
// hook of the current registration.
type Client struct {
	// Servers is the endpoint list handed to clientv3.New as Config.Endpoints.
	Servers []string
	// Timeout is passed to Grant as the lease TTL for the registered key
	// (the etcd lease API interprets this value as seconds).
	Timeout int64
	// close is installed by Register and invoked by Deregister; it is nil
	// until Register has completed successfully.
	close   func() error
}

// Register writes data under path in etcd, attached to a lease whose TTL is
// client.Timeout, and keeps that lease alive in the background.
//
// On success the teardown hook used by Deregister is installed on the client.
// On failure the etcd connection opened here is closed before returning, so a
// failed registration does not leak a client. If the Put fails after the lease
// was granted, the lease is left to expire on its own after its TTL.
func (client *Client) Register(path string, data []byte) error {
	// Upper bound for each individual setup/teardown RPC so an unreachable
	// cluster produces an error instead of blocking the caller forever.
	const requestTimeout = 5 * time.Second

	conn, err := clientv3.New(clientv3.Config{
		Endpoints:   client.Servers,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return err
	}

	// Until the registration is complete this function owns conn and must
	// release it on every early return.
	registered := false
	defer func() {
		if !registered {
			// Best-effort cleanup; the original error is the one worth returning.
			_ = conn.Close()
		}
	}()

	grantCtx, cancelGrant := context.WithTimeout(context.Background(), requestTimeout)
	lease, err := conn.Grant(grantCtx, client.Timeout)
	cancelGrant()
	if err != nil {
		return err
	}

	putCtx, cancelPut := context.WithTimeout(context.Background(), requestTimeout)
	_, err = conn.Put(putCtx, path, string(data), clientv3.WithLease(lease.ID))
	cancelPut()
	if err != nil {
		return err
	}

	// The keep-alive context must outlive this call, hence no timeout here.
	keepAliveCh, err := conn.KeepAlive(context.Background(), lease.ID)
	if err != nil {
		return err
	}

	go func() {
		// Drain keep-alive responses so the channel never fills up. Ranging
		// (rather than receiving in a bare loop) lets the goroutine exit when
		// the channel is closed instead of spinning on zero values.
		for range keepAliveCh {
		}
	}()

	client.close = func() error {
		revokeCtx, cancel := context.WithTimeout(context.Background(), requestTimeout)
		defer cancel()

		// Always close the connection, even when the revoke fails; closing
		// stops the keep-alive so the lease expires after its TTL anyway.
		_, revokeErr := conn.Revoke(revokeCtx, lease.ID)
		closeErr := conn.Close()
		if revokeErr != nil {
			return revokeErr
		}
		return closeErr
	}

	registered = true
	return nil
}

// Deregister runs the teardown hook installed by Register.
//
// It returns nil without doing anything when there is no active registration
// (Register was never called, it failed, or a previous Deregister already
// succeeded). When the hook reports an error it is returned and the hook is
// kept, so the caller may try again.
func (client *Client) Deregister() error {
	if client.close == nil {
		// Nothing was registered; calling a nil func would panic.
		return nil
	}
	if err := client.close(); err != nil {
		return err
	}
	// Drop the hook so a repeated call does not run the teardown twice.
	client.close = nil
	return nil
}

// WatchNode reports the state of every key under the prefix path and then
// follows subsequent changes until the watch ends or a handler fails.
//
// It first reads all existing keys and calls putHandler for each one, then
// opens a watch starting at the revision right after that read so that no
// change made in between is skipped. PUT events go to putHandler(key, value),
// DELETE events go to deleteHandler(key).
//
// The call blocks. It returns the first error from connecting, from the
// initial read, from a handler, or from the watch itself; it returns nil when
// the watch channel is closed without an error. The etcd connection opened
// here is always closed before returning.
func (client *Client) WatchNode(path string,
	putHandler func(string, []byte) error,
	deleteHandler func(string) error) error {
	conn, err := clientv3.New(clientv3.Config{
		Endpoints:   client.Servers,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return err
	}
	// Callers retry this function in a loop, so the connection must not
	// outlive the call. The close error is not actionable here.
	defer func() { _ = conn.Close() }()

	// Cancelling on return tears the watch down when a handler error makes
	// us leave the loop early.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Bound the initial read so an unreachable cluster yields an error.
	getCtx, cancelGet := context.WithTimeout(ctx, 5*time.Second)
	resp, err := conn.Get(getCtx, path, clientv3.WithPrefix())
	cancelGet()
	if err != nil {
		return err
	}

	for _, kv := range resp.Kvs {
		if err := putHandler(string(kv.Key), kv.Value); err != nil {
			return err
		}
	}

	// Resume exactly after the snapshot revision; without this, changes made
	// between the Get and the Watch would never be delivered.
	watchChan := conn.Watch(ctx, path,
		clientv3.WithPrefix(),
		clientv3.WithRev(resp.Header.Revision+1))
	for watchResp := range watchChan {
		// A cancelled or compacted watch is reported on the response, not as
		// a channel-level error, so it has to be checked explicitly.
		if err := watchResp.Err(); err != nil {
			return err
		}
		for _, ev := range watchResp.Events {
			switch ev.Type {
			case mvccpb.PUT:
				if err := putHandler(string(ev.Kv.Key), ev.Kv.Value); err != nil {
					return err
				}
			case mvccpb.DELETE:
				if err := deleteHandler(string(ev.Kv.Key)); err != nil {
					return err
				}
			}
		}
	}

	return nil
}


node.go

package etcd

import "log"

// Node describes one service instance and supplies the callbacks used when
// watching other nodes.
type Node struct {
	// Path is the etcd key under which the node is registered.
	Path string
	// Host and Port are formatted as "host:port" and stored as the key's value
	// when the node is registered.
	Host string
	Port int
}

// putCallback is the handler for a key that was created or updated. For now it
// only logs the key and always reports success.
func (n *Node) putCallback(path string, data []byte) error {
	const format = "[Callback] Node %s is updated"
	log.Printf(format, path)
	// TODO: act on the new value in data.
	return nil
}

// deleteCallback is the handler for a key that was removed. For now it only
// logs the key and always reports success.
func (n *Node) deleteCallback(path string) error {
	const format = "[Callback] Node %s is deleted"
	log.Printf(format, path)
	// TODO: drop whatever state is kept for this key.
	return nil
}

// getWatchNodes returns the key prefixes this node should watch.
// The list is currently a fixed test value.
func (n *Node) getWatchNodes() []string {
	// Test code: a single hard-coded prefix.
	return []string{"/services"}
}

etcd.go

package etcd

import (
	"fmt"
	"log"
	"time"
)

// watchNode starts a background goroutine that watches path with the node's
// callbacks. Whenever the watch returns, any error is logged and the watch is
// started again after one second; the goroutine never stops on its own.
func watchNode(client *Client, node *Node, path string) {
	keepWatching := func() {
		for {
			if err := client.WatchNode(path, node.putCallback, node.deleteCallback); err != nil {
				log.Println(err)
			}
			// Pause between attempts, whether or not the last one failed.
			time.Sleep(time.Second)
		}
	}
	go keepWatching()
}

// registerNode stores the node's "host:port" address under node.Path through
// client. A registration failure is logged and not reported to the caller.
func registerNode(client *Client, node *Node) {
	address := fmt.Sprintf("%s:%d", node.Host, node.Port)
	if err := client.Register(node.Path, []byte(address)); err != nil {
		log.Println(err)
	}
}

// Init registers node through client and then starts one background watcher
// for every path returned by node.getWatchNodes.
func Init(client *Client, node *Node) {
	registerNode(client, node)

	watchPaths := node.getWatchNodes()
	for i := range watchPaths {
		watchNode(client, node, watchPaths[i])
	}
}


package main

main.go

package main

import (
	"log"
	"os"
	"strconv"
	"strings"
	"time"

	"./etcd"
)

// main registers the service described on the command line and watches the
// configured prefixes for 100 seconds before exiting.
//
// Arguments: <service-name> <host> <port>. The service is registered under
// "/services/<service-name>".
func main() {
	if len(os.Args) < 4 {
		// Fail loudly instead of exiting silently with status 0.
		log.Fatalln("usage: <program> <service-name> <host> <port>")
	}

	log.Println(os.Args)

	path := "/services/" + os.Args[1]
	host := os.Args[2]
	port, err := strconv.Atoi(os.Args[3])
	if err != nil {
		log.Fatalf("invalid port %q: %v", os.Args[3], err)
	}

	// Comma-separated list of etcd endpoints.
	etcdHosts := "0.0.0.0:2379"

	client := &etcd.Client{Servers: strings.Split(etcdHosts, ","), Timeout: 5}
	node := &etcd.Node{Path: path, Host: host, Port: port}
	etcd.Init(client, node)

	// Keep the process alive so the background keep-alive and watchers run.
	time.Sleep(time.Second * 100)
}
 类似资料: