1年多前就看过相关内容了,当时python还不太会用看不懂别人写的代码,最近闲着又翻出来看看顺便解读下pyinotify的代码
使用源自于
http://blog.daviesliu.net/2008/04/24/sync/
这里的代码有2个错误,一个是base多定义了一次,另外就是有几行缩进好像有点问题,需要自己控制下缩进
一行一行解读
flags = IN_CLOSE_WRITE|IN_CREATE|IN_Q_OVERFLOW
这里flags的值是int行的,这里原来我半天没看懂。
如果写成
flags = pyinotify.IN_DELETE | pyinotify.IN_CREATE就好懂多了,这里用几个监控的类型的int值进行逻辑运算成监控需要监控的改变类型的数值具体数值怎么定义可以看看pyinotify.py文件中的class EventsCodes:中定义FLAG_COLLECTIONS的数值
dirs = {}
定义一个空的字典
base = '/log/lighttpd/cache/images/icon/u241'
这里定义了需要监控的文件夹,注意上面连接代码里有个base,自然是原作者忘记注释了其中一个,我们改成/tmp来测试
class UpdateParentDir(ProcessEvent):这里之前看不懂,特别是下面的process_IN_CLOSE_WRITE(self, event):,都不知道event哪里来的因为以前学c么什么函数重载,类的重载。这里其实就是什么派生重载子类而已
我们先看在pyinotify.py里看ProcessEvent这个类,这个类继承自_ProcessEvent这个类...,于是先去瞅瞅_ProcessEvent这个类
_ProcessEvent这个类没有init方法,只有__call__方法,call方法相当于重载了(),具体我们可以测试,我们先call方法里加入print "++++++++++++"
到最后我们再看结果,先跳过
继续看ProcessEvent类的init方法
def __init__(self, pevent=None, **kargs):
self.pevent = pevent
self.my_init(**kargs)
这个init方法也很简单,不赋值也没有问题self.my_init(**kargs)是留给我们自己写方法扩展的,可以不理会。所以这个init方法也没什么好看鸟。
我们可以直接看别人重载的方法在源代码pyinotify.py中的样子
def process_IN_Q_OVERFLOW(self, event):
log.warning('Event queue overflowed.')
def process_default(self, event):
pass
非常明了,不重载之前,原函数只是把对应变化写入log中,重载之后我们可以根据变化做自己想要的操作,比如备份改变的文件,或做同步操作之类。
现在重点是那个event,init里有说明type event: Event instance,不过UpdateParentDir还没开始调用,所以我们先放下Event模块不看。
先看下面的wm = WatchManager()
class WatchManager:
def __init__(self, exclude_filter=lambda path: False):
self._fd = self._inotify_wrapper.inotify_init()
init里主要看self._fd
这个fd是返回inotify监控的节点滴,这里调用了c封装的_inotify_wrapper,应该是初始化监控对象
WatchManager在监控代码中也没传参数,看到后面代码这个类还是通过类的add_watch方法传入内容的,看add_watch方法
def add_watch(self, path, mask, proc_fun=None, rec=False,
auto_add=False, do_glob=False, quiet=True,
exclude_filter=None):
这个方法主要是把path(也是就代码中的base指定的目录)格式化后传入,然后返回个path中内容的字典,监控工作还是没开始。
wd = ret_[rpath] = self.__add_watch(rpath, mask,
proc_fun,
auto_add,
exclude_filter)
add_watch里还调用了__add_watch,__add_watch里面又调用了watch方法,这里主要是从_inotify_wrapper这个c封装中获得inotify的对象
现在可以看把WatchManager和ProcessEvent联系起来的Notifier类了
class Notifier:
def __init__(self, watch_manager, default_proc_fun=None, read_freq=0,
threshold=0, timeout=None):
"""
@type watch_manager: WatchManager instance
@param default_proc_fun: Default processing method. If None, a new
instance of PrintAllEvents will be assigned.
@type default_proc_fun: instance of ProcessEvent
"""
# Watch Manager instance
self._watch_manager = watch_manager
# File descriptor
self._fd = self._watch_manager.get_fd()
# Poll object and registration
self._pollobj = select.poll()
self._pollobj.register(self._fd, select.POLLIN)
# This pipe is correctely initialized and used by ThreadedNotifier
self._pipe = (-1, -1)
# Event queue
self._eventq = deque()
# System processing functor, common to all events
self._sys_proc_fun = _SysProcessEvent(self._watch_manager, self)
# Default processing method
self._default_proc_fun = default_proc_fun
if default_proc_fun is None:
self._default_proc_fun = PrintAllEvents()
# Loop parameters
self._read_freq = read_freq
self._threshold = threshold
self._timeout = timeout
# Coalesce events option
self._coalesce = False
# set of str(raw_event), only used when coalesce option is True
self._eventset = set()
Notifier类传入一个wm类和ProcessEvent类,我们来自己看看init方法代码
self._fd = self._watch_manager.get_fd()
这里看上面WatchManager类的self._fd
self._pollobj = select.poll()
这里就是重点了,poll模型,写过socket的应该知道,异步非阻塞,这里可以看出处理消息方式了
shell中使用while read,这里使用poll模型,效率差距立判了。
python2.7以上才有epoll模型,高版本pyinotify应该会使用epoll模型,如果python版本高,应该自己可以修改这里的代码来使用epoll
self._sys_proc_fun = _SysProcessEvent(self._watch_manager, self)
再调用_SysProcessEvent类,这个类是ProcessEvent的父类,到下面才好理解这个是干嘛的
self._default_proc_fun = default_proc_fun
这里就是我们传入的ProcessEvent类,self._default_proc_fun和self._sys_proc_fun分别在什么情况下用要下面代码才看得出来
init里其他的就不说了,定义队列超时时间之类
ok,到Notifier类初始化完毕,我们的监控都还么正式开始,只是打开了入口(即self._fd = self._inotify_wrapper.inotify_init())
至于dirs.update(wm.add_watch(base, flags, rec=True, auto_add=True))这里当dirs不存在好了,因为wm.add_watch方法会返回一个监控目录根目录内容的字典
所以用了个dirs来装返回值,其实没有也无所谓。
正式开始是在notifier.loop()
我们来看Notifier类的loop方法
def loop(self, callback=None, daemonize=False, **args):
"""
Events are read only one time every min(read_freq, timeout)
seconds at best and only if the size to read is >= threshold.
After this method returns it must not be called again for the same
instance.
@param callback: Functor called after each event processing iteration.
Expects to receive the notifier object (self) as first
parameter. If this function returns True the loop is
immediately terminated otherwise the loop method keeps
looping.
@type callback: callable object or function
@param daemonize: This thread is daemonized if set to True.
@type daemonize: boolean
@param args: Optional and relevant only if daemonize is True. Remaining
keyworded arguments are directly passed to daemonize see
__daemonize() method. If pid_file=None or is set to a
pathname the caller must ensure the file does not exist
before this method is called otherwise an exception
pyinotify.NotifierError will be raised. If pid_file=False
it is still daemonized but the pid is not written in any
file.
@type args: various
"""
if daemonize:
self.__daemonize(**args)
# Read and process events forever
while 1:
try:
self.process_events()
if (callback is not None) and (callback(self) is True):
break
ref_time = time.time()
# check_events is blocking
if self.check_events():
self._sleep(ref_time)
self.read_events()
except KeyboardInterrupt:
# Stop monitoring if sigint is caught (Control-C).
log.debug('Pyinotify stops monitoring.')
break
# Close internals
self.stop()
我们一行一行的看loop的代码
loop传入的参数daemonize可以看daemonize方法,这个其实就是把进程变守护进程的方法,这个和普通守护进程方法差不多
无非就是fork两次setsid父进程退出一类
callback也没什么大用貌似用来自定义的,跳过
下面终于看见while 1了我们的监控开始
loop的循环里首先try process_events方法,于是去看process_events方法
def process_events(self):
"""
Routine for processing events from queue by calling their
associated proccessing method (an instance of ProcessEvent).
It also does internal processings, to keep the system updated.
"""
while self._eventq:
raw_event = self._eventq.popleft() # pop next event
watch_ = self._watch_manager.get_watch(raw_event.wd)
if watch_ is None:
# Not really sure how we ended up here, nor how we should
# handle these types of events and if it is appropriate to
# completly skip them (like we are doing here).
log.warning("Unable to retrieve Watch object associated to %s",
repr(raw_event))
continue
revent = self._sys_proc_fun(raw_event) # system processings
if watch_ and watch_.proc_fun:
watch_.proc_fun(revent) # user processings
else:
self._default_proc_fun(revent)
self._sys_proc_fun.cleanup() # remove olds MOVED_* events records
if self._coalesce:
self._eventset.clear()
由于第一次执行的时候self._eventq队列里肯定没东西是空的我们先跳过process_events看loop方法后面的代码
if (callback is not None) and (callback(self) is True):
break
ref_time = time.time()
这两行简单,跳过
if self.check_events():
这里可以看check_events():方法,可以看见
def check_events(self, timeout=None):
"""
Check for new events available to read, blocks up to timeout
milliseconds.
@param timeout: If specified it overrides the corresponding instance
attribute _timeout.
@type timeout: int
@return: New events to read.
@rtype: bool
"""
while True:
try:
# blocks up to 'timeout' milliseconds
if timeout is None:
timeout = self._timeout
ret = self._pollobj.poll(timeout)
except select.error, err:
if err[0] == errno.EINTR:
continue # interrupted, retry
else:
raise
else:
break
if not ret or (self._pipe[0] == ret[0][0]):
return False
# only one fd is polled
return ret[0][1] & select.POLLIN
check_events就是处理poll的,poll具体怎么用可以google的poll用法,我只用过select所以不太熟悉poll,但是原理是一样的
其实loop里的while 1这里就相当于我以前写select的
while True:
GetList,SendList,ErrList = select.select([self.socket,],[],[],0)
if len(GetList) > 0:
try:
curSock,userAddr = self.socket.accept()
# curSock.settimeout(15)
self.socket_pool.append(curSock)
print "get new socket"
except:
print "error or time out"
get_sock_pool,send_sock_pool,err_sock_pool = select.select(self.socket_pool,[],[],0)
这样的代码了,不停的扫描socket缓冲区,当返回值大于0就接受数据。
loop也是一样,不过用的是poll模型加deque队列(deque队列其实和list差不多,不过比list灵活,可以从两端弹出、插入数值,list只能从后面插)
check完了就read
def read_events(self):
"""
Read events from device, build _RawEvents, and enqueue them.
"""
buf_ = array.array('i', [0])
# get event queue size
if fcntl.ioctl(self._fd, termios.FIONREAD, buf_, 1) == -1:
return
queue_size = buf_[0]
if queue_size < self._threshold:
log.debug('(fd: %d) %d bytes available to read but threshold is '
'fixed to %d bytes', self._fd, queue_size,
self._threshold)
return
try:
# Read content from file
r = os.read(self._fd, queue_size)
except Exception, msg:
raise NotifierError(msg)
log.debug('Event queue size: %d', queue_size)
rsum = 0 # counter
while rsum < queue_size:
s_size = 16
# Retrieve wd, mask, cookie and fname_len
wd, mask, cookie, fname_len = struct.unpack('iIII',
r[rsum:rsum+s_size])
# Retrieve name
fname, = struct.unpack('%ds' % fname_len,
r[rsum + s_size:rsum + s_size + fname_len])
rawevent = _RawEvent(wd, mask, cookie, fname)
if self._coalesce:
# Only enqueue new (unique) events.
raweventstr = str(rawevent)
if raweventstr not in self._eventset:
self._eventset.add(raweventstr)
self._eventq.append(rawevent)
else:
self._eventq.append(rawevent)
rsum += s_size + fname_len
这两个函数都和poll有关,看不懂无所谓,但是大概可以知道这里就是poll使得self._eventq()中有数据(就是把变化的内容传入队列)
read_events后process_events函数就能执行了。
看process_events中有数据以后的执行方式
当self._eventq有内容内容以后
raw_event = self._eventq.popleft()
弹出队列中的内容,这个raw_event就是Event类
watch_ = self._watch_manager.get_watch(raw_event.wd)
通过刚才弹出的对象返回inotify对象
if watch_ is None:
通过上面返回值判断是否被监控,这个判断保险用的,当作不存在
revent = self._sys_proc_fun(raw_event)
创建个叫revent的_SysProcessEvent类,这个类传入的参数raw_event是个event对象,这个event就是变动的文件的相关信息
if watch_ and watch_.proc_fun:
watch_.proc_fun(revent)
else:
self._default_proc_fun(revent)
判断是否把这个类丢给_default_proc_fun。
这里执行了self._default_proc_fun(revent)的话,我们在UpdateParentDir(ProcessEvent):里的方法就会执行
_SysProcessEvent有啥用?其实没啥用,这个类就是定义了默认的各种mark的处理方式让传入的类去继承而已。
在_SysProcessEvent的process_IN_CREATE方法里加入
print "=============="
我们拿改好的代码执行下,当创建一个文件时,出现下面打印(请无视掉caonima....谢谢)
#!/usr/bin/python
from pyinotify import *
import os, os.path
flags = IN_CLOSE_WRITE|IN_CREATE|IN_Q_OVERFLOW
dirs = {}
base = '/log/lighttpd/cache/images/icon/u241'
base = 'tmp'
class UpdateParentDir(ProcessEvent):
def process_IN_CLOSE_WRITE(self, event):
print 'modify', event.pathname
mtime = os.path.getmtime(event.pathname)
p = event.path
while p.startswith(base):
m = os.path.getmtime(p)
if m < mtime:
print 'update', p
os.utime(p, (mtime,mtime))
elif m > mtime:
mtime = m
p = os.path.dirname(p)
process_IN_MODIFY = process_IN_CLOSE_WRITE
def process_IN_Q_OVERFLOW(self, event):
print 'over flow'
max_queued_events.value *= 2
def process_default(self, event):
pass
wm = WatchManager()
notifier = Notifier(wm, UpdateParentDir())
dirs.update(wm.add_watch(base, flags, rec=True, auto_add=True))
notifier.loop()
{'/tmp': 1, '/tmp/.font-unix': 4, '/tmp/.wapi': 5, '/tmp/hsperfdata_root': 2, '/tmp/.ICE-unix': 3}
+++++++++++call+++caonima++++++++++++++
============sys========caonimai============
+++++++++++call+++caonima++++++++++++++
+++++++++++call+++caonima++++++++++++++
+++++++++++call+++caonima++++++++++++++
modify /tmp/14
分析下可以知道,继承_ProcessEvent类的时候先call了一次
在process_events方法中有revent = self._sys_proc_fun(raw_event),所以创建创的时候打印了"========"
所以后面self._default_proc_fun(revent)重载的之前,_ProcessEvent中的process_IN_CREATE其实已经执行过了,即使后面重载process_IN_CREATE方法,原来的process_IN_CREATE
方法还是会被调用过
至于程序怎么识别process_IN_xxx之类的方法可以看_ProcessEvent里的__call__方法
meth = getattr(self, 'process_' + maskname, None)
if meth is not None:
return meth(event)
meth = getattr(self, 'process_IN_' + maskname.split('_')[1], None)
getattr函数很简单,返回名为process_+ maskname的函数
后面多定义了个process_IN +maskname的函数,所以process和process_IN都是可以的函数名
这个pyinotify最重要的就是这几个函数
self._inotify_wrapper = INotifyWrapper.create()
if self._inotify_wrapper is None:
raise InotifyBindingNotFoundError()
self._fd = self._inotify_wrapper.inotify_init() # file descriptor
wd = self._inotify_wrapper.inotify_add_watch(self._fd, path, mask)
我们自己写个类似pyinotify的函数来试试直接调用INotifyWrapper看看
找了下发现INotifyWrapper也是pyinotify里面定义的类,最终找到
try:
libc_name = ctypes.util.find_library('c')
except (OSError, IOError):
pass # Will attemp to load it with None anyway.
if sys.version_info >= (2, 6):
self._libc = ctypes.CDLL(libc_name, use_errno=True)
self._get_errno_func = ctypes.get_errno
else:
self._libc = ctypes.CDLL(libc_name)
try:
location = self._libc.__errno_location
location.restype = ctypes.POINTER(ctypes.c_int)
self._get_errno_func = lambda: location().contents.value
except AttributeError:
pass
# Eventually check that libc has needed inotify bindings.
if (not hasattr(self._libc, 'inotify_init') or
not hasattr(self._libc, 'inotify_add_watch') or
not hasattr(self._libc, 'inotify_rm_watch')):
return False
return True
最终发现是通过ctypes.CDLL('libc.so.6')掉出inotify相关的c封装的