import pandas as pd import pyedflib from pathlib import Path def get_time_to_seconds(time_str): h, m, s = map(int, time_str.split(":")) return h * 3600 + m * 60 + s base_event = ["Hypopnea", "Central apnea", "Obstructive apnea", "Mixed apnea"] # 输入设置(每次运行此脚本都必须检查) dir_path = Path(r"D:\code\data") PSG_Data_Path = dir_path / "PSG" PSG_Label_Path = dir_path / "PSG_label" BCG_Data_Path = dir_path / "BCG" BCG_Label_Path = dir_path / "BCG_label" sampIDs = PSG_Label_Path.glob('*.*') for sampID in sampIDs: sampID = sampID.name.replace("export", "").replace(".csv", "") # 读取PSG标签 df_PSG_label = pd.read_csv(PSG_Label_Path / (f"export" + str(sampID) + ".csv"), encoding="gbk") df_PSG_label = df_PSG_label.loc[:, ~df_PSG_label.columns.str.contains('^Unnamed')] df_PSG_label = df_PSG_label[df_PSG_label["Event type"].isin(base_event)] df_PSG_label['Duration'] = df_PSG_label['Duration'].str.replace(r' \(.*?\)', '', regex=True) # 读取EDF文件 edf_File = pyedflib.EdfReader(str(PSG_Data_Path / f"A{str(sampID).rjust(7, '0')}.edf")) # 获取PSG记录开始时间 start_time = str(edf_File.getStartdatetime()).split(" ")[1] start_time_abs = get_time_to_seconds(start_time) # 计算起始时间秒数和终止时间秒数 df_PSG_label['Start'] = (df_PSG_label['Time'].apply(get_time_to_seconds) - start_time_abs).apply(lambda x: x + 24 * 3600 if x < 0 else x).astype(int) df_PSG_label['End'] = df_PSG_label['Start'] + df_PSG_label['Duration'].astype(float).round(0).astype(int) # 写入csv文件 df_PSG_label.to_csv(str(dir_path) + r"\BCG_label\export" + str(sampID) + "_all.csv", index=False, encoding="gbk") # 打印结果 print("sampID_" + str(sampID) + "写入csv成功")