package etcd
client.go
package etcd
import (
"context"
"time"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)
// Client holds the settings used to talk to an etcd cluster and the
// cleanup hook for the current registration.
type Client struct {
	// Servers is the list of etcd endpoints handed to the etcd client
	// configuration.
	Servers []string

	// Timeout is the TTL passed to the lease grant when registering a key.
	Timeout int64

	// close is installed by Register and invoked by Deregister; it is nil
	// until a registration has succeeded.
	close func() error
}
// Register writes data under path in etcd, attached to a lease whose TTL is
// client.Timeout, and keeps that lease alive in the background.
//
// A dedicated etcd connection is opened for the registration. If any step
// fails the connection is closed and the error is returned. On success a
// cleanup function is stored on the client (replacing any previous one) so
// that Deregister can revoke the lease and close the connection.
func (client *Client) Register(path string, data []byte) error {
	conn, err := clientv3.New(clientv3.Config{
		Endpoints:   client.Servers,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return err
	}

	// Until registration fully succeeds nothing else owns conn, so every
	// early return below must release it.
	registered := false
	defer func() {
		if !registered {
			// Best effort: the error that aborted registration is the one
			// worth reporting to the caller.
			_ = conn.Close()
		}
	}()

	ctx := context.Background()
	lease, err := conn.Grant(ctx, client.Timeout)
	if err != nil {
		return err
	}
	if _, err = conn.Put(ctx, path, string(data), clientv3.WithLease(lease.ID)); err != nil {
		return err
	}
	keepAliveCh, err := conn.KeepAlive(ctx, lease.ID)
	if err != nil {
		return err
	}

	// Drain keep-alive responses so the channel never fills up. Ranging
	// (instead of a bare receive in an endless loop) lets the goroutine exit
	// when the channel is closed rather than spinning on zero values.
	go func() {
		for range keepAliveCh {
		}
	}()

	leaseID := lease.ID
	client.close = func() error {
		// Always close the connection, even when the revoke fails, so the
		// client is not leaked; the revoke error takes precedence.
		_, revokeErr := conn.Revoke(context.Background(), leaseID)
		closeErr := conn.Close()
		if revokeErr != nil {
			return revokeErr
		}
		return closeErr
	}
	registered = true
	return nil
}
// Deregister runs the cleanup function stored by a successful Register and
// returns its error.
//
// If no cleanup function has been stored (Register was never called or did
// not succeed) there is nothing to undo and nil is returned, instead of
// panicking on a nil function call.
func (client *Client) Deregister() error {
	if client.close == nil {
		return nil
	}
	return client.close()
}
// WatchNode reports the current and future state of every key under the
// prefix path.
//
// It first reads all existing keys with that prefix and calls putHandler for
// each, then watches the prefix and calls putHandler for PUT events and
// deleteHandler for DELETE events. The watch starts at the revision right
// after the initial read so changes made in between are not missed.
//
// WatchNode blocks until a handler returns an error, the watch reports an
// error, or the watch channel is closed. The first error encountered is
// returned; nil is returned only when the watch channel closes without an
// error. The etcd connection opened here is always released on return.
func (client *Client) WatchNode(path string,
	putHandler func(string, []byte) error,
	deleteHandler func(string) error) error {
	conn, err := clientv3.New(clientv3.Config{
		Endpoints:   client.Servers,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return err
	}
	// Callers retry in a loop, so a connection left open here would leak on
	// every attempt.
	defer conn.Close()

	// Cancelling on return tears down the watch stream together with the
	// function.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	resp, err := conn.Get(ctx, path, clientv3.WithPrefix())
	if err != nil {
		return err
	}
	for _, kv := range resp.Kvs {
		if err := putHandler(string(kv.Key), kv.Value); err != nil {
			return err
		}
	}

	// Resume exactly after the snapshot above; without a start revision any
	// update landing between Get and Watch would be silently lost.
	watchChan := conn.Watch(ctx, path,
		clientv3.WithPrefix(),
		clientv3.WithRev(resp.Header.Revision+1))
	for watchResp := range watchChan {
		// A failed watch (e.g. compacted revision or cancellation) is
		// delivered as a response carrying an error, not as events.
		if err := watchResp.Err(); err != nil {
			return err
		}
		for _, ev := range watchResp.Events {
			switch ev.Type {
			case mvccpb.PUT:
				if err := putHandler(string(ev.Kv.Key), ev.Kv.Value); err != nil {
					return err
				}
			case mvccpb.DELETE:
				if err := deleteHandler(string(ev.Kv.Key)); err != nil {
					return err
				}
			}
		}
	}
	return nil
}
node.go
package etcd
import "log"
// Node describes one service instance.
type Node struct {
	// Path is the etcd key the node is registered under.
	Path string

	// Host and Port are combined as "host:port" to form the registered value.
	Host string
	Port int
}
// putCallback is the handler invoked when a watched key is created or
// updated. For now it only logs the key; data is not used yet.
// It always returns nil.
func (n *Node) putCallback(path string, data []byte) error {
	const format = "[Callback] Node %s is updated"
	log.Printf(format, path)
	// TODO: act on the updated value.
	return nil
}
// deleteCallback is the handler invoked when a watched key is removed.
// For now it only logs the key. It always returns nil.
func (n *Node) deleteCallback(path string) error {
	const format = "[Callback] Node %s is deleted"
	log.Printf(format, path)
	// TODO: act on the removal.
	return nil
}
// getWatchNodes returns the key prefixes this node should watch.
// The list is currently a hard-coded test value.
func (n *Node) getWatchNodes() []string {
	// Test code begins
	watched := []string{"/services"}
	// Test code ends
	return watched
}
etcd.go
package etcd
import (
"fmt"
"log"
"time"
)
// watchNode starts a background goroutine that watches path through client,
// routing events to node's callbacks. Whenever the watch returns, any error
// is logged and the watch is started again after a one-second pause. The
// goroutine never terminates on its own.
func watchNode(client *Client, node *Node, path string) {
	watchForever := func() {
		for {
			if err := client.WatchNode(path, node.putCallback, node.deleteCallback); err != nil {
				log.Println(err)
			}
			// Pause between attempts so a persistent failure does not spin.
			time.Sleep(time.Second)
		}
	}
	go watchForever()
}
// registerNode registers node under node.Path with the value "host:port".
// A registration failure is logged, not returned.
func registerNode(client *Client, node *Node) {
	payload := []byte(fmt.Sprintf("%s:%d", node.Host, node.Port))
	if err := client.Register(node.Path, payload); err != nil {
		log.Println(err)
	}
}
// Init registers node through client and then starts one background watcher
// for every path returned by node.getWatchNodes. It returns once the
// watchers have been started.
func Init(client *Client, node *Node) {
	registerNode(client, node)

	paths := node.getWatchNodes()
	for i := range paths {
		watchNode(client, node, paths[i])
	}
}
package main
main.go
package main
import (
"log"
"os"
"strconv"
"strings"
"time"
"./etcd"
)
// main registers a service instance in etcd and watches the service tree.
//
// Usage: <program> <service-name> <host> <port>
//
// The instance is registered under "/services/<service-name>". The process
// then stays alive for 100 seconds so the background registration and
// watchers can run, and exits.
func main() {
	if len(os.Args) < 4 {
		// Fail loudly with a non-zero status instead of exiting silently.
		log.Fatalln("usage: <program> <service-name> <host> <port>")
	}
	log.Println(os.Args)

	path := "/services/" + os.Args[1]
	host := os.Args[2]
	port, err := strconv.Atoi(os.Args[3])
	if err != nil {
		log.Fatalf("invalid port %q: %v", os.Args[3], err)
	}

	// Comma-separated list of etcd endpoints.
	etcdHosts := "0.0.0.0:2379"
	etcd.Init(
		&etcd.Client{Servers: strings.Split(etcdHosts, ","), Timeout: 5},
		&etcd.Node{Path: path, Host: host, Port: port},
	)

	// Init only starts background goroutines; keep the process running.
	time.Sleep(time.Second * 100)
}