当前位置: 首页 > 工具软件 > ape-j > 使用案例 >

Python:处理车辆控制单元MDF(CANApe数据)记录文件思路_更新版

斜宁
2023-12-01

1 功能更新

      关于Python处理车辆控制单元MDF文件,对程序进行更新,其功能如下:

      1. 查找MDF文件中特定信号最大值;
       2. 计算平均车速和里程,使用matplotlib作图;
       3. 计算两信号差值,进行波峰判断并记录峰值个数,避免信号毛刺干扰,设定差值门限值,使用matplotlib作图(含主次坐标);
      4. 以上信息直接写入word保存。

2 源程序

      更新源程序:

"""
    名称:  CheckMDF.py
    作者:  Morven_Xie
    版本:  1.1
    时间:  2020/7/21 1:49
    更新:  2020/8/2 1:35
    功能:  用于车辆排查车辆VCU记录的MDF文件
    简介:  ·温度信号A峰值
            ·温度信号A与油底壳油温温度信号B差值大于门限值的峰值数量及时间
            ·通过车速信号计算车辆行驶里程
    Email:  morven_xie@163.com
"""


# coding=utf-8
from   asammdf import MDF                                    # 用于处理MDF文件
import os                                                    # 用于获取文件路径
import pandas as pd                                          # 用于数据分析
import numpy as np                                           # 用于数组处理
from numpy import trapz                                      # 可用于积分求面积
from matplotlib import pyplot as plt                         # 用于作图 
import datetime as dt                                        # 用于调用系统时间
import docx                                                  # 调用word文档库
from docx.shared import Inches                               # 用于设置图片尺寸


def check_MDF(Document_Adress):                              # 定义check_MDF函数
    print('文件名: '+Adress_str+',相关统计信息如下:')       # 提取地址中文件名信息,打印

    new_Doc = document.add_paragraph\
        ('文件名: '+Adress_str+',相关统计信息如下:')        # 新增段落文本


    MDF_File=MDF(Document_Adress)                            # 使用asammdf第三方包调取.mdf文件内容
    TCOil= MDF_File.get("TCOil")                             # 调用.mdf中零件处油温Signal   
    TOil=MDF_File.get("TOil")                                # 调用.mdf中油底壳温度Signal
    VehSpd=MDF_File.get("VehSpd")                            # 调用.mdf中车速Signal

    VehSpdCheck(VehSpd)                                      # check_MDF()函数调用VehSpdCheck()函数
    Signal_Data_Max(TCOil)                                   # check_MDF()函数调用Signal_Data_Max()函数
    T_Subtract_Peak(TCOil, TOil, 20)                         # check_MDF()函数调用T_Subtract_Peak()函数

    document.save(Veh_VIN+'.docx')                           # 运行完程序后保存word文档

def timeconvert(second):                                     # 将时间秒转化str类型的时分秒
    Timestr =str(int(second/3600)) + 'h ' + \
             str(int((second%3600)/ 60)) + 'min ' + \
             str(int((second%3600) % 60)) + 's'
    return Timestr

def Signal_Data_Max(Signal):                                 # 定义Signal_Data_Max()函数
    x=Signal.timestamps                                      # 提取Signal中所有时刻值存放列表x()中
    y=Signal.samples                                         # 提取Signal中所有实测值存放列表y()中
    T_C_Max=np.max(y)                                        # 使用Numpy库中max()函数求y中数据的最大值
    Positon=np.where(y==np.max(y))                           # 找到y最大值对应的索引值(可理解为序号)
    Time=(x[Positon[0][0]])                                  # 提取索引值对应时刻

    print('零件处油温最大值%d' %T_C_Max+\
          '时间是'+timeconvert(Time))                        # 打印零件处油温最大值及对应的时间(秒转化为分钟)

    new_Doc = document.add_paragraph\
        ('零件处油温最大值为 '+str(T_C_Max)+' ℃,'\
         '时间是'+timeconvert(Time),style='List Bullet')     # 新增段落文本

def VehSpdCheck(Singal):                                     # 定义Signal_Data_Max()函数
    x=Singal.timestamps                                      # 提取Signal中所有时刻值存放列表x()中
    #print(Singal.display_name)
    y=Singal.samples                                         # 提取Signal中所有实测值存放列表y()中
    Average_Vehspd=np.sum(y)/len(y)                          # 使用Numpy库中sum()函数求和后除以点的个数
    Drive_Distance=Average_Vehspd*x[-1]/3600                 # 行驶里程通过均值与时间乘积获得
    print('该文件平均车速 %.2f km/h,' %Average_Vehspd+\
    '行驶里程 %.2f km' %Drive_Distance)                      # 打印平均车速和行驶里程(使用换行\)

    Average_Vehspd_str=str(round(Average_Vehspd, 2))         # 平均速度保留两位,转化为字符串
    Drive_Distance_str =str(round( Drive_Distance, 2))       # 行驶里程保留两位,转化为字符串
    new_Doc = document.add_paragraph\
        ('该文件平均车速 '+ Average_Vehspd_str+' km/h,'+\
         '行驶里程= '+Drive_Distance_str+' km'+',见下图:',style='List Bullet')
                                                             # 新增段落文本

    plt.figure(Adress_str+'--'+Singal.display_name)          # 图表命名为文件名加信号名称
    plt.axis([np.min(x),np.max(x),0,np.max(y)])              # 使用pyplot函数定义x轴和y轴最大及最小值
    plt.plot(x,y,label='$VehSpd$')                           # 使用pyplot作图,图例为VehSpd
    plt.xlabel('t (s) ')                                     # 增加X轴-坐标轴标题
    plt.ylabel('VehicleSpeed (km/h) ')                       # 增加y轴-坐标轴标题

    plt.fill_between(x,y1=y,y2=0,facecolor='purple',\
                     alpha=0.2)                              # 对y与x围成面积着色,展示速度求积分→里程


    plt.text(x[-1]/3,np.max(y)-3, 'DriveDistance:%.2f km' \
             %Drive_Distance, \
             fontdict={'size': 10, 'color': 'red'})          #  x轴坐标,y轴坐标,显示内容,字体大小、颜色

    plt.title(Adress_str)                                    # 将全局变量信息增加入表头
    #plt.title(Adress_str+'__'+'(DriveDistance: %.2f '%Drive_Distance+' km)')
    plt.legend(loc='upper right')                            # 图例选择最佳位置,可选择best
    #plt.show()                                              # 显示图例
    picture_name=Adress_str +'-'+ \
                 Singal.display_name+'.png'                  # 图片保存时名称
    plt.savefig(picture_name, dpi=200)                       # 保存图片

    new_Doc = document.add_picture(picture_name, width=Inches(5.0))
                                                             # 图片写入word
    os.remove(picture_name)                                  # 删除图片文件

def T_Subtract_Peak(T1,T2,threshold):
    # T1.plot()                                              # 使用ASAMMDF自带Plot制图
    # T2.plot()
    x = T1.timestamps                                        # 提取信号T1中所有时刻值的点存放列表x()中
    y1 = T1.samples                                          # 提取信号T1中所有信号值的点存放列表y()中
    y2 = T2.samples                                          # 提取信号T2中所有信号值的点存放列表y()中
    k = len(x)                                               # 统计数据的总个数

    a = 0                                                    # 定义数字a
    i = 0                                                    # 定义数字i
    m = 0                                                    # 定义数字m

    T_Up = []                                                # 定义T_Up为列表
    for a in range(0, k):
        T_Up.append(y1[a] - y2[a])                           # 将所有温差点写入T_Up
    y3 = T_Up

    while i < k - 1:                                         # 从0到k,使用while循环,条件满足一直执行
        if int(T1.samples[i] - T2.samples[i]) >= \
                threshold:                                   # T1和T2在某时刻温差大于门限值,执行一次
            j = i                                            # 过滤信号毛刺,寻找大于门限值波峰个数
            while j < k - 1:
                j += 1
                if int(T1.samples[j] - \
                       T2.samples[j]) < threshold - 3:        # 设置差值为2作为过滤信号
                    m = m + 1                                 # 记录波峰个数
                    time = int(j / k * T1.timestamps[-1] / 60)
                    print('该文件零件处油温与油底壳温差大于' + str(threshold) + '的波峰第 %d 处,' % m + \
                          '时间是 %d s,' % time + '零件处油温 %d ℃' % T1.samples[j])

                    new_Doc = document.add_paragraph \
                        ('该文件零件处油温与油底壳温差大于'\
                         + str(threshold) + '℃的波峰第' +\
                         str(m) + '处,时间是' + timeconvert(time))
                                                              # 新增段落文本

                    i = j                                     # 局部变量i更新为j

                    break                                     # 循环退出
        i = i + 1                                             # 以上程序不执行或者退出,执行i自加

    plt.figure(Adress_str + '—' + T1.display_name +
               '-' + T1.display_name)                         # 图表命名为文件名加两信号名称

    fig = plt.figure(num=1, figsize=(15, 8), dpi=80)         # 开启一个窗口,同时设置大小,分辨率
    fig, ax1 = plt.subplots()                                # 作1个图
    ax2 = ax1.twinx()                                        # 坐标轴2为次要坐标
    ax1.plot(x, y1, '-', color='g', label='TCOil')           # 绘制曲线y1,含线型、颜色、图例名称
    ax1.plot(x, y2, '-', color='b', label='TOil')            # 绘制曲线y2
    ax2.plot(x, y3, '-', color='r', label='T_Up')            # 绘制曲线y3
    ax1.legend(loc='upper left')                             # 显示图例位置,plt.legend(),左上
    ax2.legend(loc='upper right')                            # 显示图例位置,右上

    # ax1.set_xlim(-5,5)
    ax1.set_ylim(np.min(y2) - 5, np.max(y1) + 5)             # 设定y1轴最大最小值
    ax2.set_ylim(np.min(y3) - 5, np.max(y3) + 5)             #  同上

    ax1.set_title(Adress_str)                                # 图表表头

    ax1.set_xlabel('time(s)')                                # 设定x轴标题
    ax1.set_ylabel("TC_Oil & T_Oil (℃)", color='b')         #  设定y1轴标题
    ax2.set_ylabel("T_Up(℃)", color='r')                    # 设定y2轴标题
    # plt.show()                                             # 参考之前注释

    picture_name = Adress_str + '-' + T1.display_name + '-' + T2.display_name +'.png'
    plt.savefig(picture_name, dpi=100)                       # 保存图片文件到文件夹

    new_Doc = document.add_picture(picture_name, width=Inches(6.0))
                                                             # 保存图片到word
    os.remove(picture_name)                                  # 删除图片文件

file_path = 'D:\SoftApp\Python\HardWay2StudyPython\MDFData'  # 文件所在文件夹
now_time=dt.datetime.now().strftime('%F  %T')                # 调用系统时间
Veh_VIN=input("请在下方输入故障排查车辆VIN数字编号后,回车:") # 提示输入车辆信息
print(Veh_VIN+"车辆"+'数据分析时间:'+now_time)               # 打印输出车辆和系统时间

document = docx.Document()                                  # 创建word文档对象
document.add_heading(Veh_VIN+'车排查信息', 0)                # 添加文档标题
new_Doc= document.add_paragraph \
    ('主要排查行驶里程、最高油温、\
    零件处与变速箱油温差值出现的峰值数。 ')                    # 新增段落文本,字符\用于换行
new_Doc=document.add_paragraph\
     (Veh_VIN+"车辆"+'数据分析时间:'+ now_time)              # 新增段落文本


for root, dirs, files in os.walk(file_path):                 # os.walk返回三个对象: dirpath(目录路径,string类型) ;
                                                             # dirname(多个子目录名,列表); filename(多个文件名,列表)
    Documents=[os.path.join(root, name) for name in files]   # 遍历文件名存放list(列表)类型的Documets

for Document_Path in Documents:                              # 遍历文件名对应的地址(假设有3个文件地址)
    # print(Document_path.info())                            # 打印MDF文件信号,例如可以看到有哪些信号
    print('-'*60)                                            # 打印分割行

    new_Doc = document.add_paragraph('-'*100)                # 新增段落文本,字符\用于换行
    new_Doc = document.add_paragraph('-' * 100)              # 新增段落文本,字符\用于换行

    Adress=Document_Path.split('\\')[-1]                     # 提取路径中的含后缀的文件名
    Adress_str=Adress.split('.')[0]                          # 提取路径中的文件名

    check_MDF(Document_Path)                                 # 调用def check_MDF()函数

3 巩固练习

      功能车辆行车数据采集设备记录多文件MF4,特定功能下触发,记录该段数据的最大横纵向加速,并作图。

"""
Check the testdata from ADAS_MDF
"""
# coding=utf-8
import os
from asammdf import MDF
from matplotlib import pyplot as plt

# read the document from the file
def Read_Docmt_Addr():
    File_Path='D:\SoftApp\Python\MDF_ADAS\MF4_Data'
    Document_Name=os.listdir(File_Path)
    Document_Address=[]
    for i in range(0,len(Document_Name)):
        Document_Address.append(File_Path+'/'+Document_Name[i])
    return Document_Address

# Filter the signal and plot the accelerate data when auto lane change
def Data_Filter_Function(Document_Addr):
    print(Document_Addr.split('/')[-1])
    MDF_File = MDF(Document_Addr)
    Signal_VehSpd = MDF_File.get("VehSpd_Signal_Name")
    Signal_Turn_Lamp = MDF_File.get("Signal_Turn_Lamp_Signal_Name")
    Signal_Turn_Lamp.plot()
    Signal_Lateral_Acc= MDF_File.get("Signal_Lateral_Acc_Signal_Name")
    Signal_Longitudinal_Acc= MDF_File.get("Signal_Longitudinal_Acc_Signal_Name")

    Signal_Data_Num=len(Signal_Turn_Lamp.timestamps)
    i=0
    Lamp_Threshold=6
    Signal_Burr=-3
    while i <Signal_Data_Num:
        if Signal_Turn_Lamp.samples[i]>=Lamp_Threshold:
            t1=Signal_Turn_Lamp.timestamps[i]
            j=i
            while j <Signal_Data_Num:
                if Signal_Turn_Lamp.samples[j] <Lamp_Threshold+Signal_Burr:
                    t2 = Signal_Turn_Lamp.timestamps[j]
                    Delta_t=t2-t1
                    print('换道时间:%.2f s'% (Delta_t))
                    # print(Signal_Lateral_Acc.samples[i])
                    MaxValue_lateral = max(Signal_Lateral_Acc.samples[i:j])
                    MaxValue_longitudinal = max(Signal_Longitudinal_Acc.samples[i:j])
                    print('最大横向和纵向加速度分别是:%.2f m/s^2和 %.2f m/s^2' %(MaxValue_lateral,MaxValue_longitudinal))
                    MDF_file_New = MDF_File.cut(Signal_Turn_Lamp.timestamps[i], Signal_Turn_Lamp.timestamps[j])
                    Filter_Signal_Figure(MDF_file_New)
                    i = j
                    break
                else:
                    j = j + 1
        i = i + 1

def Filter_Signal_Figure(MDF_file):
    Signal_Lateral_Acc_New = MDF_file.get("Signal_Lateral_Acc_Signal_Name")
    Signal_Longitudinal_Acc_New = MDF_file.get("Signal_longitudinal_acc_Signal_Name")
    x_axis = Signal_Lateral_Acc_New.timestamps
    y_axis_1 = Signal_Lateral_Acc_New.samples
    y_axis_2 = Signal_Longitudinal_Acc_New.samples

    plt.figure(str('变道时横纵向加速'))
    fig = plt.figure(num=1, figsize=(15, 8), dpi=100)
    ax = fig.add_subplot(1, 1, 1)
    plt.xlabel('t(s)')
    ax.set_axis_on()
    ax.plot(x_axis, y_axis_1, '-', color='g', label='Lateral_Acc')
    ax.legend(loc='upper left')
    ax.set_ylabel('Lateral(m/s^2)')
    ax2 = ax.twinx()
    ax2.plot(x_axis, y_axis_2, '-', color='b', label='Longitudinal_Acc')  # 绘制曲线y2
    ax2.legend(loc='upper right')
    ax2.set_ylabel('Longitudinal(m/s^2)')

    plt.savefig('Figure')
    plt.show()
    plt.close('all')

if __name__ == "__main__":
    for Addr in Read_Docmt_Addr():
        Data_Filter_Function(Addr)

      读取MDF文件所有信号的名称。

# Get signal name from MDF Document
 def Get_Signal_Name(MDF_File_Address):
     MDF_File = MDF(MDF_File_Address)
     Singal_Library=MDF_File.info()
     Signal_List=[]
     Signal_List_Number = Singal_Library['groups']
     for i in range(0,Signal_List_Number):
         Signal_List.append(Singal_Library[str('group '+str(i))]['channel 1'].split('"')[1])
     print(Signal_List)

>> 更多相关内容,点击Morven_Xie博客概览

 类似资料: