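Notes on how to split a NumPy npz file into multiple CSV files. The example uses the npz structure from my own project, so files from other sources may be organized differently.

Run the following code to inspect the key structure.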

import numpy as np

if __name__ == '__main__':
    src_file = r'D:\test-001.npz'
    results = np.load(src_file)
    # Each key names one array stored in the npz archive.
    for key, value in results.items():
        print(key)

The output looks similar to the following.

CAN/2/localmap1__message394/left_line_id_11
CAN/2/localmap1__message394/left_line_id_12
CAN/2/localmap1__message394/left_line_id_13
CAN/2/localmap1__message396/_timestamp
CAN/2/localmap1__message396/timeStamp
CAN/2/localmap1__message396/right_line_id_15

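From this output, each key has four segments organized as protocol/channel/message/signal, for example CAN/2/localmap1__message394/left_line_id_11. The goal is to generate one CSV file per message, with each signal of that message as a column.

Extend the test code to also print the length of each array.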
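
The key layout described above can be sketched as a small helper. This is only an illustration: the names `SignalKey` and `split_key` are hypothetical and are not part of the original script, and the four-segment layout is an assumption based on the sample keys shown here.

```python
from typing import NamedTuple, Optional


class SignalKey(NamedTuple):
    """Hypothetical container for the four segments of an npz key."""
    protocol: str
    channel: str
    message: str
    signal: str


def split_key(key: str) -> Optional[SignalKey]:
    """Split a key on '/', returning None if it does not have four segments."""
    parts = key.split("/")
    if len(parts) != 4:
        return None
    return SignalKey(*parts)


parsed = split_key("CAN/2/localmap1__message394/left_line_id_11")
print(parsed.message, parsed.signal)
```

Returning None for unexpected keys mirrors the idea of skipping anything that does not fit the layout rather than failing on it.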

import numpy as np

if __name__ == '__main__':
    src_file = r'D:\test-001.npz'
    results = np.load(src_file)
    # Print each key together with the number of records it holds.
    for key, value in results.items():
        print(key + "\t" + str(len(value)))

Running it produces output similar to the following.

CAN/2/localmap1__message394/left_line_id_11	322
CAN/2/localmap1__message394/left_line_id_12	322
CAN/2/localmap1__message394/left_line_id_13	322
CAN/2/localmap1__message396/_timestamp	350
CAN/2/localmap1__message396/timeStamp	350
CAN/2/localmap1__message396/right_line_id_15	350

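Each signal holds many records, and the signals that belong to the same message have essentially the same number of records.

The data in the npz file is also stored row by row, one array per signal. Converting it to a CSV file therefore requires a row-to-column transpose, which NumPy can do as well. The complete code is given below.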
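
One way to check this observation is to group the record counts by message. The sketch below is a hedged illustration: `lengths_by_message` is a hypothetical helper, and the `sample` dictionary with its `msgA`/`msgB` keys is made-up data standing in for the result of `np.load`, so that the snippet runs without the project file.

```python
from collections import defaultdict

import numpy as np


def lengths_by_message(arrays):
    """Map each message name to a {signal name: record count} dictionary."""
    grouped = defaultdict(dict)
    for key, value in arrays.items():
        parts = key.split("/")
        if len(parts) != 4:
            continue  # ignore keys that do not fit the expected layout
        grouped[parts[2]][parts[3]] = len(value)
    return dict(grouped)


# Made-up stand-in for np.load(...); real keys and lengths will differ.
sample = {
    "CAN/2/msgA/_timestamp": np.zeros(350),
    "CAN/2/msgA/speed": np.zeros(349),
    "CAN/2/msgB/_timestamp": np.zeros(322),
}
for message, counts in lengths_by_message(sample).items():
    print(message, min(counts.values()), max(counts.values()))
```

If the minimum and maximum differ for a message, its signals need to be trimmed to a common length before they can form a rectangular table.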

import os
import shutil
import numpy as np
import time
import sys

def parse_npz(data_folder, src_file):
    # Start from an empty output folder.
    if os.path.exists(data_folder):
        shutil.rmtree(data_folder)
    os.makedirs(data_folder)

    results = np.load(src_file)
    rows_by_message = {}
    message_name = None
    for key, value in results.items():
        data = key.split("/")

        # Skip keys that do not have the expected four segments.
        if len(data) != 4:
            continue
        new_message = data[-2]

        # A new message starts, so write out the previous one first.
        if new_message != message_name:
            process_message(data_folder, message_name, rows_by_message)
            message_name = new_message
            rows_by_message[message_name] = []
        signal = str(data[-1])
        is_timestamp = signal == '_timestamp'
        if is_timestamp:
            # Scale by 1,000,000 and convert to integers, then make the
            # values relative to the first sample.
            value = np.multiply(value, 1_000_000).astype(np.int64)
            value = value - value[0]
            signal = 'Timestamp'
        rows_by_message[message_name].append([signal, *value])

    # Write out the last message.
    process_message(data_folder, message_name, rows_by_message)


def process_message(folder, message, rows_by_message):
    if message is None:
        return
    signals = rows_by_message[message]
    # Signals may differ slightly in length, so trim them to the shortest.
    length = min(len(v) for v in signals)
    signals = [v[0:length] for v in signals]

    # Transpose rows into columns so that each signal becomes a CSV column.
    csv_data = np.array(signals).swapaxes(0, 1)

    # Save to disk.
    file_name = os.path.join(folder, message + ".csv")
    np.savetxt(file_name, csv_data, delimiter=",", fmt="%s")
    # print('-------------save file to ' + file_name + '---------------')
    del rows_by_message[message]


if __name__ == '__main__':
    #data_folder = sys.argv[1]
    #src_file = sys.argv[2]
    data_folder = r'D:\npz_csv'
    src_file = r'D:\test-001.npz'
    start = time.time()
    parse_npz(data_folder, src_file)
    end = time.time()
    print(f'=============== Parse npz file time cost:{end - start:.4f}s =============')
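
The trim-and-transpose step can also be tried in isolation. The following is a minimal sketch, not the script's exact code: `rows_to_csv_text` is a hypothetical helper, the sample rows are made up, and it writes to an in-memory buffer instead of a file so the result is easy to inspect.

```python
import io

import numpy as np


def rows_to_csv_text(rows):
    """Trim rows to the shortest length, transpose them, and return CSV text."""
    length = min(len(row) for row in rows)
    trimmed = [row[:length] for row in rows]
    # dtype=str makes the header names and the numeric values one string table.
    table = np.array(trimmed, dtype=str).T
    buffer = io.StringIO()
    np.savetxt(buffer, table, delimiter=",", fmt="%s")
    return buffer.getvalue()


# Made-up rows: a header name followed by that signal's values.
rows = [
    ["Timestamp", 0, 10, 20],
    ["left_line_id_11", 7, 8],
]
print(rows_to_csv_text(rows))
```

Each input row starts with the signal name, so after the transpose the first output line acts as the CSV header and the longer row loses its extra trailing value.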