当前位置: 首页 > 工具软件 > Pyinotify > 使用案例 >

pyinotify学习总结

秦炜
2023-12-01

Pyinotify是一个Python模块,用来监测文件系统的变化。 Pyinotify依赖于Linux内核的功能—inotify(内核2.6.13合并)。
1、安装:

#pip install pyinotify
或者 
git clone https://github.com/seb-m/pyinotify.git
cd pyinotify/
python setup.py install

2、整理的api接口:

class ProcessEvent(_ProcessEvent):
 __init__(self, pevent=None, **kargs):
my_init(self, **kargs):
nested_pevent(self)
process_IN_Q_OVERFLOW(self, event):
process_default(self, event)
process_IN_IGNORED(self, raw_event):
process_IN_MOVE_SELF(self, raw_event):
process_IN_MOVED_TO(self, raw_event):
process_IN_MOVED_FROM(self, raw_event):
process_IN_CREATE(self, raw_event):
cleanup(self)

class Notifier:
__init__(self, watch_manager, default_proc_fun=None, read_freq=0, threshold=0, timeout=None):
append_event(self, event):
proc_fun(self):
coalesce_events(self, coalesce=True):
check_events(self, timeout=None):
read_events(self)
process_events(self):
fork_daemon():
loop(self, callback=None, daemonize=False, **args):
 stop(self):

class ThreadedNotifier(threading.Thread, Notifier):
 __init__(self, watch_manager, default_proc_fun=None, read_freq=0,threshold=0, timeout=None):
stop(self):
loop(self)
 run(self):
class TornadoAsyncNotifier(Notifier):
__init__(self, watch_manager, ioloop, callback=None, default_proc_fun=None, read_freq=0, threshold=0, timeout=None,channel_map=None):
stop(self):
handle_read(self, *args, **kwargs):

class AsyncioNotifier(Notifier):       asyncio/trollius event loop adapter.
__init__(self, watch_manager, loop, callback=None,default_proc_fun=None, read_freq=0, threshold=0, timeout=None):
stop(self):
handle_read(self, *args, **kwargs):

class ExcludeFilter:
__init__(self, arg_lst):
examples:
     ef1 = ExcludeFilter(["/etc/rc.*", "/etc/hostname"])
      ef2 = ExcludeFilter("/my/path/exclude.lst")

class WatchManager:
__init__(self, exclude_filter=lambda path: False)
close(self)
get_fd(self)
get_watch(self, wd)
del_watch(self, wd):
add_watch(self, path, mask, proc_fun=None, rec=False,auto_add=False, do_glob=False, quiet=True, exclude_filter=None)
update_watch(self, wd, mask=None, proc_fun=None, rec=False, auto_add=False, quiet=True):
get_wd(self, path):
get_path(self, wd):
 rm_watch(self, wd, rec=False, quiet=True):
 watch_transient_file(self, filename, mask, proc_class):监控一个临时文件,它可以被频繁多次创建、删除。
get_ignore_events(self):得到忽视事件
       set_ignore_events(self, nval):设置忽视的事件
class RawOutputFormat://格式化字符串 
__init__(self, format=None):
simple(self, s, attribute):
      punctuation(self, s):
      field_value(self, s):
      field_name(self, s):
      class_name(self, s):
class ColoredOutputFormat(RawOutputFormat):
__init__(self):
compatibility_mode():
command_line():

3、使用实例:

#!/usr/bin/python3

import pyinotify
import time
import os

class ProcessTransientFile(pyinotify.ProcessEvent):
    def process_IN_MODIFY(self,event):
        line = f.readline()
        if line:
            print(line)
filename = './test.text'
f = open(filename,'r')
st_results = os.stat(filename)
st_size = st_results[6]
f.seek(st_size)
print(st_size)

wm = pyinotify.WatchManager()
notifier = pyinotify.Notifier(wm)
wm.watch_transient_file(filename,pyinotify.IN_MODIFY, ProcessTransientFil
e)
notifier.loop()

4、github地址:

https://github.com/seb-m/pyinotify/blob/master/python3/examples/asyncio_notifier.py
 类似资料: