python 多进程以及进程间manager通信实际应用场景

夏弘文
2023-12-01

hello, 大家好,许久没有更新博客了,今天就工作中运用到一个实际场景给大家分享一下,也算是自己在python多进程方面的一个实际应用,希望能够给大家带来一点帮助同时也希望大佬能指点一下其中一点我的疑惑。

实际场景是这样的,一个同事使用C++在服务器上部署了一个应用,该应用可以调用起来服务器上的几个服务(比如服务A、服务B、服务C),运行起来同事的应用后,会实时将这几个服务的参数(CPU时间片占用、内存使用、处理请求数、响应时间等)打印到终端,同事想要看到的就是我能将运行起来的数据的日志实时保存到本地一份,同时也能每间隔一段时间能打印一下监控的这几个服务的各个参数。以便在执行过程中看一下这几个服务运行时的瞬时参数。

需求有了,接下来该我表演了;

第一:首先是需要连接上服务器并且执行起来该应用从而触发监控日志动作,python的paramiko模块用起来没毛病

 1 import paramiko
 2 
 3 
 4 class SSHClient:
 5     """
 6     ssh连接服务器并且执行命令,检查返回值
 7     """
 8 
 9     def __init__(self, ip, port=22, username="tommy", password="tommy", timeout=3):
10         self.ip = ip
11         self.port = port
12         self.username = username
13         self.password = password
14         self.timeout = timeout
15         self.ssh = None
16         self.invoke_shells = None
17 
18     def connect_board(self):
19         # 创建SSH对象
20         ssh = paramiko.SSHClient()
21         # 允许连接不在know_hosts文件中的主机
22         ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
23         self.ssh = ssh
24         while True:
25             try:
26                 self.ssh.connect(hostname=self.ip, port=self.port, username=self.username, password=self.password,
27                                  timeout=self.timeout)
28                 break
29             except Exception as e:
30                 print(e)
31                 continue
32 
33     # 该方法可以用来实时接收终端吐出的日志
34     def get_invoke_shell(self):
35         self.invoke_shells = self.ssh.invoke_shell()
36 
37     def send_normal_cmd(self, cmd):
38         """
39         发送执行完可以释放终端的命令并且获取返回值
40         :param cmd: str such as ls -l、touch test
41         :return: command return value
42         """
43         stdin, stdout, stderr = self.ssh.exec_command(cmd)
44         # 获取命令结果
45         res, err = stdout.read(), stderr.read()
46         result = res if res else err
47         # print(result.decode())
48         try:
49             return result.decode()
50         except Exception as e:
51             print(e)
52             return [result]
53 
54     def close_connection(self):
55         self.ssh.close()
View Code

接下来就是收到内容并且保存到本地

 1 import datetime
 2 import os
 3 
 4 
 5 def init_log_folder():
 6     # 获取到当天的日期用来创建文件
 7     time_now = str(datetime.datetime.today()).replace(" ", "_").replace(":", "_").split(".")[0]
 8     full_log = "all_log-" + time_now + ".log"
 9     log_folder_name = "log"
10     current_file_path = os.path.abspath(__file__)
11     current_folder_path = os.path.dirname(current_file_path)
12     root_path = os.path.dirname(current_folder_path)
13     # 在该脚本文件所在的文件夹的同级目录创建一个名字为log的文件夹
14     log_folder_path = os.path.join(root_path, log_folder_name)
15     sub_folder = os.path.join(log_folder_path, time_now)
16     full_log_path = os.path.join(sub_folder, full_log)
17 
18     # 不存在则创建
19     # 创建 log 文件夹
20     if os.path.exists(log_folder_path):
21         print("log file exist")
22         pass
23     else:
24         os.mkdir(log_folder_path)
25     # 创建子文件夹
26     if os.path.exists(sub_folder):
27         print("file exist")
28         pass
29     else:
30         os.mkdir(sub_folder)
31     with open(full_log_path, "a") as f:
32         f.write(" ")
33     return full_log_path
View Code

现在到了需要分别接收数据并且过滤数据进行展示的环节了,首先考虑到的是采用多进程,使用manager进行数据间的通信,考虑到保存的日志文件到一定大小需要重定向,因此使用到了 manager.dict()和manager.list()两种数据结构,进行数据展示的考虑到了字典结构,因为其需要不同服务的相同参数,考虑到实时修改以及存取处理,采用二级字典结构进行存储(这也出现了一点小插曲(可能的解释是深浅拷贝,但是不确定)),就是多进程间直接修改字典中的字典中的值不生效,想要修改只能生成一个临时字典,然后覆盖字典里面key对应的字典,

default_data = {
    "service1": {"cpu_slice": "空", "mem_usage": "空", "proc_req": "空", "response_time": "空"},
    "service2": {"cpu_slice": "空", "mem_usage": "空", "proc_req": "空", "response_time": "空"},
    "service3": {"cpu_slice": "空", "mem_usage": "空", "proc_req": "空", "response_time": "空"},
    "service4": {"cpu_slice": "空", "mem_usage": "空", "proc_req": "空", "response_time": "空"},
    "service5": {"cpu_slice": "空", "mem_usage": "空", "proc_req": "空", "response_time": "空"},
}
View Code

比如想要修改default_data["service5"]["cpu_slice"] = 50 在另一个进程中读取的时候就不会生效,想要生效只能这样做 default_data["service5"] = {"cpu_slice": 50, "mem_usage": "空", "proc_req": "空", "response_time": "空"}

最初的想法是这样的,第一个进程负责实时接收日志并且提取关键字并且更新字典数据结构,另一个进程每隔2秒打印一下数据以及判断是不是日志文件超过规定大小(500MB)

所以有了

 1 import re
 2 import time
 3 
 4 
 5 def write_log_content(file_path, origin_content):
 6     with open(file_path, "a") as f:
 7         current_time = time.ctime()
 8         f.write(current_time + origin_content)
 9 
10 
11 def demo_single_pic(storage_dict, file_list_info):  # 生成一个图标
12     # 常量
13     ssh = SSHClient('ip', username='tommy', password='')
14     ssh.connect_board()
15     ssh.get_invoke_shell()
16 
17     ssh.invoke_shells.send(
18         "cmd\n")
19     print("send 1 ok")
20     while 1:
21         # 以提取一种数据格式为例
22         content = ssh.invoke_shells.recv(2048).decode('ascii')
23         if content:
24             write_log_content(file_list_info[-1], content)
25             # first type of log
26             try:
27                 p = re.compile(
28                     r'service\s+\[(?P<service_num>\d+).*?cpu_slice.*?(?P<cpu_slice>\d+)')
29                 m = p.search(content)
30                 service_num, cpu_slice = int(m.group('service_num')), m.group("cpu_slice")
31 
32                 if storage_dict.get(service_num):
33                     tmp_dict = storage_dict[service_num]
34                     tmp_dict["cpu_slice"] = cpu_slice
35                     storage_dict[service_num] = tmp_dict
36                     continue
37             except Exception:
38                 pass
39             # second type of log
40         else:
41             continue
View Code

其中 第一个参数正是上面的那个default_data、第二个参数就是用来传入写入日志的文件名的

展示数据并且判断日志文件大小的进程

 1 import datetime
 2 import os
 3 import time
 4 
 5 
 6 def get_log_content(info_dict, file_list):
 7     while 1:
 8         print(info_dict)
 9         time.sleep(1)
10         if os.path.exists(file_list[-1]):
11             if os.path.getsize(file_list[-1]) / 1024 / 1024 > 200:
12                 new_file_name = file_list[-1].split('all_log-')[0]
13                 time_now = str(datetime.datetime.today()).replace(" ", "_").replace(":", "_").split(".")[0]
14                 full_log = new_file_name + 'all_log-' + time_now + ".log"
15                 file_list.append(full_log)
16                 file_list.pop(0)
17         else:
18             continue
View Code

入口文件

 1 if __name__ == '__main__':
 2     manager = multiprocessing.Manager()
 3     dict_info = manager.dict()
 4     file_list_info = manager.list()
 5     # TSC = 850461282707, BufferLen = 0, numExposures = 2, expTimeValid = 1, gainValid = 1
 6     dict_info.update({
 7         "service1": {"cpu_slice": "空", "mem_usage": "空", "proc_req": "空", "response_time": "空"},
 8         "service2": {"cpu_slice": "空", "mem_usage": "空", "proc_req": "空", "response_time": "空"},
 9         "service3": {"cpu_slice": "空", "mem_usage": "空", "proc_req": "空", "response_time": "空"},
10         "service4": {"cpu_slice": "空", "mem_usage": "空", "proc_req": "空", "response_time": "空"},
11         "service5": {"cpu_slice": "空", "mem_usage": "空", "proc_req": "空", "response_time": "空"},
12     })
13     # 初始化日志文件
14     sub_folder_all_log_path = init_log_folder()
15     file_list_info.append(sub_folder_all_log_path)
16     p = Process(target=demo_single_pic, args=(dict_info, file_list_info,))
17     p1 = Process(target=get_log_content, args=(dict_info, file_list_info,))
18     p.daemon = True
19     p1.daemon = True
20     jobs = [p1, p]
21     for job in jobs:
22         job.start()
23     for job in jobs:
24         job.join()
25     ...
View Code

至此,该需求完成。考虑到资源占用,后续的会换成多线程执行,结果待更新中,

 类似资料: