当前位置: 首页 > 工具软件 > psutil > 使用案例 >

系统性能监控-psutil功能详解

汤洋
2023-12-01

psutil是一个跨平台的库,用于在Python中检索有关运行进程和系统利用率(CPU,内存,磁盘,网络,传感器)的信息。它主要用于系统监视,分析,限制进程资源和运行进程的管理。它实现了UNIX命令行工具提供的许多功能,例如:ps,top,lsof,netstat,ifconfig,who,df,kill,free,nice,ionice,iostat,iotop,uptime,pidof,tty,taskset,pmap。

//安装psutil
pip install psutil
//若上面安装失败可以尝试下面
pip install psutil -i https://pypi.python.org/simple/ 

获取CPU信息

psutil.cpu_count()  #获取CPU个数
psutil.cpu_count(logical=False)
psutil.cpu_stats() #以命名元组的形式返回各种CPU统计信息:
参数信息
    ctx_switches: 启动以来上下文切换的次数(自愿+非自愿)。
    interrupts: 自启动以来中断的次数。
    soft_interrupts:自启动以来,软件中断的次数。在Windows和SunOS上总是设置为0。
    soft_interrupts:自启动以来,系统调用次数。在Linux上总是设置为0

psutil.cpu_count() # 逻辑CPU核数
psutil.cpu_count(logical=False) # 物理CPU核数
psutil.cpu_times() # CPU的用户、系统、空闲时间
psutil.cpu_percent(percpu=True) # 获取每个CPU的使用率,类似TOP命令
top = [psutil.cpu_percent(interval=i, percpu=True) for i in range(10)] #设置每秒刷新时间间隔,统计十次的结果
psutil.getloadavg()
    将过去1、5和15分钟内的平均系统负载作为一个元组返回。负载表示处于可运行状态的进程,要么使用CPU,要么等待使用CPU(例如,等待磁盘I/O)。
    在UNIX系统中,这依赖于os.getloadavg。在Windows上,这是通过使用Windows API来模拟的,该API生成一个线程,该线程在后台持续运行,并每5秒更新一次负载,
    模拟UNIX行为。因此,第一次调用它时,在接下来的5秒内,它将返回一个无意义的(0.0,0.0,0.0)元组。返回的数字只有与系统上安装的CPU内核数量相关时才有意义。
    例如,在一个有10个CPU核心的系统上,3.14意味着在过去的N分钟内系统负载为31.4%。

实例:


"""
获取cpu信息,负载、使用率信息
"""
def get_cpuinfo():
    cpuInfo={}
    try:
        ostype = get_ostype()
        if ostype == "Linux":
            #获取cpu使用率
            cpuuse_1 = psutil.cpu_percent()
            cpuuse = str(cpuuse_1)
            log_info(LOG_LOCATE(), "cpuuse:%s" % (cpuuse))

            #获取cpu负载
            f = open("/proc/loadavg")
            con = f.read().split()
            f.close()
            cpuload = con[0] +'/'+con[1]+'/'+ con[2]
            cpuInfo['cpuUse'] = cpuuse
            cpuInfo['cpuLoad_1'] = con[0]
            cpuInfo['cpuLoad_5'] = con[1]
            cpuInfo['cpuLoad_15'] = con[2]
        else:
            #获取cpu相关信息
            #logic_count = psutil.cpu_count()
            #py_conut =  psutil.cpu_count(logical=False)
            #log_info(LOG_LOCATE(), "logic_count:%d, py_conut:%d" % (logic_count, py_conut))

            #获取cpu使用率
            try:
                cpuuse_1 = psutil.cpu_percent()
                cpuuse_2 = psutil.cpu_percent(percpu=True)
                log_info(LOG_LOCATE(), "cpuuse_1:%s, cpuuse_2:%s" % (cpuuse_1, cpuuse_2))
            except Exception as e:
                log_info(LOG_LOCATE(), "get cpu percent info error!")
                cpuuse_1 = 0.0
                cpuuse_2 = 0.0

            if cpuuse_1 > 90.0:
                cpuuse = str(cpuuse_2[0])
                if cpuuse == "0.0":
                    cpuuse = "0.1"
            else:
                cpuuse = str(cpuuse_1)
            try:
                ptemp = os.popen("wmic cpu get loadpercentage")
                x = ptemp.read()
                log_info(LOG_LOCATE(), "-1-get cpuinfo str x:%s" % (x))
                str1 = re.split('\W+', x)
                if str1[-2] != " " and str1[-2] != "" and str1[-2].isspace() == False:
                    if str1[-2] != "100":
                        cpuInfo['cpuUse'] = cpuuse
                        cpuInfo['cpuLoad_1'] = str1[-2]
                        cpuInfo['cpuLoad_5'] = str1[-2]
                        cpuInfo['cpuLoad_15'] = str1[-2]
                    else:
                        if str1[-3] != " " and str1[-3] != "" and str1[-3].isspace() == False:
                            cpuInfo['cpuUse'] = cpuuse
                            cpuInfo['cpuLoad_1'] = str1[-3]
                            cpuInfo['cpuLoad_5'] = str1[-3]
                            cpuInfo['cpuLoad_15'] = str1[-3]
                        else:
                            cpuInfo['cpuUse'] = cpuuse
                            cpuInfo['cpuLoad_1'] = str1[-2]
                            cpuInfo['cpuLoad_5'] = str1[-2]
                            cpuInfo['cpuLoad_15'] = str1[-2]
                else:
                    log_info(LOG_LOCATE(), "-1-get cpu percent error!")
                    ptemp.close()
            except Exception as e:
                log_info(LOG_LOCATE(), "-2-get cpu percent error!")
    except Exception as e:
        log_info(LOG_LOCATE(), "-1-get cpu info error")
    log_info(LOG_LOCATE(), "getcpuinfo:",cpuInfo)
    return cpuInfo

 获取内存信息

psutil.virtual_memory() #获取内存统计数据,单位bytes
参数:
    total: 总物理内存(互斥交换)。
    available: 可以立即分配给进程而不需要系统进入交换区。这是通过根据平台对不同的内存值求和计算出来的,它应该用于以跨平台的方式监视实际的内存使用情况。
    其他指标:
    used: 使用的内存,根据平台计算不同,仅用于信息目的。完全空闲不一定匹配使用。
    free:内存根本没有被使用(0),随时可用;注意,这并没有反映实际可用的内存(而是使用可用的内存)。总使用量不一定匹配空闲。
    active (UNIX): 当前使用的或最近使用的内存,所以它在RAM中。
    inactive (UNIX): 标记为未使用的内存。
    buffers (Linux, BSD): 缓存文件系统元数据之类的东西。
    cached (Linux, BSD): 缓存各种东西。
    shared (Linux, BSD): 多个进程可以同时访问的内存。
    slab (Linux): 内核内的数据结构缓存。
    wired (BSD, macOS): 被标记为始终保持在RAM中的内存。它从未移动到磁盘上。
psutil.swap_memory() # 获取swap的统计数据
参数:
    total:总交换内存(以字节为单位)
    used:使用交换内存(以字节为单位)
    free:空闲交换内存(以字节为单位)
    percent:使用百分比,计算公式为(total - available) / total * 100
    sin: 系统从磁盘交换进来的字节数(累计)
    sout: 系统从磁盘交换出的字节数(累计)
"""
获取mem信息,内存总大小、剩余内存、内存使用率(M)
"""
def get_meminfo():
    memInfo={}
    ostype = get_ostype()
    if ostype == "Linux":
        try:
            memInfo['total'] = str(round(psutil.virtual_memory().total / (1024.0 * 1024.0), 1))
            #mem的free没有加psutil.virtual_memory().shared
            memInfo['free'] = str(round(((psutil.virtual_memory().free + psutil.virtual_memory().buffers\
                + psutil.virtual_memory().cached)) / (1024.0 * 1024.0), 1))
            #memInfo['used'] = psutil.virtual_memory().percent
        except Exception as e:
            log_info(LOG_LOCATE(), "get memInfo error")
    else:#windows memfree待确认
        try:
            memInfo['total'] = str(round(psutil.virtual_memory().total / (1024.0 * 1024.0), 1))
            #mem的free没有加psutil.virtual_memory().shared
            memInfo['free'] = str(round((psutil.virtual_memory().free) / (1024.0 * 1024.0), 1))
            #memInfo['used'] = psutil.virtual_memory().percent
        except Exception as e:
            log_info(LOG_LOCATE(), "get memInfo error")
    #print("memInfo:%s" % (memInfo))
    return memInfo

获取磁盘信息

psutil.disk_partitions() #获取磁盘分区信息
psutil.disk_usage("/") # 获取分区使用情况,
psutil.disk_io_counters() #磁盘IO情况
def get_OsDiskInfo():
    type = get_ostype()
    if type == "Linux":
        get_diskinfo("/opt") #磁盘信息
    else:
        get_diskinfo("C:/") #磁盘信息

"""
获取disk信息,磁盘大小、剩余磁盘大小、磁盘使用率
"""
def get_diskinfo(diskDir):
    #/root, /opt, /swap, /var, /home 
    #C:/, D:/
    diskInfo = {}

    p = psutil.disk_usage(diskDir)
    diskInfo['dir'] = diskDir
    diskInfo['size'] = str(int(p.total / (1024.0 * 1024.0)))
    diskInfo['used'] = str(int(p.used / (1024.0 * 1024.0)))
    diskInfo['percent'] = psutil.disk_usage(diskDir).percent
    log_info(LOG_LOCATE(), "diskinfo",diskInfo)
    return diskInfo

 获取网络信息

psutil.net_if_stats() # 获取网卡接口状态 
参数:
    isup: 一个bool,指示NIC是否已启动并正在运行。
    duplex: 双工通信类型;它可以是NIC_DUPLEX_FULL、NIC_DUPLEX_HALF或NIC_DUPLEX_UNKNOWN。
    speed: 网卡速度以兆比特(MB)表示,如果它不能被确定(如' localhost '),它将被设置为0。
    mtu: NIC的最大传输单元,以字节表示。
psutil.net_if_stats().get("en0") #获取单个网卡en0的状态 snicstats(isup=True, duplex=, speed=0, mtu=1500)
psutil.net_if_addrs() # 获取所有网卡的地址信息
参数:
    family: 地址族,可以是 AF_INET或AF_INET6或psutil.AF_LINK,它指的是MAC地址。
    address: 主网卡地址(总是设置)。
    netmask: 网络掩码地址(可能没有)。
    broadcast: 广播地址(可能没有)。
    ptp: 表示“点对点”;它是点到点接口(通常是VPN)上的目标地址。广播和ptp是互斥的。可能没有。
# 获取en0网卡的地址, 这里包括mac和ipv6地址
psutil.net_if_addrs().get("en0"):
psutil.net_io_counters() # 获取网络读写字节/包的个数
参数:
    bytes_sent:发送的字节数
    bytes_recv:收到的字节数
    packets_sent:发送的数据包数
    packets_recv:收到的数据包数
    errin:收到错误的总数
    errout:发送时的错误总数
    dropin:丢弃的传入数据包总数
    dropout:被丢弃的传出数据包总数(在MacOS和BSD上总是为0)
psutil.net_connections() # 获取网络连接信息,注意这里需要root权限。

"""
获取设备网卡收发速率
"""
def get_key():
    key_info = psutil.net_io_counters(pernic=True).keys() #获取网卡名称
    recv = {}
    sent = {}

    for key in key_info:
        recv.setdefault(key, psutil.net_io_counters(pernic=True).get(key).bytes_recv) #各网卡接收的字节数
        sent.setdefault(key, psutil.net_io_counters(pernic=True).get(key).bytes_sent) #各网卡发送的字节数

    return key_info, recv, sent
def get_rate(func):
    key_info, old_recv, old_sent = func() #前5s收集的数据
    time.sleep(5)
    key_info, now_recv, now_sent = func() #当前所收集的数据

    net_in = {}
    net_out = {}

    for key in key_info:
        net_in.setdefault(key, (now_recv.get(key) - old_recv.get(key))/5) #每秒接收速率
        net_out.setdefault(key, (now_sent.get(key) - old_sent.get(key))/5) #每秒发送速率
    return key_info, net_in, net_out

def get_netinfo():
    try:
        netInfo = {}
        key_info, net_in, net_out = get_rate(get_key)#获取网卡收发速率bytes

        netInfo["Rx_rate"] = net_in
        netInfo["Tx_rate"] = net_out
    except Exception as e:
        log_info(LOG_LOCATE(), "get ipInfo error")
    return netInfo

 获取进程信息:

psutil.pids() # 获取所有进程IDpsutil.Process(61) # 获取指定PID的进程信息
psutil.Process(45573).exe() # 获取进程的exe路径
psutil.Process(45573).name() # 获取进程名称
psutil.Process(45573).cmdline() # 获取进程启动的命令
psutil.Process(45573).num_threads() # 获取进程的线程数量
psutil.Process(45573).environ() # 获取进程的环境变量信息
psutil.Process( pid=无) 
    #表示具有给定pid的 OS 进程。如果省略 pid,则使用当前进程pid ( os.getpid )。NoSuchProcess如果pid不存在则引发。在 Linux 上, pid还可以引用线程 ID(方法返回的id字段 threads())。
    #访问此类的方法时,请始终准备好捕获 NoSuchProcess和AccessDenied异常。 hash builtin 可用于此类的实例,以便随着时间的推移明确识别进程(哈希由混合进程 PID + 创建时间确定)
pid # 进程PID。这是该类的唯一(只读)属性。
ppid( ) #进程父 PID。在 Windows 上,返回值在第一次调用后被缓存。不在 POSIX 上,因为如果进程变成僵尸,ppid 可能会改变 另见parent()和parents()方法。
name( ) #进程名称。在 Windows 上,返回值在第一次调用后被缓存。不在 POSIX 上,因为进程名称可能会更改。另请参阅如何按名称查找进程。
exe( ) #进程可执行为绝对路径。在某些系统上,这也可能是一个空字符串。返回值在第一次调用后被缓存
status( ) #当前进程状态为字符串。返回的字符串是 psutil.STATUS_*常量之一。
num_threads( ) #此进程当前使用的线程数(非累积)。
num_fds( ) #此进程当前打开的文件描述符数(非累积)。 可用性:UNIX
memory_info( ) #根据表示有关进程的内存信息的平台,返回具有可变字段的命名元组。所有平台上可用的“便携式”字段是rss和vms。所有数字都以字节表示。
is_running( ) #返回当前进程是否在当前进程列表中运行。这在进程消失并且它的 PID 被另一个进程重用的情况下也是可靠的,因此它必须优于doing psutil.pid_exists(p.pid)。
"""
根据名称查找进程
"""
def find_procs_by_name(name):
    "Return a list of processes matching 'name'."
    ls = []
    for p in psutil.process_iter(["name", "exe", "cmdline"]):
        if name == p.info['name'] or \
                p.info['exe'] and os.path.basename(p.info['exe']) == name or \
                p.info['cmdline'] and p.info['cmdline'][0] == name:
            ls.append(p)
    return ls

"""
获取进程运行信息
"""
def get_procinfo(pidName, time_range, ipAddr, mc_pub_time):
    procInfo={}
    servList = []
    servInfoToDbList = []
    time_cnt = int(time_range)/int(mc_pub_time)

    statlist = find_procs_by_name(pidName)
    print("procinfo:",statlist,"time_cnt:",time_cnt,"size:",len(statlist))
    if(statlist == "" or statlist == " " or len(statlist) == 0):
        procInfo['srevStatus'] = "ABNOMAL"
        procInfo['time'] = int(time.time())
    for p in statlist:
        print("pid:",p.pid,"status:",p.status(),"name:",p.name())
        if(p.is_running()):
            procInfo['srevStatus'] = "NOMAL"
            procInfo['time'] = int(time.time())
        else:
            procInfo['srevStatus'] = "ABNOMAL"
            procInfo['time'] = int(time.time())
        servInfoToDbList.append(procInfo)
    return servInfoToDbList

if __name__ == '__main__':
    print(get_procinfo("QQ.exe", "10", "127.0.0.1", "6"))

 其他信息

psutil.boot_time() #返回系统启动时间(以秒为单位)。
psutil.users() #将当前连接到系统上的用户作为命名元组列表返回
参数:
    user: 用户的名称。
    terminal: 与用户关联的tty或伪tty(如果有的话),否则为None。
    host: 与条目关联的主机名(如果有的话)。
    started: 创建时间作为自纪元以来以秒为单位表示的浮点数。
    pid: 登录进程的pid(如sshd、tmux、gdm-session-worker等)。在Windows和OpenBSD中,这总是设置为None。 
"""
获取系统时间 int
"""
def get_systimeinfo():
    sysTimeInfo={}

    now_time = int(time.time())
    sysTimeInfo["time"] = now_time
    return sysTimeInfo

def get_sysTime():
    """
    获取系统时间
    """
    try:
        timeinfo = get_systimeinfo()
        sysTime = timeinfo["time"]
        log_info(LOG_LOCATE(), "sysTime:%d" % (sysTime))
    except Exception as e:
        log_info(LOG_LOCATE(), "get systime info error")
    return sysTime

 psutil模块中传感器等其他功能可以查看官方文档:

psutil documentation — psutil 5.9.1 documentation

https://github.com/giampaolo/psutil 

 类似资料: