42 lines
1.8 KiB
Python
42 lines
1.8 KiB
Python
import pandas as pd
|
|
import pyedflib
|
|
from pathlib import Path
|
|
|
|
def get_time_to_seconds(time_str):
    """Convert an ``HH:MM:SS`` clock string into seconds since midnight.

    Args:
        time_str: Time of day as colon-separated hours, minutes and seconds.
            A fractional part on the seconds field (for example
            ``"22:30:15.250000"``) is accepted and truncated.

    Returns:
        Whole seconds elapsed since ``00:00:00``, as an ``int``.

    Raises:
        ValueError: If the string does not have exactly three
            colon-separated fields, or a field is not an integer.
    """
    hours, minutes, seconds = time_str.split(":")
    # str(datetime) appends ".ffffff" when microseconds are non-zero;
    # keep only the whole-second part so int() does not reject it.
    whole_seconds = seconds.partition(".")[0]
    return int(hours) * 3600 + int(minutes) * 60 + int(whole_seconds)
|
|
|
|
# Event types kept when the PSG label table is filtered on "Event type".
base_event = [
    "Hypopnea",
    "Central apnea",
    "Obstructive apnea",
    "Mixed apnea",
]

# Input settings (must be checked before every run of this script)
dir_path = Path(r"D:\code\data")
PSG_Data_Path = dir_path.joinpath("PSG")
PSG_Label_Path = dir_path.joinpath("PSG_label")
BCG_Data_Path = dir_path.joinpath("BCG")
BCG_Label_Path = dir_path.joinpath("BCG_label")
|
|
|
|
# Convert every PSG label export into a label file whose events carry
# "Start"/"End" offsets in seconds relative to the start of the EDF recording.
SECONDS_PER_DAY = 24 * 3600

# Make sure the output folder exists before the first write.
BCG_Label_Path.mkdir(parents=True, exist_ok=True)

# Only "export<ID>.csv" files are label exports; sorted for a stable order.
label_files = sorted(PSG_Label_Path.glob("export*.csv"))
for label_file in label_files:
    # The sample ID is whatever follows the "export" prefix in the file name.
    sampID = label_file.stem[len("export"):]

    # Read the PSG labels
    df_PSG_label = pd.read_csv(label_file, encoding="gbk")
    df_PSG_label = df_PSG_label.loc[:, ~df_PSG_label.columns.str.contains('^Unnamed')]
    # .copy() so the column assignments below act on an independent frame
    # instead of a slice of the unfiltered table (SettingWithCopyWarning).
    df_PSG_label = df_PSG_label[df_PSG_label["Event type"].isin(base_event)].copy()
    # Drop the parenthesised suffix from the duration text; astype(str) keeps
    # this working when pandas has parsed the column as numeric.
    df_PSG_label['Duration'] = (
        df_PSG_label['Duration'].astype(str).str.replace(r' \(.*?\)', '', regex=True)
    )

    # Read the EDF file; the context manager closes the reader even on error.
    edf_path = PSG_Data_Path / f"A{sampID.rjust(7, '0')}.edf"
    with pyedflib.EdfReader(str(edf_path)) as edf_file:
        # Get the PSG recording start time
        recording_start = edf_file.getStartdatetime()

    # Use the datetime fields directly so a sub-second start time cannot
    # break string parsing.
    start_time_abs = (
        recording_start.hour * 3600
        + recording_start.minute * 60
        + recording_start.second
    )

    # Compute start and end offsets in seconds. The modulo maps events whose
    # clock time is earlier than the recording start (i.e. after midnight)
    # onto the following day.
    event_clock_seconds = df_PSG_label['Time'].apply(get_time_to_seconds)
    df_PSG_label['Start'] = (
        (event_clock_seconds - start_time_abs) % SECONDS_PER_DAY
    ).astype(int)
    df_PSG_label['End'] = (
        df_PSG_label['Start']
        + df_PSG_label['Duration'].astype(float).round(0).astype(int)
    )

    # Write the csv file
    df_PSG_label.to_csv(
        BCG_Label_Path / f"export{sampID}_all.csv", index=False, encoding="gbk"
    )

    # Print the result
    print(f"sampID_{sampID}写入csv成功")