python监听本地目录及文件变化并将其同步到云主机/远端服务器(watchdog做监听)

艾宁
2023-12-01

毕业在即,闲着无聊,偶然发现腾讯云主机正在搞活动,于是买了一台云主机。想用跑一些日常小任务,如爬虫。然而在云主机上利用vim进行程序开发实在麻烦,本地开发完程序还需手动同步到服务端,太过麻烦。
搭建samba后发现mac访问腾讯云主机上的samba速度很慢,ftp速度还是比较快的。因此就想着自己写一个自动同步本地代码到云主机的python脚本。本文已上传到github


一、搭建ftp服务器

搭建ftp服务器见文章ubuntu16.04搭建ftp服务器

二、思路简介

本文工具主要利用watchdog对文件夹做监听,如发现文件夹中的文件有移动、创建、重命名、修改操作,那么就把对应的修改上传到ftp服务器。如文件夹某文件删除了,那么不对服务端的数据做处理。也就是对ftp服务器的数据做增量更新。

三、模块简介

1. 日志处理模块

日志处理模块主要是设置日志格式,日志输出等

#coding=utf-8
import os
import datetime
import logging
import logging.config

def genLogDict(logDir, logFile):
    '''
    配置日志格式的字典
    '''
    logDict = {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "simple": {
                'format': '%(asctime)s [%(name)s:%(lineno)d] [%(levelname)s]- %(message)s'
            },
            'standard': {
                'format': '%(asctime)s [%(threadName)s:%(thread)d] [%(name)s:%(lineno)d] [%(levelname)s]- %(message)s'
            },
        },

        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "level": "DEBUG",
                "formatter": "simple",
                "stream": "ext://sys.stdout"
            },

            "default": {
                "class": "logging.handlers.RotatingFileHandler",
                "level": "INFO",
                "formatter": "simple",
                "filename": os.path.join(logDir, logFile),
                'mode': 'w+',
                "maxBytes": 1024*1024*5,  # 5 MB
                "backupCount": 20,
                "encoding": "utf8"
            },
        },

        # "loggers": {
        #     "app_name": {
        #         "level": "INFO",
        #         "handlers": ["console"],
        #         "propagate": "no"
        #     }
        # },

        "root": {
            'handlers': ['default'],
            'level': "INFO",
            'propagate': False
        }
    }
    return logDict


def initLogConf():
    """
    配置日志
    """
    baseDir = os.path.dirname(os.path.abspath(__file__))
    logDir = os.path.join(baseDir, "logs")
    if not os.path.exists(logDir):
        os.makedirs(logDir)  # 创建路径

    logFile = datetime.datetime.now().strftime("%Y-%m-%d") + ".log"
    logDict = genLogDict(logDir, logFile)
    logging.config.dictConfig(logDict)

if __name__ == '__main__':
    initLogConf()
    log = logging.getLogger(__file__)
    print "print A"
    log.info("log B")

2. ftp模块

主要是基于ftplib库,利用python代码实现文件从ftp服务器上传、下载、删除、移动> 等。

#!/usr/bin/python
# -*- coding: utf-8 -*-
__author__ = 'lihailin'  
__mail__ = '415787837@qq.com'  
__date__ = '2018-05-03'  
__version__ = 1.0  

import sys  
import os  
from ftplib import FTP  
import logging
import log
log.initLogConf()
_logging = logging.getLogger(__file__)
reload(sys)
sys.setdefaultencoding('utf-8')


import socket  
timeout = 1000    
socket.setdefaulttimeout(timeout)

class Xfer(object):  
    ''''' 
    上传本地文件或目录递归上传到FTP服务器 
    '''  
    def __init__(self):  
        self.ftp = None  


    def setFtpParams(self, ip, uname, pwd, port = 21, timeout = 60):  
        # 设置ftp参数        
        self.ip = ip  
        self.uname = uname  
        self.pwd = pwd  
        self.port = port  
        self.timeout = timeout  


    def initEnv(self):  
        # 链接ftp
        if self.ftp is None:  
            self.ftp = FTP()  
            _logging.debug('### connect ftp server: %s ...'%self.ip) 
            self.ftp.connect(self.ip, self.port, self.timeout)  
            self.ftp.login(self.uname, self.pwd)   
            _logging.debug(self.ftp.getwelcome())


    def clearEnv(self):  
        # ftp断开链接
        if self.ftp:  
            self.ftp.close()  
            # self.ftp.quit(), close和quit只能选一个
            _logging.debug('### disconnect ftp server: %s!'%self.ip)  
            self.ftp = None  


    def uploadFile(self, localpath, remotepath='./'): 
        ''' 
        上传文件至服务端,为了保存本地和服务器路径一致
        loacalpath路径文件夹需是单层,如'a/b/c.txt'是不行的。
        '''
        if not os.path.isfile(localpath):    
            return  
        _logging.info('+++ uploading %s to %s:%s'%(localpath, self.ip, remotepath))
        try:
            self.ftp.storbinary('STOR ' + remotepath, open(localpath, 'rb'))
            _logging.info('upload success: %s '%localpath)  
        except Exception as e:
            _logging.info('upload fail %s : %s'%(localpath, e))


    def uploadDir(self, localdir='./', remotedir='./'):  
        '''
        localdir 里面的文件以及全部同步到服务器的remotedir
        为了保存本地和服务器路径一致,localdir路径需是单层,如'a/b/c'是不行的
        '''
        # print sdfsdf 为了产生bug方便测试

        if not os.path.isdir(localdir):    
            return  
        self.ftp.cwd(remotedir)
        for file in os.listdir(localdir):  
            src = os.path.join(localdir, file)  
            if os.path.isfile(src):
                self.uploadFile(src, file)  
            elif os.path.isdir(src):  
                try:    
                    self.ftp.mkd(file)    
                except:    
                    # sys.stderr.write('the dir is exists %s'%file)  
                    # _logging.error('the dir is exists %s'%file)
                    pass
                self.uploadDir(src, file)  
        self.ftp.cwd('..')  


    def walkLastServer(self, src):
        '''
        在服务端递归创建文件夹,如'a/b/c'
        并将cd到最里层文件夹
        '''
        deldictorys = src.split('/')
        for d in deldictorys[:-1]:
            try:    
                self.ftp.mkd(d)    
            except:
                pass    
            self.ftp.cwd(d)
        return deldictorys[-1]


    def _upload(self, src):  
        '''
        可同时上传文件以及文件夹
        文件夹及文件路径可含有多层,'desk/za/a.txt'是可行的
        '''
        self.initEnv() 
        desSrc = self.walkLastServer(src)
        if os.path.isdir(desSrc):  
            try:    
                self.ftp.mkd(src)    
            except:
                pass
            try:
                log_s = "+++ uploading directory %s" % desSrc 
                _logging.info(log_s)
                self.uploadDir(desSrc, desSrc)
                log_s = "upload directory sucess: %s" % desSrc 
                _logging.info(log_s)  
            except Exception as e:
                _logging.error(e)
                log_s = "upload directory fail: %s" % desSrc 
                _logging.info(log_s)  
        elif os.path.isfile(src):  
            self.uploadFile(src, desSrc)
        self.clearEnv()    


    def upload(self, src, times=3):
        '''
        上传文件是一个耗时的操作,防止timeout
        '''
        try:
            self._upload(src)
        except Exception as e:
            _logging.error(e)
            times -= 1
            if times>0:
                _logging.info('======重传文件: %s ======' % src)
                self.upload(src,times)


    def rename(self, fromname, toname):
        self.initEnv() 
        logging.info('+++ rename %s to %s'%(fromname, toname))
        try:
            self.ftp.rename(fromname, toname)
            logging.info('rename success: %s' %fromname)
        except Exception as e:
            _logging.error(e)
            logging.info('rename fail %s : %s' %(fromname, e))

        self.clearEnv()   


    # def delFile(self, remotefile):
    #     '''
    #     删除文件,路径可是多层
    #     '''
    #     if not os.path.isfile(remotefile):    
    #         return
    #     self.initEnv() 
    #     self.ftp.delete(remotefile)
    #     self.clearEnv()


    # def delDictory(self, remotedic):
    #     '''
    #     删除文件夹,路径可是多层
    #     '''
    #     self.initEnv() 
    #     for a,b,c in os.walk(remotedic,topdown=False):
    #         for ai in a:
    #             for ci in c:
    #                 self.delfile(os.path.join(ai, ci))
    #             self.ftp.rmd(ai)
    #     self.clearEnv()


if __name__ == '__main__':  

    ip = 'xxxx.xxxx.xxxx.xxxx'
    username = 'xxxx' 
    passwd = 'xxx'

    # to = 'offer/tencent-实习offer'
    # src = 'offer/相关资料/tencent-实习offer'
    # xfer = Xfer()  
    # xfer.setFtpParams(ip, username, passwd)     
    # xfer.rename(src, to)

    # #测试
    # srcFile = r'offer'
    # srcFile = 'offer/相关资料/成绩单.docx'
    # xfer = Xfer()  
    # xfer.setFtpParams(ip, username, passwd)    
    # xfer.upload(srcFile) 
©

3. watchdog监听模块

利用watchdog对文件夹做监听,如发现文件夹中的文件有移动、创建、重命名、修改操作,那么就把对应的修改上传到ftp服务器。如文件夹某文件删除了,那么不对服务端的数据做处理。

#!/usr/bin/python
# -*- coding: utf-8 -*-
__author__ = 'lihailin'  
__mail__ = '415787837@qq.com'  
__date__ = '2018-05-03'  
__version__ = 1.0 

import sys
import time
import ftp
# 配置日志
import logging
import log
log.initLogConf()
_logging = logging.getLogger(__file__)
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
reload(sys)
sys.setdefaultencoding('utf-8')


# 配置ftp
ip = 'x.x.x.x'
username = 'xxxx' 
passwd = 'xxxx'

class ToServer(FileSystemEventHandler):
    '''
    将本地文件变化事件记录到日志中,并上传到服务器
    '''
    def __init__(self, path, ip, username, passwd):
        super(ToServer, self).__init__()
        # 配置ftp服务器
        self.xfer = ftp.Xfer()  
        self.xfer.setFtpParams(ip, username, passwd)
        # 开启服务时上传一遍文件至远程文件夹
        self.xfer.upload(path)


    def on_any_event(self, event):
        # 将发生过的事件写入日志
        if event.is_directory:
            is_d = 'directory'
        else:
            is_d = 'file'
        log_s = "%s %s: %s " % (event.event_type, is_d, event.src_path)
        # print log_s
        _logging.info(log_s)


    def on_created(self, event):  
        # 仅上传文件
        if event.is_directory:
            return  
        self.xfer.upload(event.src_path)


    def on_modified(self, event):  
        self.on_created(event)


    def on_moved(self, event):
        '''
        服务器中的文件进行移动
        '''
        self.xfer.rename(event.src_path, event.dest_path)
        # log_s = "move file: %s to %s" % (event.src_path, event.dest_path) 
        # _logging.info(log_s)


if __name__ == "__main__":
    path = sys.argv[1] if len(sys.argv) > 1 else '.'
    event_handler = ToServer(path, ip, username, passwd)  
    observer = Observer()  
    observer.schedule(event_handler, path, recursive=True)  
    observer.start()  
    try:  
        while True:  
           time.sleep(1)
    except KeyboardInterrupt:  
        observer.stop()  
    observer.join()  

四、使用

1. 下载项目代码

修改synWatch.py中的配置信息,配置信息有ftp服务器的ip地址,ftp用户名、密码。

 ip = 'x.x.x.x'
 username = 'xxxx'
 passwd = 'xxxx'

2. 进行文件监听

# yourpath为你要监听的本地目录
python synWatch.py yourpath

参考资料

  1. python watchdog:监控文件系统事件的Python库
  2. Python实现FTP上传文件或文件夹
  3. python应用系列教程——python中ftp操作:连接、登录、获取目录,重定向、上传下载,删除更改

环境

  1. MacOS X 10.11.6
  2. python2.7
  3. watchdog 0.83
 类似资料: