当前位置: 首页 > 工具软件 > Keywatch > 使用案例 >

python3 watch etcd

云利
2023-12-01

问题描述

使用python的etcd3库去监听etcd时遇到下面问题:

  1. etcd重启时,监听代码会抛出异常,并且在etcd恢复正常后也无法继续监听
  2. 通过vip连接etcd时,如果vip发生漂移,监听同样会中断且无法自动恢复

解决

仍然借用etcd3库提供的组件(gRPC接口定义、请求迭代器、事件封装等),但自己实现带自动重连的watch

import threading
import grpc
from six.moves import queue
from urllib3.util import url

from etcd3.watch import WatchResponse, _new_request_iter
from etcd3 import etcdrpc, utils, events, client
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_result


def is_false(value):
    """Return True only when *value* is the ``False`` singleton itself.

    Other falsy objects (``0``, ``None``, empty containers) do not count.
    """
    # ``bool`` cannot be subclassed, so a falsy bool instance is exactly False.
    return isinstance(value, bool) and not value


def return_last_value(retry_state):
    """Return the result held by the outcome of *retry_state*.

    Used as a ``retry_error_callback`` so that the value of the final attempt
    is handed back to the caller.
    """
    last_outcome = retry_state.outcome
    return last_outcome.result()


class Watch(object):
    """etcd watcher that re-establishes its watch stream after failures.

    The watch runs in a background thread.  Before every (re)connection the
    current cluster leader is looked up and the gRPC channel is opened against
    it, so the watch survives etcd restarts and VIP drift.

    One instance drives a single watch at a time; create one ``Watch`` per
    key/prefix that has to be observed concurrently.

    NOTE: after a reconnect the watch is re-created with the same request
    options, so events that happened while disconnected are not replayed.
    """

    def __init__(self, etcd_url, timeout=None, reconnect_interval=1.0):
        """Create the watcher.

        :param etcd_url: URL of an etcd endpoint (or VIP), e.g.
            ``"etcd://host:2379"``.
        :param timeout: timeout passed to the etcd3 client.
        :param reconnect_interval: seconds to wait between two connection
            attempts after the watch stream broke.
        """
        self.etcd_url = url.parse_url(etcd_url)
        self.timeout = timeout
        self.reconnect_interval = reconnect_interval
        self.etcd_client = client(self.host, self.port, timeout=timeout)
        self.leader_url = None
        self.channel = None
        self._request_queue = queue.Queue(maxsize=10)
        self._response_queue = queue.Queue()
        self._watch_id = None
        self._callbacks = {}  # maps watch_id -> callback
        # Set by cancel_watch(); tells the worker thread to stop reconnecting.
        self._stop_event = threading.Event()

    @property
    def host(self):
        """Host name of the configured etcd URL."""
        return self.etcd_url.hostname

    @property
    def port(self):
        """Port of the configured etcd URL."""
        return self.etcd_url.port

    @property
    def _default_callback(self):
        """Callback that queues every response for the ``watch()`` iterator."""
        def callback(response):
            self._response_queue.put(response)
        return callback

    @staticmethod
    def _should_retry(result):
        """Tell whether a status probe result means "etcd is not ready yet"."""
        return result is False or isinstance(result, Exception)

    def _wait_etcd_ok(self, wait=float(1), retry_times=3):
        """Probe the etcd status, retrying while the probe fails.

        Retries 3 times by default, 1s apart.

        :param wait: seconds to wait between two attempts.
        :param retry_times: maximum number of attempts.
        :return: the etcd status on success, otherwise the exception raised by
            the last attempt (returned, not raised).
        """
        # The probe reports failures by *returning* the exception, so the
        # retry predicate has to look for exception results; testing for a
        # literal False alone would never trigger a retry.
        @retry(stop=stop_after_attempt(retry_times), wait=wait_fixed(wait),
               retry=retry_if_result(self._should_retry),
               retry_error_callback=return_last_value)
        def check():
            try:
                return self.etcd_client.status()
            except Exception as e:
                return e
        return check()

    def get_etcd_leader(self):
        """Resolve the URL the watch stream should connect to.

        For a multi-member cluster this is the first client URL of the
        leader; for a single member, or when no leader URL is known, it is the
        configured etcd URL.

        :return: the parsed URL, also stored in ``self.leader_url``.
        :raises Exception: the error of the last status probe when etcd could
            not be reached.
        """
        status = self._wait_etcd_ok(wait=0.1)
        if isinstance(status, Exception):
            raise status
        member_count = sum(1 for _ in self.etcd_client.members)
        leader = status.leader
        # The status may carry no leader (e.g. during an election); fall back
        # to the configured URL instead of failing on a missing attribute.
        if member_count > 1 and leader is not None and leader.client_urls:
            self.leader_url = url.parse_url(leader.client_urls[0])
        else:
            self.leader_url = self.etcd_url
        return self.leader_url

    def _send_cancel_request(self):
        """Queue a cancel request for the currently known watch id."""
        cancel = etcdrpc.WatchCancelRequest()
        cancel.watch_id = self._watch_id
        rq = etcdrpc.WatchRequest(cancel_request=cancel)
        self._request_queue.put(rq)

    def cancel_watch(self):
        """Stop the running watch and prevent further reconnects.

        If the watch has not been created on the server yet, only the stop
        flag is set; the worker sends the cancel request as soon as the watch
        id becomes known.
        """
        self._stop_event.set()
        if self._watch_id is None:
            # Nothing to cancel on the server yet; a None watch id cannot be
            # assigned to the request message.
            return
        self._send_cancel_request()

    def _pre_watch(self, key, **kwargs):
        """Open a channel to the leader and build the watch create request.

        :param key: key (or range start) to watch.
        :param kwargs: optional ``range_end``, ``start_revision``,
            ``progress_notify``, ``filters`` and ``prev_kv`` request options.
        :return: tuple ``(watch_stub, create_watch)``.
        """
        # Connecting to the leader directly avoids losing the watch when the
        # VIP drifts.
        self.get_etcd_leader()
        self.channel = grpc.insecure_channel(self.leader_url.netloc)
        watch_stub = etcdrpc.WatchStub(self.channel)
        create_watch = etcdrpc.WatchCreateRequest()
        create_watch.key = utils.to_bytes(key)
        range_end = kwargs.get('range_end')
        if range_end is not None:
            create_watch.range_end = utils.to_bytes(range_end)
        start_revision = kwargs.get('start_revision')
        if start_revision is not None:
            create_watch.start_revision = start_revision
        progress_notify = kwargs.get('progress_notify')
        if progress_notify:
            create_watch.progress_notify = progress_notify
        filters = kwargs.get('filters')
        if filters is not None:
            # Repeated protobuf fields cannot be assigned, only extended.
            create_watch.filters.extend(filters)
        prev_kv = kwargs.get('prev_kv')
        if prev_kv:
            create_watch.prev_kv = prev_kv
        return watch_stub, create_watch

    def _handle_response(self, rs, callback):
        """Register the callback on creation and dispatch events to it.

        :param rs: one response received from the watch stream.
        :param callback: callable receiving a ``WatchResponse``.
        """
        if rs.created:
            self._callbacks[rs.watch_id] = callback
        if rs.events or not (rs.created or rs.canceled):
            new_events = [events.new_event(event) for event in rs.events]
            response = WatchResponse(rs.header, new_events)
            callback(response)

    def _teardown_stream(self):
        """Release everything that belongs to the finished watch stream."""
        stale_queue = self._request_queue
        # A fresh queue guarantees that the next create request cannot be
        # consumed by the request iterator of the old stream.
        self._request_queue = queue.Queue(maxsize=10)
        try:
            # None makes the old request iterator return.
            stale_queue.put_nowait(None)
        except queue.Full:
            # Best effort: the old queue is being discarded anyway.
            pass
        if self.channel is not None:
            # Closing the channel ends its RPCs and frees the connection;
            # without it every reconnect would leak one channel.
            self.channel.close()
            self.channel = None
        self._watch_id = None

    def _do_watch(self, key, callback, **kwargs):
        """Worker loop: watch *key* and reconnect until the watch is canceled.

        :param key: key (or range start) to watch.
        :param callback: callable receiving each ``WatchResponse``; when
            falsy, responses are queued for the ``watch()`` iterator.
        :param kwargs: request options forwarded to ``_pre_watch``.
        """
        feeds_iterator = not callback
        if feeds_iterator:
            callback = self._default_callback
        try:
            while not self._stop_event.is_set():
                try:
                    watch_stub, create_watch = self._pre_watch(key, **kwargs)
                    rq = etcdrpc.WatchRequest(create_request=create_watch)
                    self._request_queue.put(rq)
                    response_iter = watch_stub.Watch(_new_request_iter(self._request_queue))
                    for rs in response_iter:
                        if rs.created:
                            self._watch_id = rs.watch_id
                            if self._stop_event.is_set():
                                # cancel_watch() ran before the id was known.
                                self._send_cancel_request()
                        if rs.canceled:
                            return
                        try:
                            self._handle_response(rs, callback)
                        except Exception as e:
                            # A failing callback must not tear down a healthy
                            # stream.
                            print("watch error: ", e)
                except grpc.RpcError as err:
                    print("grpc error: ", err.code())
                except Exception as e:
                    print("watch error: ", e)
                finally:
                    self._teardown_stream()
                # Pause before reconnecting instead of spinning; returns at
                # once when a cancel was requested.
                self._stop_event.wait(self.reconnect_interval)
        finally:
            if feeds_iterator:
                # Sentinel that lets the watch() iterator finish.
                self._response_queue.put(None)

    def _iter_events(self):
        """Yield queued events until the end-of-watch sentinel arrives."""
        while True:
            response = self._response_queue.get()
            if response is None:
                return
            for event in response.events:
                yield event

    def _start_watch_thread(self, key, callback, **kwargs):
        """Start the worker thread for one watch and return it."""
        self._stop_event.clear()
        worker = threading.Thread(target=self._do_watch, args=(key, callback), kwargs=kwargs)
        worker.start()
        return worker

    def watch(self, key):
        """Watch *key* and expose its events as an iterator.

        :return: tuple ``(iterator, cancel)``; the iterator yields events and
            ends once the watch has been canceled via ``cancel()``.
        """
        self._start_watch_thread(key, None)
        return self._iter_events(), self.cancel_watch

    def watch_with_callback(self, key, callback):
        """Watch *key* and pass every ``WatchResponse`` to *callback*.

        :return: the worker thread.
        """
        return self._start_watch_thread(key, callback)

    def watch_prefix_with_callback(self, prefix, callback):
        """Watch all keys starting with *prefix* and pass responses to *callback*.

        :return: the worker thread.
        """
        range_end = utils.increment_last_byte(utils.to_bytes(prefix))
        return self._start_watch_thread(prefix, callback, range_end=range_end)


def test_callback(events):
    """Demo callback: print the decoded value of every event in the response."""
    for item in events.events:
        decoded = item.value.decode()
        print("-----callback---", decoded)


# Demo entry point. Guarded so that importing this module does not open a
# connection or start a watch thread as a side effect.
if __name__ == "__main__":
    w = Watch("etcd://172.19.206.164:2379", timeout=5)
    # Iterator-style alternative to the callback API:
    #     iterator, c = w.watch("/test")
    #     for i in iterator:
    #         print("-------", i)
    w.watch_prefix_with_callback("/test", test_callback)
    print("watching...")

以上代码仅供参考,虽不会出现上面的问题,但仍需持续完善功能:

  1. 当前一个Watch实例只能启动一个监听;如需同时监听多个key,需要分别实例化多个Watch
  2. 实现优雅的cancel(取消监听)
 类似资料:

相关阅读

相关文章

相关问答