kazoo-2.6.1
上文start概述中,只是简单的概述了kazoo客户端初始化之后,调用了start方法,本文继续详细的了解相关的细节。
本文主要是分析一下基本流程与启动的架构实现,示例代码如下;
from kazoo.client import KazooClient
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
try:
    # Determine if a node exists; only read it when it does, since
    # exists() yields None for a missing node.
    if zk.exists("/my/favorite"):
        # Print the version of a node and its data
        data, stat = zk.get("/my/favorite")
        print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
finally:
    # Always stop the client, even if one of the calls above raised.
    zk.stop()
示例代码还是如上所示。
通过上文分析,可知在线程中运行的是self.zk_loop函数,
def zk_loop(self):
    """Main Zookeeper handling loop.

    Runs ``self._connect_loop`` through a clone of the retry helper
    until the client is flagged as stopped, the connect loop returns
    ``STOP_CONNECTING``, or the retry policy gives up. On the way out
    the connection is always marked as stopped and the session callback
    is invoked with ``KeeperState.CLOSED``.
    """
    self.logger.log(BLATHER, 'ZK loop started')
    # Clear the "connection stopped" flag for this run.
    self.connection_stopped.clear()

    # Clone of the configured retry helper, used only by this loop.
    connect_retry = self.retry_sleeper.copy()
    try:
        while True:
            if self.client._stopped.is_set():
                break
            # If the connect_loop returns STOP_CONNECTING, stop retrying
            outcome = connect_retry(self._connect_loop, connect_retry)
            if outcome is STOP_CONNECTING:
                break
    except RetryFailedError:
        self.logger.warning("Failed connecting to Zookeeper "
                            "within the connection retry policy.")
    finally:
        # Runs on normal exit and on any exception: flag the connection
        # as stopped and report the CLOSED state.
        self.connection_stopped.set()
        self.client._session_callback(KeeperState.CLOSED)
        self.logger.log(BLATHER, 'Connection stopped')
可知运行的时候是通过包裹了retry来进行启动运行的,首先来分析一下该类,默认情况下是KazooRetry类实例。
class KazooRetry(object):
    """Helper for retrying a method in the face of retry-able
    exceptions"""
    # Exceptions that always trigger another attempt.
    RETRY_EXCEPTIONS = (
        ConnectionLoss,
        OperationTimeoutError,
        ForceRetryError
    )

    # Additionally retried when ``ignore_expire`` is true.
    EXPIRED_EXCEPTIONS = (
        SessionExpiredError,
    )

    def __init__(self, max_tries=1, delay=0.1, backoff=2, max_jitter=None,
                 max_delay=60, ignore_expire=True, sleep_func=time.sleep,
                 deadline=None, interrupt=None):
        """Create a :class:`KazooRetry` instance for retrying function
        calls with uniform jitter

        :param max_tries: How many times to retry the command. -1 means
                          infinite tries.
        :param delay: Initial delay between retry attempts.
        :param backoff: Backoff multiplier between retry attempts.
                        Defaults to 2 for exponential backoff.
        :param max_jitter: *Deprecated* Jitter is now uniformly distributed
                           across retries.
        :param max_delay: Maximum delay in seconds, regardless of other
                          backoff settings. Defaults to one minute.
        :param ignore_expire:
            Whether a session expiration should be ignored and treated
            as a retry-able command.
        :param sleep_func:
            Callable invoked with a number of seconds to wait between
            attempts. Defaults to :func:`time.sleep`.
        :param deadline:
            Optional number of seconds, counted from the first attempt
            of a call, after which no further retry is started.
        :param interrupt:
            Function that will be called with no args that may return
            True if the retry should be ceased immediately. This will
            be called no more than every 0.1 seconds during a wait
            between retries.

        """
        if max_jitter is not None:
            warnings.warn(
                'Passing max_jitter to retry configuration is deprecated.'
                ' Retry jitter is now automacallity uniform across retries.'
                ' The parameter will be ignored.',
                DeprecationWarning, stacklevel=2)
        self.max_tries = max_tries  # maximum number of retries
        self.delay = delay  # initial delay between retries
        self.backoff = backoff
        self.max_delay = float(max_delay)
        self._attempts = 0  # retries performed so far
        self._cur_delay = delay  # current delay, grown by ``backoff``
        self.deadline = deadline  # seconds allowed for one call, or None
        self._cur_stoptime = None  # absolute cut-off time for this call
        self.sleep_func = sleep_func  # function used to sleep
        self.retry_exceptions = self.RETRY_EXCEPTIONS
        self.interrupt = interrupt
        if ignore_expire:
            # Tuples are immutable, so this builds a new tuple and leaves
            # the class-level RETRY_EXCEPTIONS untouched.
            self.retry_exceptions += self.EXPIRED_EXCEPTIONS

    def reset(self):
        """Reset the attempt counter"""
        self._attempts = 0
        self._cur_delay = self.delay  # back to the initial delay
        self._cur_stoptime = None  # forget any previous deadline

    def copy(self):
        """Return a clone of this retry manager"""
        obj = KazooRetry(max_tries=self.max_tries,
                         delay=self.delay,
                         backoff=self.backoff,
                         max_delay=self.max_delay,
                         sleep_func=self.sleep_func,
                         deadline=self.deadline,
                         interrupt=self.interrupt)
        # Carry over the effective exception set instead of re-deriving
        # it from ``ignore_expire``.
        obj.retry_exceptions = self.retry_exceptions
        return obj

    def __call__(self, func, *args, **kwargs):
        """Call a function with arguments until it completes without
        throwing a Kazoo exception

        :param func: Function to call
        :param args: Positional arguments to call the function with
        :param kwargs: Keyword arguments to call the function with

        The function will be called until it doesn't throw one of the
        retryable exceptions (ConnectionLoss, OperationTimeout, or
        ForceRetryError), and optionally retrying on session
        expiration.

        :returns: Whatever ``func`` returns on its first successful call.
        :raises RetryFailedError: when ``max_tries`` retries have been
            used up or the next sleep would pass the deadline.
        :raises InterruptedError: when ``interrupt()`` returns true
            during a wait.

        """
        self.reset()  # every call starts with a fresh counter and delay

        while True:
            try:
                # A deadline is configured but not yet started for this
                # call: turn it into an absolute stop time.
                if self.deadline is not None and self._cur_stoptime is None:
                    self._cur_stoptime = time.time() + self.deadline
                return func(*args, **kwargs)
            except ConnectionClosedError:
                # A closed connection is never retried.
                raise
            except self.retry_exceptions:
                # Note: max_tries == -1 means infinite tries.
                if self._attempts == self.max_tries:
                    raise RetryFailedError("Too many retry attempts")
                self._attempts += 1
                # Whole number of seconds picked uniformly from
                # [0, 1 + int(current delay)].
                sleeptime = random.randint(0, 1 + int(self._cur_delay))

                # Give up if the sleep would end at or after the deadline.
                if self._cur_stoptime is not None and \
                   time.time() + sleeptime >= self._cur_stoptime:
                    raise RetryFailedError("Exceeded retry deadline")

                if self.interrupt:
                    while sleeptime > 0:
                        # Break the time period down and sleep for no
                        # longer than 0.1 before calling the interrupt
                        if sleeptime < 0.1:
                            self.sleep_func(sleeptime)
                            sleeptime -= sleeptime
                        else:
                            self.sleep_func(0.1)
                            sleeptime -= 0.1
                        if self.interrupt():
                            raise InterruptedError()
                else:
                    self.sleep_func(sleeptime)
                # Grow the delay for the next round, capped at max_delay.
                self._cur_delay = min(self._cur_delay * self.backoff,
                                      self.max_delay)
通过调用retry的__call__方法,将执行传入的func并为该func加入重试机制,同时指定了休眠时间等额外扩展功能。重试包裹的self._connect_loop最终会执行到self._connect_attempt函数,其中的主循环如下;
# Main loop: runs until a close response has been read from the server.
while not close_connection:
    # Watch for something to read or send
    jitter_time = random.randint(0, 40) / 100.0  # 0.00 - 0.40
    # Ensure our timeout is positive
    timeout = max(read_timeout / 2.0 - jitter_time, jitter_time)
    readable = self.handler.select(
        [self._socket, self._read_sock], [], [], timeout)[0]

    if not readable:
        # Nothing became readable within the timeout. If the previous
        # ping is still outstanding the connection is treated as
        # dropped; otherwise a new ping is sent.
        if self.ping_outstanding.is_set():
            self.ping_outstanding.clear()
            raise ConnectionDropped(
                "outstanding heartbeat ping not received")
        self._send_ping(connect_timeout)
        continue

    if readable[0] == self._socket:
        # Data arrived on the server socket.
        response = self._read_socket(read_timeout)
        close_connection = response == CLOSE_RESPONSE
    else:
        # The other watched socket (self._read_sock) is readable, so
        # there is a request to send.
        self._send_request(read_timeout, connect_timeout)
首先讲解的就是self.handler.select函数
在self._connect_attempt函数中,调用了handler.select方法来检查本地连接的socket是否可读可写。在默认启动的过程中,handler默认初始化的是SequentialThreadingHandler,查看该类的select方法;
def select(self, *args, **kwargs):
    """Wait for I/O readiness, choosing between select and epoll.

    Takes the same arguments as ``self._select``. The epoll-based
    variant is used only when epoll is available and at least one of
    the descriptors in the first three argument lists is above 1023.
    """
    # if we have epoll, and select is not expected to work
    # use an epoll-based "select". Otherwise don't touch
    # anything to minimize changes
    needs_epoll = (
        _HAS_EPOLL
        # if the highest fd we've seen is > 1023
        and max(map(_to_fileno, chain(*args[:3]))) > 1023
    )
    if needs_epoll:
        return self._epoll_select(*args, **kwargs)
    return self._select(*args, **kwargs)
def _select(self, *args, **kwargs):
timeout = kwargs.pop('timeout', None) # 获取超时时间
# either the time to give up, or None
end = (time.time() + timeout) if timeout else None # 计算得出截止时间
while end is None or time.time() < end: # 如果超时时间没有或者当前时间小于终止时间
if end is not None: # 如果终止时间不为空则设置过期时间
# make a list, since tuples aren't mutable
args = list(args)
# set the timeout to the remaining time
args[3] = end - time.time()
try:
return select.select(*args, **kwargs) # 调用select函数
except select.error as ex:
# if the system call was interrupted, we'll retry until timeout
# in Python 3, system call interruptions are a native exception
# in Python 2, they are not
errnum = ex.errno if isinstance(ex, OSError) else ex[0]
if errnum == errno.EINTR:
continue
raise
# if we hit our timeout, lets return as a timeout
return ([], [], [])
def _epoll_select(self, rlist, wlist, xlist, timeout=None):
"""epoll-based drop-in replacement for select to overcome select
limitation on a maximum filehandle value
"""
if timeout is None: # 如果超时时间未空
timeout = -1 # 设置为-1
eventmasks = defaultdict(int)
rfd2obj = defaultdict(list)
wfd2obj = defaultdict(list)
xfd2obj = defaultdict(list) # 获取读写列表
read_evmask = select.EPOLLIN | select.EPOLLPRI # Just in case # 设置读信号
def store_evmasks(obj_list, evmask, fd2obj):
for obj in obj_list:
fileno = _to_fileno(obj)
eventmasks[fileno] |= evmask
fd2obj[fileno].append(obj)
store_evmasks(rlist, read_evmask, rfd2obj) # 添加需要监听的信号
store_evmasks(wlist, select.EPOLLOUT, wfd2obj)
store_evmasks(xlist, select.EPOLLERR, xfd2obj)
poller = select.epoll() # 获取epoll
for fileno in eventmasks:
poller.register(fileno, eventmasks[fileno]) # 注册相关的文件描述符
try:
events = poller.poll(timeout) # 检查是否有事件发生
revents = []
wevents = []
xevents = []
for fileno, event in events: # 遍历事件
if event & read_evmask:
revents += rfd2obj.get(fileno, []) # 如果有读事件则添加到读事件中
if event & select.EPOLLOUT:
wevents += wfd2obj.get(fileno, []) # 如果有写事件则添加到写事件中
if event & select.EPOLLERR:
xevents += xfd2obj.get(fileno, []) # 如果有出错事件则添加到出错事件中
finally:
poller.close() # 关闭
return revents, wevents, xevents # 返回处理事件列表
从该类的处理流程可知,依靠IO复用来提供与服务端的交互。
def _read_socket(self, read_timeout):
    """Called when there's something to read on the socket

    Reads one reply header and dispatches on its xid: ping replies,
    auth replies and watch events are handled here, an unfinished SASL
    exchange is continued, and everything else is passed on to
    ``self._read_response``.

    :returns: The result of ``self._read_response`` for ordinary
              replies, otherwise ``None``.
    """
    client = self.client
    header, buffer, offset = self._read_header(read_timeout)
    reply_xid = header.xid

    if reply_xid == PING_XID:
        # Heartbeat answer: the outstanding ping has been satisfied.
        self.logger.log(BLATHER, 'Received Ping')
        self.ping_outstanding.clear()
        return None

    if reply_xid == AUTH_XID:
        self.logger.log(BLATHER, 'Received AUTH')
        request, async_object, xid = client._pending.popleft()
        if header.err:
            async_object.set_exception(AuthFailedError())
            client._session_callback(KeeperState.AUTH_FAILED)
        else:
            async_object.set(True)
        return None

    if reply_xid == WATCH_XID:
        self._read_watch_event(buffer, offset)
        return None

    if not self.sasl_cli or self.sasl_cli.complete:
        # None of the special cases: an ordinary response to a request.
        self.logger.log(BLATHER, 'Reading for header %r', header)
        return self._read_response(header, buffer, offset)

    # SASL authentication is not yet finished, this can only
    # be a SASL packet
    self.logger.log(BLATHER, 'Received SASL')
    try:
        challenge, _ = SASL.deserialize(buffer, offset)
    except Exception:
        raise ConnectionDropped('error while SASL authentication.')
    answer = self.sasl_cli.process(challenge)
    if answer:
        # authentication not yet finished, answering the challenge
        self._send_sasl_request(challenge=answer,
                                timeout=client._session_timeout)
    else:
        # authentication is ok, state is CONNECTED or CONNECTED_RO
        # remove sensible information from the object
        self._set_connected_ro_or_rw(client)
        self.sasl_cli.dispose()
def _read(self, length, timeout):
msgparts = []
remaining = length
with self._socket_error_handling():
while remaining > 0: # 如果还需要继续读
# Because of SSL framing, a select may not return when using
# an SSL socket because the underlying physical socket may not
# have anything to select, but the wrapped object may still
# have something to read as it has previously gotten enough
# data from the underlying socket.
if (hasattr(self._socket, "pending")
and self._socket.pending() > 0): # 如果有pending属性并且pending不为空则什么都不执行
pass
else:
s = self.handler.select([self._socket], [], [], timeout)[0] # 检查socket是否有读有读事件
if not s: # pragma: nocover
# If the read list is empty, we got a timeout. We don't
# have to check wlist and xlist as we don't set any
raise self.handler.timeout_exception(
"socket time-out during read") # 如果超时则报超时异常
chunk = self._socket.recv(remaining) # 获取指定长度的数据
if chunk == b'': # 如果为空则读失败即连接失败
raise ConnectionDropped('socket connection broken')
msgparts.append(chunk) # 保存读取的数据
remaining -= len(chunk) # 计算还需要读入多少数据
return b"".join(msgparts) # 返回总共的数据
def _read_header(self, timeout):
    """Read one length-prefixed packet and parse its reply header.

    :param timeout: Timeout passed to each ``self._read`` call.
    :returns: ``(header, payload, offset)`` where ``payload`` is the
              whole packet body and ``offset`` is the position returned
              by ``ReplyHeader.deserialize``.
    """
    # The first 4 bytes carry the length of the packet that follows.
    length_prefix = self._read(4, timeout)
    packet_length = int_struct.unpack(length_prefix)[0]
    payload = self._read(packet_length, timeout)
    header, offset = ReplyHeader.deserialize(payload, 0)
    return header, payload, offset
def _read_response(self, header, buffer, offset):
client = self.client # 获取发送的客户端
request, async_object, xid = client._pending.popleft() # 获取请求队列中的数据
if header.zxid and header.zxid > 0: # 获取头部的zxid标记
client.last_zxid = header.zxid
if header.xid != xid: # 如果头部id与发送的id不一致则报错
exc = RuntimeError('xids do not match, expected %r '
'received %r', xid, header.xid)
async_object.set_exception(exc)
raise exc
# Determine if its an exists request and a no node error
exists_error = (header.err == NoNodeError.code and
request.type == Exists.type)
# Set the exception if its not an exists error
if header.err and not exists_error: # 检查是否出错
callback_exception = EXCEPTIONS[header.err]()
self.logger.debug(
'Received error(xid=%s) %r', xid, callback_exception)
if async_object:
async_object.set_exception(callback_exception)
elif request and async_object: # 检查是否有请求与处理结果对象
if exists_error:
# It's a NoNodeError, which is fine for an exists
# request
async_object.set(None)
else:
try:
response = request.deserialize(buffer, offset) # 解析返回数据中的数据
except Exception as exc:
self.logger.exception(
"Exception raised during deserialization "
"of request: %s", request)
async_object.set_exception(exc)
return
self.logger.debug(
'Received response(xid=%s): %r', xid, response)
# We special case a Transaction as we have to unchroot things
if request.type == Transaction.type:
response = Transaction.unchroot(client, response)
async_object.set(response) # 唤醒客户端等待数据的线程并将值传入
# Determine if watchers should be registered
watcher = getattr(request, 'watcher', None) # 检查是否有watcher
if not client._stopped.is_set() and watcher:
if isinstance(request, (GetChildren, GetChildren2)):
client._child_watchers[request.path].add(watcher) # 添加到watcher中
else:
client._data_watchers[request.path].add(watcher)
if isinstance(request, Close): # 检查是否关闭如果是则关闭
self.logger.log(BLATHER, 'Read close response')
return CLOSE_RESPONSE
主要就是根据传入的数据,去读取相关的数据,并根据接收的数据去解析头部,来决定是否需要继续读入,如果还需要读入则读入后解析成response,然后将response传给当前客户端等待的线程。
def _send_request(self, read_timeout, connect_timeout):
"""Called when we have something to send out on the socket"""
client = self.client # 获取客户端
try:
request, async_object = client._queue[0] # 从需要发送数据的队列中读取待发送的数据
except IndexError:
# Not actually something on the queue, this can occur if
# something happens to cancel the request such that we
# don't clear the socket below after sending
try:
# Clear possible inconsistence (no request in the queue
# but have data in the read socket), which causes cpu to spin.
self._read_sock.recv(1)
except OSError:
pass
return
# Special case for testing, if this is a _SessionExpire object
# then throw a SessionExpiration error as if we were dropped
if request is _SESSION_EXPIRED: # 如果是请求过期则报错
raise SessionExpiredError("Session expired: Testing")
if request is _CONNECTION_DROP:
raise ConnectionDropped("Connection dropped: Testing")
# Special case for auth packets
if request.type == Auth.type: # 判断请求类型
xid = AUTH_XID
else:
self._xid = (self._xid % 2147483647) + 1
xid = self._xid
self._submit(request, connect_timeout, xid) # 提交该发送的请求
client._queue.popleft() # 从队列中移除该请求
self._read_sock.recv(1)
client._pending.append((request, async_object, xid)) # 在_pending中添加该已经处理的请求数据
def _submit(self, request, timeout, xid=None):
    """Submit a request object with a timeout value and optional
    xid

    The packet body is the packed xid (when truthy), the packed request
    type (when truthy) and the serialized request; it is written out
    prefixed with its packed length.
    """
    payload = bytearray()
    if xid:
        payload.extend(int_struct.pack(xid))
    if request.type:
        payload.extend(int_struct.pack(request.type))
    payload += request.serialize()

    # Pings are logged at the BLATHER level, everything else at DEBUG.
    if isinstance(request, Ping):
        level = BLATHER
    else:
        level = logging.DEBUG
    self.logger.log(level, "Sending request(xid=%s): %s", xid, request)

    self._write(int_struct.pack(len(payload)) + payload, timeout)
def _write(self, msg, timeout):
"""Write a raw msg to the socket"""
sent = 0
msg_length = len(msg)
with self._socket_error_handling():
while sent < msg_length: # 检查已经发送的数据长度
s = self.handler.select([], [self._socket], [], timeout)[1] # 检查是否可写
if not s: # pragma: nocover
# If the write list is empty, we got a timeout. We don't
# have to check rlist and xlist as we don't set any
raise self.handler.timeout_exception("socket time-out"
" during write")
msg_slice = buffer(msg, sent) # 获取待发送的数据
bytes_sent = self._socket.send(msg_slice) # 发送该数据
if not bytes_sent: # 如果发送失败则报错
raise ConnectionDropped('socket connection broken')
sent += bytes_sent # 增加已经发送的数据长度
发送请求的过程,就是根据请求进行序列化,然后将序列化的数据通过IO复用发送给服务器。
def exists(self, path, watch=None):
    """Check if a node exists.

    If a watch is provided, it will be left on the node with the
    given path. The watch will be triggered by a successful
    operation that creates/deletes the node or sets the data on the
    node.

    :param path: Path of node.
    :param watch: Optional watch callback to set for future changes
                  to this path.
    :returns: ZnodeStat of the node if it exists, else None if the
              node does not exist.
    :rtype: :class:`~kazoo.protocol.states.ZnodeStat` or `None`.

    :raises:
        :exc:`~kazoo.exceptions.ZookeeperError` if the server
        returns a non-zero error code.

    """
    # Start the asynchronous variant, then wait for its outcome.
    pending = self.exists_async(path, watch=watch)
    return pending.get()
def exists_async(self, path, watch=None):
    """Asynchronously check if a node exists. Takes the same
    arguments as :meth:`exists`.

    :rtype: :class:`~kazoo.interfaces.IAsyncResult`
    :raises TypeError: if ``path`` is not a string or ``watch`` is
                       truthy but not callable.

    """
    if not isinstance(path, string_types):
        raise TypeError("Invalid type for 'path' (string expected)")
    if watch and not callable(watch):
        raise TypeError("Invalid type for 'watch' (must be a callable)")

    # Result object obtained from the configured handler.
    async_result = self.handler.async_result()
    # The path is prefixed with the chroot before it goes into the request.
    request = Exists(_prefix_root(self.chroot, path), watch)
    self._call(request, async_result)
    return async_result
def _call(self, request, async_object):
    """Ensure there's an active connection and put the request in
    the queue if there is.

    Returns False if the call short circuits due to AUTH_FAILED,
    CLOSED, EXPIRED_SESSION or CONNECTING state. In every other case
    the return value is None; failures while waking the connection are
    reported through ``async_object`` only.

    :param request: Request object to enqueue.
    :param async_object: Result object that receives the outcome or
                         the failure.
    """
    if self._state == KeeperState.AUTH_FAILED:
        async_object.set_exception(AuthFailedError())
        return False
    elif self._state == KeeperState.CLOSED:
        async_object.set_exception(ConnectionClosedError(
            "Connection has been closed"))
        return False
    elif self._state in (KeeperState.EXPIRED_SESSION,
                         KeeperState.CONNECTING):
        async_object.set_exception(SessionExpiredError())
        return False

    self._queue.append((request, async_object))

    # wake the connection, guarding against a race with close()
    write_sock = self._connection._write_sock
    if write_sock is None:
        async_object.set_exception(ConnectionClosedError(
            "Connection has been closed"))
        # Stop here: falling through would fail on ``None.send`` and
        # set the same exception a second time.
        return
    try:
        write_sock.send(b'\0')
    except Exception:
        # Any send failure is reported as a closed connection;
        # KeyboardInterrupt/SystemExit are left to propagate.
        async_object.set_exception(ConnectionClosedError(
            "Connection has been closed"))
此时exists请求通过Exists类来序列化,并由它解析从服务端返回的数据;调用self._call方法将request添加到_queue队列中,然后会在_send_request的处理流程中从队列中取出并发送给服务器。在此过程中生成了一个async_result类实例,最后调用该实例的get()方法进行等待,由于默认使用的是SequentialThreadingHandler中的async_result实例,继续分析如下;
class AsyncResult(utils.AsyncResult):
    """A one-time event that stores a value or an exception"""

    def __init__(self, handler):
        # Bind the generic implementation to a threading condition
        # variable and the Kazoo timeout error.
        super(AsyncResult, self).__init__(
            handler,
            condition_factory=threading.Condition,
            timeout_factory=KazooTimeoutError,
        )
class AsyncResult(object):
    """A one-time event that stores a value or an exception"""

    def __init__(self, handler, condition_factory, timeout_factory):
        self._handler = handler
        # ``_NONE`` means "not ready yet"; ``None`` means "finished
        # without an exception".
        self._exception = _NONE
        self._condition = condition_factory()
        self._callbacks = []
        self._timeout_factory = timeout_factory
        self.value = None

    def ready(self):
        """Return true if and only if it holds a value or an
        exception"""
        return self._exception is not _NONE

    def successful(self):
        """Return true if and only if it is ready and holds a value"""
        return self._exception is None

    @property
    def exception(self):
        """The stored exception; ``None`` while not ready or after a
        successful result."""
        return None if self._exception is _NONE else self._exception

    def set(self, value=None):
        """Store the value. Wake up the waiters."""
        with self._condition:
            self.value = value
            self._exception = None  # marks the result as successful
            self._do_callbacks()
            self._condition.notify_all()

    def set_exception(self, exception):
        """Store the exception. Wake up the waiters."""
        with self._condition:
            self._exception = exception
            self._do_callbacks()
            self._condition.notify_all()

    def get(self, block=True, timeout=None):
        """Return the stored value or raise the exception.

        If there is no value raises TimeoutError.

        """
        with self._condition:
            # Wait at most once, and only when nothing is stored yet.
            if self._exception is _NONE and block:
                self._condition.wait(timeout)
            if self._exception is _NONE:
                # if we get to this point we timeout
                raise self._timeout_factory()
            if self._exception is None:
                return self.value
            raise self._exception

    def _do_callbacks(self):
        """Execute the callbacks that were registered by :meth:`rawlink`.
        If the handler is in running state this method only schedules
        the calls to be performed by the handler. If it's stopped,
        the callbacks are called right away."""
        for registered in self._callbacks:
            if not self._handler.running:
                registered(self)
                continue
            self._handler.completion_queue.put(
                functools.partial(registered, self))
至此可以完整地看出,通过condition(条件变量)将服务端处理完成的数据返回,并唤醒等待服务端返回数据的线程,使其继续执行下去,至此与服务器的基本交互流程就分析完成。
通过分析zk.exists的工作流程,工作线程将客户端要发送的数据发送给服务器端,然后客户端线程陷入等待,等待工作线程读取到服务端返回的数据,然后继续执行下去,这大概就是与服务器交互的基本流程。由于本人才疏学浅,如有错误请批评指正。