测试环境:centos7
官方给的下载方法(当前版本3.5.0):
脚本如下:
#!/bin/bash
# Download an etcd release tarball, unpack it into /tmp/etcd-download-test and
# print the versions of the unpacked binaries as a smoke test.
# Aborts on the first failing step instead of continuing with a broken download.
set -euo pipefail

ETCD_VER=v3.5.0

# choose either URL
GOOGLE_URL=https://storage.googleapis.com/etcd
# shellcheck disable=SC2034 # kept as the alternative download source
GITHUB_URL=https://github.com/etcd-io/etcd/releases/download
DOWNLOAD_URL=${GOOGLE_URL}

TARBALL="/tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz"
INSTALL_DIR=/tmp/etcd-download-test

# Start from a clean state: drop any stale tarball and unpack directory.
rm -f -- "${TARBALL}"
rm -rf -- "${INSTALL_DIR}"
mkdir -p -- "${INSTALL_DIR}"

# --fail makes curl exit non-zero on an HTTP error instead of saving the
# server's error page as the tarball.
curl --fail -L "${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz" -o "${TARBALL}"
tar xzvf "${TARBALL}" -C "${INSTALL_DIR}" --strip-components=1
rm -f -- "${TARBALL}"

"${INSTALL_DIR}/etcd" --version
"${INSTALL_DIR}/etcdctl" version
"${INSTALL_DIR}/etcdutl" version
脚本执行完成后,文件会解压到/tmp/etcd-download-test目录,可以将其移动到其他目录(tmp目录在机器关机重启后会被删除)
# Move the unpacked directory under /opt
mv /tmp/etcd-download-test /opt/etcd
配置文件模板etcd.conf.yml(后文的命令和脚本中使用的配置文件名为etcd_config.yaml,保存时请使用该文件名)
内容如下:
# This is the configuration file for the etcd server.
# Human-readable name for this member.
name: 'default'
# Path to the data directory.
data-dir:
# Path to the dedicated wal directory.
wal-dir:
# Number of committed transactions to trigger a snapshot to disk.
snapshot-count: 10000
# Time (in milliseconds) of a heartbeat interval.
heartbeat-interval: 100
# Time (in milliseconds) for an election to timeout.
election-timeout: 1000
# Raise alarms when backend size exceeds the given quota. 0 means use the
# default quota.
quota-backend-bytes: 0
# List of comma separated URLs to listen on for peer traffic.
listen-peer-urls: http://localhost:2380
# List of comma separated URLs to listen on for client traffic.
listen-client-urls: http://localhost:2379
# Maximum number of snapshot files to retain (0 is unlimited).
max-snapshots: 5
# Maximum number of wal files to retain (0 is unlimited).
max-wals: 5
# Comma-separated white list of origins for CORS (cross-origin resource sharing).
cors:
# List of this member's peer URLs to advertise to the rest of the cluster.
# The URLs needed to be a comma-separated list.
initial-advertise-peer-urls: http://localhost:2380
# List of this member's client URLs to advertise to the public.
# The URLs needed to be a comma-separated list.
advertise-client-urls: http://localhost:2379
# Discovery URL used to bootstrap the cluster.
discovery:
# Valid values include 'exit', 'proxy'
discovery-fallback: 'proxy'
# HTTP proxy to use for traffic to discovery service.
discovery-proxy:
# DNS domain used to bootstrap initial cluster.
discovery-srv:
# Initial cluster configuration for bootstrapping.
initial-cluster:
# Initial cluster token for the etcd cluster during bootstrap.
initial-cluster-token: 'etcd-cluster'
# Initial cluster state ('new' or 'existing').
initial-cluster-state: 'new'
# Reject reconfiguration requests that would cause quorum loss.
strict-reconfig-check: false
# Enable runtime profiling data via HTTP server
enable-pprof: true
# Valid values include 'on', 'readonly', 'off'
proxy: 'off'
# Time (in milliseconds) an endpoint will be held in a failed state.
proxy-failure-wait: 5000
# Time (in milliseconds) of the endpoints refresh interval.
proxy-refresh-interval: 30000
# Time (in milliseconds) for a dial to timeout.
proxy-dial-timeout: 1000
# Time (in milliseconds) for a write to timeout.
proxy-write-timeout: 5000
# Time (in milliseconds) for a read to timeout.
proxy-read-timeout: 0
# TLS settings for client-to-server traffic. The keys below must be nested
# under this mapping; at top level they would collide with the peer section.
client-transport-security:
  # Path to the client server TLS cert file.
  cert-file:
  # Path to the client server TLS key file.
  key-file:
  # Enable client cert authentication.
  client-cert-auth: false
  # Path to the client server TLS trusted CA cert file.
  trusted-ca-file:
  # Client TLS using generated certificates
  auto-tls: false
# TLS settings for peer-to-peer traffic. The keys below must be nested
# under this mapping; at top level they would collide with the client section.
peer-transport-security:
  # Path to the peer server TLS cert file.
  cert-file:
  # Path to the peer server TLS key file.
  key-file:
  # Enable peer client cert authentication.
  client-cert-auth: false
  # Path to the peer server TLS trusted CA cert file.
  trusted-ca-file:
  # Peer TLS using generated certificates.
  auto-tls: false
# The validity period of the self-signed certificate, the unit is year.
self-signed-cert-validity: 1
# Enable debug-level logging for etcd.
log-level: debug
logger: zap
# Specify 'stdout' or 'stderr' to skip journald logging even when running under systemd.
log-outputs: [stderr]
# Force to create a new one member cluster.
force-new-cluster: false
auto-compaction-mode: periodic
auto-compaction-retention: "1"
单节点测试可以修改下列项:
# 数据保存目录
data-dir
wal-dir
# 监听地址,可改为http://0.0.0.0:2379
listen-client-urls
# 在etcd命令所在目录执行启动命令
./etcd --config-file=./etcd_config.yaml
# 测试
./etcdctl put foo foo1 # --endpoints=0.0.0.0:2379
./etcdctl get foo
多节点测试需修改配置文件的如下项:
# 集群中的节点名称
name
# 数据保存目录
data-dir
wal-dir
# 监听地址,可改为http://0.0.0.0:2379
listen-client-urls
# 与其他节点通信的地址,可改为http://0.0.0.0:2380
listen-peer-urls
# 集群包含的所有节点,也可在后续通过客户端添加,格式为:<node_name1>=<node_url1>,<node_name2>=<node_url2>
initial-cluster
使用dockerfile构建,也可使用已有的etcd镜像,这里使用dockerfile构建
这里基于centos:7.9.2009来构建,dockerfile_etcd内容:
FROM centos:7.9.2009
# Copy the local etcd directory into the image; it is expected to hold the
# etcd binaries, etcd_config.yaml and etcd_start.sh.
ADD etcd /opt/etcd
WORKDIR /opt/etcd
# NOTE: with this start command, a node that has been removed from the cluster
# fails to start again.
# Dockerfile comments must be on their own line: a trailing "# ..." after the
# JSON array becomes part of the instruction and breaks the exec form.
CMD ["./etcd_start.sh"]
etcd_start.sh内容:
#!/bin/bash
# Container entrypoint: writes the node name and the initial cluster into the
# etcd_config.yaml that sits next to this script, then starts etcd with it.
# Environment:
#   NODE_NAME        member name (falls back to "default" when unset or empty)
#   INITIAL_CLUSTER  value written to the initial-cluster setting
CUR_DIR=$(cd "$(dirname "$0")" && pwd)
NODE_NAME=${NODE_NAME:-default}
CONFIG_FILE="${CUR_DIR}/etcd_config.yaml"

# Patterns are anchored so only the top-level settings are rewritten.
sed -i "s#^name:.*#name: ${NODE_NAME}#g" "${CONFIG_FILE}"
sed -i "s#^initial-cluster:.*#initial-cluster: ${INITIAL_CLUSTER}#g" "${CONFIG_FILE}"

# exec replaces the shell with etcd so it receives the container's stop
# signals directly instead of through an intermediate bash process.
exec "${CUR_DIR}/etcd" --config-file="${CONFIG_FILE}"
使用docker-compose构建集群,etcd.yaml文件内容:
# Three-node etcd cluster on a dedicated bridge network with fixed IPs.
# Each service builds the same image and differs only in its name, the
# published host port and its static address.
version: "3"
services:
  etcd_n1:
    container_name: etcd_n1
    privileged: true
    build:
      context: .
      dockerfile: dockerfile_etcd
    image: etcd:v1.0
    ports:
      # Quoted so YAML always treats HOST:CONTAINER as a string.
      - "2379:2379"
    networks:
      etcd_network:
        ipv4_address: 172.20.0.2
    environment:
      NODE_NAME: n1
      INITIAL_CLUSTER: "n1=http://172.20.0.2:2380,n2=http://172.20.0.3:2380,n3=http://172.20.0.4:2380"
  etcd_n2:
    container_name: etcd_n2
    privileged: true
    build:
      context: .
      dockerfile: dockerfile_etcd
    image: etcd:v1.0
    ports:
      - "12379:2379"
    networks:
      etcd_network:
        ipv4_address: 172.20.0.3
    environment:
      NODE_NAME: n2
      INITIAL_CLUSTER: "n1=http://172.20.0.2:2380,n2=http://172.20.0.3:2380,n3=http://172.20.0.4:2380"
  etcd_n3:
    container_name: etcd_n3
    privileged: true
    build:
      context: .
      dockerfile: dockerfile_etcd
    image: etcd:v1.0
    ports:
      - "22379:2379"
    networks:
      etcd_network:
        ipv4_address: 172.20.0.4
    environment:
      NODE_NAME: n3
      INITIAL_CLUSTER: "n1=http://172.20.0.2:2380,n2=http://172.20.0.3:2380,n3=http://172.20.0.4:2380"
networks:
  etcd_network:
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 172.20.0.0/24
启动并查看节点情况:
# Start the cluster in the background
./docker-compose -f etcd.yaml up -d
# Pick one node and check the running state and the result
docker exec -it etcd_n1 ./etcdctl member list -w table
docker exec -it etcd_n1 ./etcdctl endpoint status --cluster -w table
测试完成后删除容器和镜像
脚本rm_test_docker.sh内容
#!/bin/bash
# Stop and remove every container, then force-remove every image, whose
# `docker ps -a` / `docker images` output line contains "etcd".

# mapfile stores one ID per array element, so the loops below iterate over a
# real array instead of relying on word-splitting of a string.
mapfile -t container_ids < <(docker ps -a | awk '/etcd/ {print $1}')
# sort -u removes all duplicate IDs; plain uniq only drops adjacent ones.
mapfile -t image_ids < <(docker images | awk '/etcd/ {print $3}' | sort -u)

for container_id in "${container_ids[@]}"; do
  echo "rm container ${container_id}"
  docker stop "${container_id}"
  docker rm "${container_id}"
done

for image_id in "${image_ids[@]}"; do
  echo "rm image ${image_id}"
  docker rmi -f "${image_id}"
done
注: etcd中使用的bbolt和grpc存在版本冲突的问题
解决方法:
在go.mod中替换适配的版本
// Replace the bbolt module with the version suggested by go mod tidy
replace github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.6
// Pin the grpc version
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"time"
)
// main walks through the etcd auth API: it creates the root user/role and a
// restricted user "u" with role "r", enables auth, uses the restricted
// account, inspects role "r" as root, then disables auth and deletes
// everything it created.
func main() {
	const endpoint = "0.0.0.0:2379"
	ctx := context.TODO()

	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{endpoint}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Root role and root user, bound together.
	_, err = cli.RoleAdd(ctx, "root")
	if err != nil {
		log.Fatal(err)
	}
	_, err = cli.UserAdd(ctx, "root", "123")
	if err != nil {
		log.Fatal(err)
	}
	_, err = cli.UserGrantRole(ctx, "root", "root")
	if err != nil {
		log.Fatal(err)
	}

	// Role "r": read/write permission from key "foo" up to range end "zoo".
	_, err = cli.RoleAdd(ctx, "r")
	if err != nil {
		log.Fatal(err)
	}
	perm := clientv3.PermissionType(clientv3.PermReadWrite)
	_, err = cli.RoleGrantPermission(ctx, "r", "foo", "zoo", perm)
	if err != nil {
		log.Fatal("RoleGrantPermission error: ", err)
	}

	// User "u" is granted role "r".
	_, err = cli.UserAdd(ctx, "u", "123")
	if err != nil {
		log.Fatal("UserAdd u error: ", err)
	}
	_, err = cli.UserGrantRole(ctx, "u", "r")
	if err != nil {
		log.Fatal("UserGrantRole u error: ", err)
	}

	_, err = cli.AuthEnable(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// From here on, act as the restricted user "u".
	userCfg := clientv3.Config{
		Endpoints:   []string{endpoint},
		DialTimeout: time.Second * 5,
		Username:    "u",
		Password:    "123",
	}
	cliAuth, err := clientv3.New(userCfg)
	if err != nil {
		log.Fatal("new error:", err)
	}
	defer cliAuth.Close()

	_, err = cliAuth.Put(ctx, "foo1", "bar")
	if err != nil {
		log.Fatal(err)
	}

	// "zoo1" sorts after the granted range end "zoo"; the error is only
	// printed, presumably because a permission error is the expected outcome.
	txn := cliAuth.Txn(ctx).
		If(clientv3.Compare(clientv3.Value("zoo1"), ">", "abc")).
		Then(clientv3.OpPut("zoo1", "XYZ")).
		Else(clientv3.OpPut("zoo1", "ABC"))
	_, err = txn.Commit()
	fmt.Println("txn error", err)

	// now check the permission with the root account
	rootCfg := clientv3.Config{
		Endpoints:   []string{endpoint},
		DialTimeout: time.Second * 5,
		Username:    "root",
		Password:    "123",
	}
	rootCli, err := clientv3.New(rootCfg)
	if err != nil {
		log.Fatal(err)
	}
	defer rootCli.Close()

	resp, err := rootCli.RoleGet(ctx, "r")
	if err != nil {
		log.Fatal("RoleGet r error: ", err)
	}
	first := resp.Perm[0]
	fmt.Printf("user u permission: key %q, range end %q\n", first.Key, first.RangeEnd)

	_, err = rootCli.AuthDisable(ctx)
	if err != nil {
		log.Fatal("AuthDisable error: ", err)
	}

	// Auth is off again, so the unauthenticated client performs the cleanup.
	if _, err = cli.RoleDelete(ctx, "r"); err != nil {
		log.Fatal("RoleDelete r error: ", err)
	}
	if _, err = cli.RoleDelete(ctx, "root"); err != nil {
		log.Fatal("RoleDelete root error: ", err)
	}
	if _, err = cli.UserDelete(ctx, "u"); err != nil {
		log.Fatal("UserDelete u error: ", err)
	}
	if _, err = cli.UserDelete(ctx, "root"); err != nil {
		log.Fatal("UserDelete root error: ", err)
	}
}
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
)
// main demonstrates the cluster membership API: it lists the members, removes
// the second one, adds it back using the same peer URLs and finally updates
// the peer URLs of the re-added member.
func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx := context.Background()

	resp, err := cli.MemberList(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("members:", len(resp.Members))
	for _, v := range resp.Members {
		fmt.Println(v)
	}

	// The demo operates on the second member; bail out with a clear message
	// instead of panicking on a single-node cluster.
	if len(resp.Members) < 2 {
		log.Fatal("need at least 2 members to remove and re-add one")
	}
	target := resp.Members[1]

	_, err = cli.MemberRemove(ctx, target.ID)
	if err != nil {
		log.Fatal(err)
	}

	//mresp, err := cli.MemberAddAsLearner(context.Background(), []string{"http://localhost:32381"})
	//if err != nil {
	//	log.Fatal(err)
	//}
	mresp, err := cli.MemberAdd(ctx, target.PeerURLs)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("added member.PeerURLs:", mresp.Member.PeerURLs)
	fmt.Println("members count:", len(mresp.Members))

	// Update the member that was just added: the ID of the removed member no
	// longer refers to a cluster member, so the ID from the add response is used.
	_, err = cli.MemberUpdate(ctx, mresp.Member.ID, target.PeerURLs)
	if err != nil {
		log.Fatal(err)
	}
}
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"log"
"time"
)
// main exercises the etcd v3 KV API: error classification on Put, Put/Get,
// reading an old revision, sorted prefix reads, prefix deletes, compaction,
// transactions and the generic Do call. Every request that uses a timeout
// context releases it with cancel() right after the call returns.
func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Put with an empty key; the resulting error is classified below.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	_, err = cli.Put(ctx, "", "sample_value")
	cancel()
	if err != nil {
		switch err {
		case context.Canceled:
			fmt.Printf("ctx is canceled by another routine: %v\n", err)
		case context.DeadlineExceeded:
			fmt.Printf("ctx is attached with a deadline is exceeded: %v\n", err)
		case rpctypes.ErrEmptyKey:
			fmt.Printf("client-side error: %v\n", err)
		default:
			fmt.Printf("bad cluster endpoints, which are not etcd servers: %v\n", err)
		}
	}

	// Two writes to the same key; presp keeps the revision of the first one.
	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
	presp, err := cli.Put(ctx, "foo", "bar1")
	cancel()
	if err != nil {
		log.Fatal(err)
	}
	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
	_, err = cli.Put(ctx, "foo", "bar2")
	cancel()
	if err != nil {
		log.Fatal(err)
	}

	// Read the current value.
	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
	resp, err := cli.Get(ctx, "foo")
	cancel()
	if err != nil {
		log.Fatal(err)
	}
	for _, ev := range resp.Kvs {
		fmt.Printf("%s : %s\n", ev.Key, ev.Value)
	}

	// Read the value as of the first Put's revision.
	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
	resp, err = cli.Get(ctx, "foo", clientv3.WithRev(presp.Header.Revision))
	cancel()
	if err != nil {
		log.Fatal(err)
	}
	for _, ev := range resp.Kvs {
		fmt.Printf("%s : %s\n", ev.Key, ev.Value)
	}

	// sorted
	for i := 0; i < 3; i++ {
		ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
		_, err = cli.Put(ctx, fmt.Sprintf("key_%d", i), "value")
		cancel()
		if err != nil {
			log.Fatal(err)
		}
	}
	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
	resp, err = cli.Get(ctx, "key", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
	cancel()
	if err != nil {
		log.Fatal(err)
	}
	for _, ev := range resp.Kvs {
		fmt.Printf("%s : %s\n", ev.Key, ev.Value)
	}

	// Get and Delete share one timeout context, released once both are done.
	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
	// count keys about to be deleted
	gresp, err := cli.Get(ctx, "key", clientv3.WithPrefix())
	if err != nil {
		cancel()
		log.Fatal(err)
	}
	// delete the keys
	dresp, err := cli.Delete(ctx, "key", clientv3.WithPrefix())
	cancel()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("gresp.Kvs len:", len(gresp.Kvs), ", Deleted all keys:", int64(len(gresp.Kvs)) == dresp.Deleted)

	// compact: all history older than the given revision is discarded
	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
	resp, err = cli.Get(ctx, "foo")
	cancel()
	if err != nil {
		log.Fatal(err)
	}
	compRev := resp.Header.Revision // specify compact revision of your choice
	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
	_, err = cli.Compact(ctx, compRev)
	cancel()
	if err != nil {
		log.Fatal(err)
	}
	// Reading the pre-compaction revision; only the error is of interest.
	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
	_, err = cli.Get(ctx, "foo", clientv3.WithRev(presp.Header.Revision))
	cancel()
	fmt.Println(err) // error occurred

	// txn
	kvc := clientv3.NewKV(cli)
	_, err = kvc.Put(context.TODO(), "key", "xyz")
	if err != nil {
		log.Fatal(err)
	}
	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
	_, err = kvc.Txn(ctx).
		// txn value comparisons are lexical
		If(clientv3.Compare(clientv3.Value("key"), ">", "abc")).
		// the "Then" runs, since "xyz" > "abc"
		Then(clientv3.OpPut("key", "XYZ")).
		// the "Else" does not run
		Else(clientv3.OpPut("key", "ABC")).
		Commit()
	cancel()
	if err != nil {
		log.Fatal(err)
	}
	gresp, err = kvc.Get(context.TODO(), "key")
	if err != nil {
		log.Fatal(err)
	}
	for _, ev := range gresp.Kvs {
		fmt.Printf("%s : %s\n", ev.Key, ev.Value)
	}

	// do
	ops := []clientv3.Op{
		clientv3.OpPut("put-key", "123"),
		clientv3.OpGet("put-key"),
		clientv3.OpPut("put-key", "456"),
	}
	for _, op := range ops {
		if _, err := cli.Do(context.TODO(), op); err != nil {
			log.Fatal(err)
		}
	}
	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
	resp, err = cli.Get(ctx, "put-key")
	cancel()
	if err != nil {
		log.Fatal(err)
	}
	for _, ev := range resp.Kvs {
		fmt.Printf("%s : %s\n", ev.Key, ev.Value)
	}
}
等待租约在指定时间后过期:
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"time"
)
// main attaches key "foo" to a 5-second lease, prints the key, waits six
// seconds and prints it again to show the effect of the lease running out.
func main() {
	ctx := context.TODO()

	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"0.0.0.0:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// minimum lease TTL is 5-second
	lease, err := cli.Grant(ctx, 5)
	if err != nil {
		log.Fatal(err)
	}

	// after 5 seconds, the key 'foo' will be removed
	if _, err = cli.Put(ctx, "foo", "bar", clientv3.WithLease(lease.ID)); err != nil {
		log.Fatal(err)
	}

	// Read once right away and once after the wait.
	before, err := cli.Get(ctx, "foo")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(before.Kvs)

	time.Sleep(time.Second * 6)

	after, err := cli.Get(ctx, "foo")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(after.Kvs)
}
主动撤销租约:
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
)
// main attaches key "foo" to a lease, revokes the lease explicitly and then
// prints how many keys are still stored under "foo".
func main() {
	ctx := context.TODO()

	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"0.0.0.0:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	lease, err := cli.Grant(ctx, 5)
	if err != nil {
		log.Fatal(err)
	}

	if _, err = cli.Put(ctx, "foo", "bar", clientv3.WithLease(lease.ID)); err != nil {
		log.Fatal(err)
	}

	// revoking lease expires the key attached to its lease ID
	if _, err = cli.Revoke(ctx, lease.ID); err != nil {
		log.Fatal(err)
	}

	remaining, err := cli.Get(ctx, "foo")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("number of keys:", len(remaining.Kvs))
}
增加KeepAlive使租约不过期,也可使用KeepAliveOnce只更新一次:
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"time"
)
// main attaches key "foo" to a 5-second lease, starts KeepAlive on the lease,
// prints the TTL from the first keep-alive response, waits six seconds and
// reads the key back.
func main() {
	ctx := context.TODO()

	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"0.0.0.0:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	lease, err := cli.Grant(ctx, 5)
	if err != nil {
		log.Fatal(err)
	}

	if _, err = cli.Put(ctx, "foo", "bar", clientv3.WithLease(lease.ID)); err != nil {
		log.Fatal(err)
	}

	// the key 'foo' will be kept forever
	keepAliveCh, kaerr := cli.KeepAlive(ctx, lease.ID)
	if kaerr != nil {
		log.Fatal(kaerr)
	}

	// Only the first keep-alive response is inspected.
	if first := <-keepAliveCh; first == nil {
		fmt.Println("Unexpected NULL")
	} else {
		fmt.Println("ttl:", first.TTL)
	}

	time.Sleep(time.Second * 6)

	kept, err := cli.Get(ctx, "foo")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(kept.Kvs)
}
碎片整理:
package main
import (
"context"
"github.com/coreos/etcd/clientv3"
"log"
)
// main queries the status of a single endpoint and then asks that same
// endpoint to defragment its storage.
func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Status and Defragment address one specific endpoint, not the cluster.
	_, err = cli.Status(context.Background(), "172.20.20.55:2379")
	if err != nil {
		log.Fatal(err)
	}

	_, err = cli.Defragment(context.TODO(), "172.20.20.55:2379")
	if err != nil {
		log.Fatal(err)
	}
}
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/prometheus/client_golang/prometheus/promhttp"
"io"
"log"
"net"
"net/http"
"strings"
)
// main issues one Get against etcd, serves the process's Prometheus metrics on
// an ephemeral local port, fetches /metrics from that server, prints the body
// and then prints the line for the gRPC "Range" client counter if present.
func main() {
	// NOTE(review): the Range counter presumably only shows up when the client's
	// gRPC connection is instrumented with Prometheus interceptors, which this
	// Config does not set up — confirm against the client documentation.
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// get a key so it shows up in the metrics as a range RPC
	if _, err = cli.Get(context.TODO(), "test_key"); err != nil {
		// Best effort: the metrics can still be fetched, so only report it.
		log.Printf("get test_key error: %v", err)
	}

	// listen for all Prometheus metrics
	ln, err := net.Listen("tcp", ":0")
	if err != nil {
		log.Fatal(err)
	}
	donec := make(chan struct{})
	go func() {
		defer close(donec)
		// Serve returns once the listener is closed by the deferred cleanup.
		http.Serve(ln, promhttp.Handler())
	}()
	defer func() {
		ln.Close()
		<-donec
	}()

	// make an http request to fetch all Prometheus metrics
	url := "http://" + ln.Addr().String() + "/metrics"
	resp, err := http.Get(url)
	if err != nil {
		log.Fatalf("fetch error: %v", err)
	}
	if resp.StatusCode != http.StatusOK {
		log.Printf("fetch %s: unexpected status %s", url, resp.Status)
	}
	b, err := io.ReadAll(resp.Body)
	resp.Body.Close()
	if err != nil {
		log.Fatalf("fetch error: reading %s: %v", url, err)
	}
	fmt.Println("resp body", string(b))

	// confirm range request in metrics
	found := false
	for _, l := range strings.Split(string(b), "\n") {
		if strings.Contains(l, `grpc_client_started_total{grpc_method="Range"`) {
			fmt.Println(l)
			found = true
			break
		}
	}
	if !found {
		log.Println("no grpc_client_started_total line for Range found in metrics")
	}
}
package main
import (
"context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"log"
"time"
)
// main builds a TLS client configuration from certificate files on disk,
// connects to etcd with it and writes a single key.
func main() {
	certs := transport.TLSInfo{
		CertFile:      "/tmp/test-certs/test-name-1.pem",
		KeyFile:       "/tmp/test-certs/test-name-1-key.pem",
		TrustedCAFile: "/tmp/test-certs/trusted-ca.pem",
	}
	tlsConfig, err := certs.ClientConfig()
	if err != nil {
		log.Fatal(err)
	}

	cfg := clientv3.Config{
		Endpoints:   []string{"172.20.20.55:2379"},
		DialTimeout: time.Second * 5,
		TLS:         tlsConfig,
	}
	cli, err := clientv3.New(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close() // make sure to close the client

	if _, err = cli.Put(context.TODO(), "foo", "bar"); err != nil {
		log.Fatal(err)
	}
}
示例1:
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"strconv"
"time"
)
// main watches key "foo" while a background goroutine overwrites it once per
// second with "bar0", "bar1", ...; every received event is printed.
func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Writer: one Put per second with an increasing counter suffix.
	go func() {
		for n := 0; ; n++ {
			time.Sleep(time.Second)
			cli.Put(context.Background(), "foo", "bar"+strconv.Itoa(n))
		}
	}()

	// Reader: loops until the watch channel is closed.
	watchCh := cli.Watch(context.Background(), "foo")
	for batch := range watchCh {
		for _, event := range batch.Events {
			fmt.Printf("%s %q : %q\n", event.Type, event.Kv.Key, event.Kv.Value)
		}
	}
}
示例2(WithPrefix):
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"strconv"
"time"
)
// main watches every key with prefix "foo" while a background goroutine
// writes "foo1" once per second; every received event is printed.
func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Writer: the key "foo1" is not "foo" itself but matches the prefix.
	go func() {
		for n := 0; ; n++ {
			time.Sleep(time.Second)
			cli.Put(context.Background(), "foo1", "bar"+strconv.Itoa(n))
		}
	}()

	watchCh := cli.Watch(context.Background(), "foo", clientv3.WithPrefix())
	for batch := range watchCh {
		for _, event := range batch.Events {
			fmt.Printf("%s %q : %q\n", event.Type, event.Kv.Key, event.Kv.Value)
		}
	}
}
示例3(WithRange):
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"time"
)
// main watches the key range from "foo1" up to the range end "foo4" while a
// background goroutine writes four keys, one of which ("foo5") lies after the
// range end; every received event is printed.
func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	go func() {
		time.Sleep(time.Second)
		// Same keys, values and order as a sequence of individual Put calls.
		writes := [][2]string{
			{"foo1", "bar1"},
			{"foo5", "bar5"},
			{"foo2", "bar2"},
			{"foo3", "bar3"},
		}
		for _, kv := range writes {
			cli.Put(context.Background(), kv[0], kv[1])
		}
	}()

	watchCh := cli.Watch(context.Background(), "foo1", clientv3.WithRange("foo4"))
	for batch := range watchCh {
		for _, event := range batch.Events {
			fmt.Printf("%s %q : %q\n", event.Type, event.Kv.Key, event.Kv.Value)
		}
	}
}
示例4(WithProgressNotify):
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"time"
)
// main opens a watch on "foo" with progress notifications enabled, reads the
// first message from the watch channel and reports whether it is a progress
// notification. A background goroutine closes the client after one second.
func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	watchCh := cli.Watch(context.Background(), "foo", clientv3.WithProgressNotify())

	clientClosed := make(chan bool)
	go func() {
		// This assumes that cluster is configured with frequent WatchProgressNotifyInterval
		// e.g. WatchProgressNotifyInterval: 200 * time.Millisecond.
		time.Sleep(time.Second)
		if closeErr := cli.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
		close(clientClosed)
	}()

	first := <-watchCh
	fmt.Println("wresp.IsProgressNotify:", first.IsProgressNotify())

	// Wait for the background goroutine to finish closing the client.
	<-clientClosed
}
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"log"
"sync"
"time"
)
// main runs two candidates against the same election key. Candidate e1 waits
// three seconds before campaigning so that e2 is elected first; the winner is
// printed, resigns, and the second winner is printed.
func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"0.0.0.0:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// create two separate sessions for election competition
	s1, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s1.Close()
	e1 := concurrency.NewElection(s1, "/test/election")

	s2, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s2.Close()
	e2 := concurrency.NewElection(s2, "/test/election")

	// create competing candidates, with e1 initially losing to e2
	var wg sync.WaitGroup
	electc := make(chan *concurrency.Election, 2)

	// campaign waits for delay, campaigns under the given name and hands the
	// election over on electc once the campaign has succeeded.
	campaign := func(candidate *concurrency.Election, name string, delay time.Duration) {
		defer wg.Done()
		time.Sleep(delay)
		log.Println(name + ".Campaign")
		if err := candidate.Campaign(context.Background(), name); err != nil {
			log.Fatal(err)
		}
		log.Println(name + ".Campaign success")
		electc <- candidate
	}

	wg.Add(2)
	// delay candidacy so e2 wins first
	go campaign(e1, "e1", 3*time.Second)
	go campaign(e2, "e2", 0)

	cctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	e := <-electc
	firstLeader := <-e.Observe(cctx)
	fmt.Println("completed first election with", string(firstLeader.Kvs[0].Value))

	// resign so next candidate can be elected
	if err := e.Resign(context.TODO()); err != nil {
		log.Fatal(err)
	}

	e = <-electc
	secondLeader := <-e.Observe(cctx)
	fmt.Println("completed second election with", string(secondLeader.Kvs[0].Value))

	wg.Wait()
}
注: 注意版本,高版本才支持TryLock,Lock函数会阻塞
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"log"
)
// main demonstrates the distributed mutex with two sessions competing for the
// same lock key. The first half uses TryLock (newer client versions only), the
// second half uses only the blocking Lock (older client versions).
func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"172.20.20.55:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	s1, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s1.Close()
	m1 := concurrency.NewMutex(s1, "/test/my-lock")

	s2, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s2.Close()
	m2 := concurrency.NewMutex(s2, "/test/my-lock")

	// Example for newer client versions
	// acquire lock for s1
	if err = m1.Lock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("acquired lock for s1")

	// TryLock must be used here: the blocking Lock would wait forever because
	// s1 still holds the lock and nothing else is running that could release it.
	if err = m2.TryLock(context.TODO()); err == nil {
		log.Fatal("should not acquire lock")
	}
	if err == concurrency.ErrLocked {
		fmt.Println("cannot acquire lock for s2, as already locked in another session")
	}

	if err = m1.Unlock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("released lock for s1")

	if err = m2.TryLock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("acquired lock for s2")

	// Release s2's lock so the next example starts from an unlocked mutex;
	// otherwise m1.Lock below would block forever.
	if err = m2.Unlock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("released lock for s2")

	// Example for older client versions
	// acquire lock for s1
	if err := m1.Lock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("acquired lock for s1")

	m2Locked := make(chan struct{})
	go func() {
		defer close(m2Locked)
		// blocks until s1 releases /test/my-lock
		if err := m2.Lock(context.TODO()); err != nil {
			log.Fatal(err)
		}
	}()

	if err := m1.Unlock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("released lock for s1")

	<-m2Locked
	fmt.Println("acquired lock for s2")
}
示例:
package main
import (
"context"
"github.com/coreos/etcd/clientv3"
"log"
"time"
)
// main shows two transactions on key "test": the first writes an initial value
// only if the key does not exist yet, the second overwrites the value only if
// it currently equals "123" and otherwise reads the key back.
func main() {
	cfg := clientv3.Config{
		Endpoints:   []string{"172.20.20.57:2379"},
		DialTimeout: 10 * time.Second,
	}
	cli, err := clientv3.New(cfg)
	if err != nil {
		log.Fatal("clientv3.New", err)
	}
	defer cli.Close()

	ctx := context.Background()

	// Start without the key.
	if _, err = cli.Delete(ctx, "test"); err != nil {
		log.Fatal(err)
	}

	// Transaction: if the key does not exist, set its initial value.
	notCreated := clientv3.Compare(clientv3.CreateRevision("test"), "=", 0)
	r, err := cli.Txn(ctx).
		If(notCreated).
		Then(clientv3.OpPut("test", "abc")).
		Commit()
	if err != nil {
		log.Fatal(err)
	}
	log.Println(err, r)
	if !r.Succeeded {
		log.Println(r.Responses)
	}

	// Transaction: compare the value, then modify it.
	valueMatches := clientv3.Compare(clientv3.Value("test"), "=", "123")
	resp, err := cli.Txn(ctx).
		If(valueMatches).
		Then(clientv3.OpPut("test", "abc")).
		Else(clientv3.OpGet("test")).
		Commit()
	if err != nil {
		log.Fatal(err)
	}
	// When the comparison fails, the Else branch has fetched the value.
	if !resp.Succeeded {
		rangeResp := resp.Responses[0].GetResponseRange()
		log.Println("resp:", rangeResp.Kvs[0])
	}
}