Pyinotify是一个Python模块,用来监测文件系统的变化。 Pyinotify依赖于Linux内核的功能—inotify(内核2.6.13合并)。
1、安装:
#pip install pyinotify
或者
git clone https://github.com/seb-m/pyinotify.git
cd pyinotify/
python setup.py install
2、整理的api接口:
class ProcessEvent(_ProcessEvent):
__init__(self, pevent=None, **kargs):
my_init(self, **kargs):
nested_pevent(self)
process_IN_Q_OVERFLOW(self, event):
process_default(self, event)
process_IN_IGNORED(self, raw_event):
process_IN_MOVE_SELF(self, raw_event):
process_IN_MOVED_TO(self, raw_event):
process_IN_MOVED_FROM(self, raw_event):
process_IN_CREATE(self, raw_event):
cleanup(self)
class Notifier:
__init__(self, watch_manager, default_proc_fun=None, read_freq=0, threshold=0, timeout=None):
append_event(self, event):
proc_fun(self):
coalesce_events(self, coalesce=True):
check_events(self, timeout=None):
read_events(self)
process_events(self):
fork_daemon():
loop(self, callback=None, daemonize=False, **args):
stop(self):
class ThreadedNotifier(threading.Thread, Notifier):
__init__(self, watch_manager, default_proc_fun=None, read_freq=0,threshold=0, timeout=None):
stop(self):
loop(self)
run(self):
class TornadoAsyncNotifier(Notifier):
__init__(self, watch_manager, ioloop, callback=None, default_proc_fun=None, read_freq=0, threshold=0, timeout=None,channel_map=None):
stop(self):
handle_read(self, *args, **kwargs):
class AsyncioNotifier(Notifier): asyncio/trollius event loop adapter.
__init__(self, watch_manager, loop, callback=None,default_proc_fun=None, read_freq=0, threshold=0, timeout=None):
stop(self):
handle_read(self, *args, **kwargs):
class ExcludeFilter:
__init__(self, arg_lst):
examples:
ef1 = ExcludeFilter(["/etc/rc.*", "/etc/hostname"])
ef2 = ExcludeFilter("/my/path/exclude.lst")
class WatchManager:
__init__(self, exclude_filter=lambda path: False)
close(self)
get_fd(self)
get_watch(self, wd)
del_watch(self, wd):
add_watch(self, path, mask, proc_fun=None, rec=False,auto_add=False, do_glob=False, quiet=True, exclude_filter=None)
update_watch(self, wd, mask=None, proc_fun=None, rec=False, auto_add=False, quiet=True):
get_wd(self, path):
get_path(self, wd):
rm_watch(self, wd, rec=False, quiet=True):
watch_transient_file(self, filename, mask, proc_class):监控一个临时文件,它可以被频繁多次创建、删除。
get_ignore_events(self):得到忽视事件
set_ignore_events(self, nval):设置忽视的事件
class RawOutputFormat://格式化字符串
__init__(self, format=None):
simple(self, s, attribute):
punctuation(self, s):
field_value(self, s):
field_name(self, s):
class_name(self, s):
class ColoredOutputFormat(RawOutputFormat):
__init__(self):
compatibility_mode():
command_line():
3、使用实例:
#!/usr/bin/python3
import pyinotify
import time
import os
class ProcessTransientFile(pyinotify.ProcessEvent):
def process_IN_MODIFY(self,event):
line = f.readline()
if line:
print(line)
filename = './test.text'
f = open(filename,'r')
st_results = os.stat(filename)
st_size = st_results[6]
f.seek(st_size)
print(st_size)
wm = pyinotify.WatchManager()
notifier = pyinotify.Notifier(wm)
wm.watch_transient_file(filename,pyinotify.IN_MODIFY, ProcessTransientFil
e)
notifier.loop()
4、github地址:
https://github.com/seb-m/pyinotify/blob/master/python3/examples/asyncio_notifier.py