使用 Python 的 etcd3 库监听 etcd 时,遇到了下面的问题:
解决思路:仍然借用 etcd3 库,但自己实现 watch:
import threading
import time

import grpc
from etcd3 import etcdrpc, utils, events, client
from etcd3.watch import WatchResponse, _new_request_iter
from six.moves import queue
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_result
from urllib3.util import url
def is_false(value):
    """Return True only when *value* is the boolean ``False`` itself.

    Other falsy objects (``0``, ``None``, ``""``, empty containers) do not
    count; ``bool`` cannot be subclassed, so this is the same as an identity
    check against ``False``.
    """
    return isinstance(value, bool) and not value
def return_last_value(retry_state):
    """Return the result held by the outcome of *retry_state*.

    Used in this file as a tenacity ``retry_error_callback``, so that the
    value of the final attempt is handed back to the caller.
    """
    last_outcome = retry_state.outcome
    return last_outcome.result()
class Watch(object):
    """Watch an etcd key or prefix over a self-managed gRPC stream.

    The etcd3 client is used only for status/member queries; the watch
    stream itself is opened on a separate channel that is re-created on
    every (re)connect, pointing at the current cluster leader.

    One instance keeps a single request queue and a single watch id, so it
    is meant to drive one watch at a time.

    NOTE(review): after a reconnect the watch is re-created with the same
    keyword arguments, so unless ``start_revision`` was supplied, events
    that happened while the stream was down are presumably not replayed.
    """

    # Seconds to pause after a non-gRPC failure before reconnecting, so an
    # unreachable cluster does not turn the watch loop into a busy spin.
    _RETRY_DELAY = 1.0

    def __init__(self, etcd_url, timeout=None):
        """Parse *etcd_url* and create the etcd3 client used for status calls.

        Args:
            etcd_url: URL of an etcd endpoint; only host and port are used.
            timeout: forwarded to the etcd3 client as its ``timeout``.
        """
        self.etcd_url = url.parse_url(etcd_url)
        self.timeout = timeout
        self.etcd_client = client(self.host, self.port, timeout=timeout)
        self.leader_url = None
        self.channel = None
        self._request_queue = queue.Queue(maxsize=10)
        self._response_queue = queue.Queue()
        self._watch_id = None
        self._callbacks = {}  # {watch_id: callback}

    @property
    def host(self):
        """Host name of the configured etcd URL."""
        return self.etcd_url.hostname

    @property
    def port(self):
        """Port of the configured etcd URL."""
        return self.etcd_url.port

    @property
    def _default_callback(self):
        """Callback that stores each response in the internal response queue."""
        def callback(response):
            self._response_queue.put(response)
        return callback

    def _wait_etcd_ok(self, wait=float(1), retry_times=3):
        """Query the cluster status, retrying while the call fails.

        By default up to 3 attempts are made, 1 second apart.

        Args:
            wait: seconds to wait between attempts.
            retry_times: maximum number of attempts.

        Returns:
            The status object on success, otherwise the exception raised by
            the last attempt (returned, not raised).
        """
        def failed(result):
            # check() reports a failure by returning the exception object,
            # so that is the condition a retry has to be triggered on.
            return isinstance(result, Exception)

        @retry(stop=stop_after_attempt(retry_times), wait=wait_fixed(wait),
               retry=retry_if_result(failed),
               retry_error_callback=return_last_value)
        def check():
            try:
                return self.etcd_client.status()
            except Exception as e:
                return e
        return check()

    def get_etcd_leader(self):
        """Work out which URL the watch stream should connect to.

        With more than one cluster member the first client URL of the
        leader is used; otherwise the configured URL is kept.

        Returns:
            The parsed URL, also stored in ``self.leader_url``.

        Raises:
            Exception: whatever the status query failed with.
        """
        status = self._wait_etcd_ok(wait=0.1)
        if isinstance(status, Exception):
            raise status
        member_count = sum(1 for _ in self.etcd_client.members)
        if member_count > 1:
            self.leader_url = url.parse_url(status.leader.client_urls[0])
        else:
            self.leader_url = self.etcd_url
        return self.leader_url

    def cancel_watch(self):
        """Queue a cancel request for the currently active watch.

        Does nothing when no watch id is known (nothing created yet, or the
        stream is being re-established): there is no id to cancel, and the
        cancel request's integer ``watch_id`` field cannot be set to None.
        """
        watch_id = self._watch_id
        if watch_id is None:
            return
        cancel = etcdrpc.WatchCancelRequest()
        cancel.watch_id = watch_id
        rq = etcdrpc.WatchRequest(cancel_request=cancel)
        self._request_queue.put(rq)

    def _pre_watch(self, key, **kwargs):
        """Open a channel to the leader and build the watch-create message.

        Args:
            key: key (or prefix start) to watch.
            **kwargs: optional ``range_end``, ``start_revision``,
                ``progress_notify``, ``filters`` (iterable) and ``prev_kv``.

        Returns:
            Tuple ``(watch_stub, create_watch)``.
        """
        # Connecting to the leader directly avoids losing the watch when the
        # VIP floats to another node.
        self.get_etcd_leader()
        self.channel = grpc.insecure_channel(self.leader_url.netloc)
        watch_stub = etcdrpc.WatchStub(self.channel)

        create_watch = etcdrpc.WatchCreateRequest()
        create_watch.key = utils.to_bytes(key)
        range_end = kwargs.get('range_end')
        if range_end is not None:
            create_watch.range_end = utils.to_bytes(range_end)
        start_revision = kwargs.get('start_revision')
        if start_revision is not None:
            create_watch.start_revision = start_revision
        if kwargs.get('progress_notify'):
            create_watch.progress_notify = kwargs.get('progress_notify')
        filters = kwargs.get('filters')
        if filters is not None:
            # ``filters`` is a repeated protobuf field: it cannot be assigned
            # to, only extended.
            create_watch.filters.extend(filters)
        if kwargs.get('prev_kv'):
            create_watch.prev_kv = kwargs.get('prev_kv')
        return watch_stub, create_watch

    def _handle_response(self, rs, callback):
        """Register the callback on creation and forward events to it.

        The callback is invoked when the response carries events, or when it
        is neither a "created" nor a "canceled" acknowledgement.
        """
        if rs.created:
            self._callbacks[rs.watch_id] = callback
        if rs.events or not (rs.created or rs.canceled):
            new_events = [events.new_event(event) for event in rs.events]
            response = WatchResponse(rs.header, new_events)
            callback(response)

    def _reset_stream(self):
        """Tear down the per-stream state so the next attempt starts clean."""
        # Wake the request iterator of the finished stream; otherwise it
        # stays blocked on the queue and could swallow the next create
        # request.
        try:
            self._request_queue.put_nowait(None)
        except queue.Full:
            # Nothing is draining this queue; it is discarded just below.
            pass
        self._request_queue = queue.Queue(maxsize=10)
        # The id belonged to the stream that just ended.
        self._watch_id = None
        if self.channel is not None:
            self.channel.close()
            self.channel = None

    def _do_watch(self, key, callback, **kwargs):
        """Run the watch loop until the watch is canceled.

        The stream is re-created after every failure or unexpected end.
        Intended to be the target of a worker thread.

        Args:
            key: key (or prefix start) to watch.
            callback: called with each watch response; when falsy, responses
                go to the internal response queue instead.
            **kwargs: forwarded to ``_pre_watch``.
        """
        if not callback:
            callback = self._default_callback
        while True:
            try:
                watch_stub, create_watch = self._pre_watch(key, **kwargs)
                rq = etcdrpc.WatchRequest(create_request=create_watch)
                self._request_queue.put(rq)
                response_iter = watch_stub.Watch(
                    _new_request_iter(self._request_queue))
                for rs in response_iter:
                    if rs.created:
                        self._watch_id = rs.watch_id
                    if rs.canceled:
                        return
                    self._handle_response(rs, callback)
            except grpc.RpcError as err:
                print("grpc error: ", err.code())
            except Exception as e:
                print("watch error: ", e)
                # Back off instead of spinning while the cluster is down.
                time.sleep(self._RETRY_DELAY)
            finally:
                # Runs on cancel, on errors and on a normal stream end alike:
                # every new attempt gets a fresh queue and channel.
                self._reset_stream()

    def watch(self, key):
        """Watch *key* on a background thread.

        Returns:
            Tuple ``(iterator, cancel)``: ``iterator`` yields events as they
            arrive and ends once ``cancel()`` has been called.
        """
        t = threading.Thread(target=self._do_watch, args=(key, None))
        t.start()

        def iterator():
            while True:
                response = self._response_queue.get()
                if response is None:
                    # Sentinel queued by cancel(): stop iterating.
                    return
                for event in response.events:
                    yield event

        def cancel():
            self.cancel_watch()
            # Release a consumer blocked in iterator().
            self._response_queue.put(None)

        return iterator(), cancel

    def watch_with_callback(self, key, callback):
        """Watch *key* on a background thread, passing responses to *callback*.

        Returns:
            The started worker thread.
        """
        t = threading.Thread(target=self._do_watch, args=(key, callback))
        t.start()
        return t

    def watch_prefix_with_callback(self, prefix, callback):
        """Watch every key starting with *prefix* on a background thread.

        Returns:
            The started worker thread.
        """
        kwargs = {'range_end': utils.increment_last_byte(utils.to_bytes(prefix))}
        t = threading.Thread(target=self._do_watch, args=(prefix, callback), kwargs=kwargs)
        t.start()
        return t
def test_callback(events):
    """Demo watch callback: print the decoded value of each event in *events*."""
    decoded_values = (item.value.decode() for item in events.events)
    for text in decoded_values:
        print("-----callback---", text)
# Demo: build a watcher for one etcd endpoint; timeout=5 is passed on to the
# etcd3 client created inside Watch.
w = Watch("etcd://172.19.206.164:2379", timeout=5)
# Alternative, iterator-based usage (disabled):
# iterator, c = w.watch("/test")
# for i in iterator:
#     print("-------", i)
# Start a background thread watching every key under the "/test" prefix;
# responses that carry events are handed to test_callback.
w.watch_prefix_with_callback("/test", test_callback)
# Printed right after the worker thread has been started.
print("watching...")
以上代码仅供参考:虽然不会再出现上面的问题,但功能仍需持续完善: