"""
Process the HYS (respiratory research institute) PSG data set.

Planned functionality of this script:

1. Data loading and preprocessing
   Read signals and labels from the configured paths and apply initial
   preprocessing (filtering, denoising, ...).
2. Data cleaning and outlier handling
3. Output of statistics for the cleaned data
4. Data saving
   Save the processed data to the configured path for later use; mainly the
   positions of the segmented data and their labels.
5. Visualisation
   Compare the data before and after processing, and draw availability
   trend plots showing usable intervals, body-movement intervals,
   low-amplitude intervals, etc.

# Rule-based labelling and removal of low-amplitude intervals
# Rule-based labelling and removal of high-amplitude continuous body movement
# Removal of manually labelled unusable intervals
"""
import sys
from pathlib import Path

sys.path.append(str(Path(__file__).resolve().parent.parent))
project_root_path = Path(__file__).resolve().parent.parent

import shutil
import draw_tools
import utils
import numpy as np
import signal_method
import os

os.environ['DISPLAY'] = "localhost:10.0"


def _find_signal_file(samp_dir, signal_name, samp_id):
    """Return the first ``<signal_name>_*.txt`` file inside *samp_dir*.

    Args:
        samp_dir: Directory (``pathlib.Path``) holding one sample's signals.
        signal_name: File-name prefix of the signal, e.g. ``"Effort Tho_Sync"``.
        samp_id: Sample identifier, used only for the error message.

    Returns:
        Path of the first file yielded by the glob.

    Raises:
        FileNotFoundError: If no file matches the pattern.
    """
    # Path.glob() returns a generator, so take the first match explicitly
    # instead of relying on the truthiness of the glob result.
    signal_path = next(samp_dir.glob(f"{signal_name}_*.txt"), None)
    if signal_path is None:
        raise FileNotFoundError(f"{signal_name} file not found for sample ID: {samp_id}")
    print(f"Processing {signal_name} signal file: {signal_path}")
    return signal_path


def process_one_signal(samp_id, show=False):
    """Build and save the per-second label table for one sample.

    Locates the sample's synchronised signal files and its SA label file,
    derives an event mask and a wake-based "disable" mask, copies the label
    file next to the output and writes ``<samp_id>_Processed_Labels.csv``.

    Relies on the module-level names ``org_signal_root_path``,
    ``label_root_path`` and ``save_path`` assigned in the ``__main__`` section.

    Args:
        samp_id: Sample identifier; also the name of the sample's sub-directory.
        show: Currently unused by this function.

    Raises:
        FileNotFoundError: If any expected signal file or the label file is
            missing for this sample.
    """
    samp_signal_dir = org_signal_root_path / f"{samp_id}"

    # All six signal files must exist, even though only the thoracic effort
    # and sleep-stage files are read while the other stages are disabled.
    tho_signal_path = _find_signal_file(samp_signal_dir, "Effort Tho_Sync", samp_id)
    abd_signal_path = _find_signal_file(samp_signal_dir, "Effort Abd_Sync", samp_id)
    flowp_signal_path = _find_signal_file(samp_signal_dir, "Flow P_Sync", samp_id)
    flowt_signal_path = _find_signal_file(samp_signal_dir, "Flow T_Sync", samp_id)
    spo2_signal_path = _find_signal_file(samp_signal_dir, "SpO2_Sync", samp_id)
    stage_signal_path = _find_signal_file(samp_signal_dir, "5_class_Sync", samp_id)

    # Fix: the glob generator itself is always truthy, so the original
    # "if not label_path" check could never raise; test the first match.
    label_path = next((label_root_path / f"{samp_id}").glob("SA Label_Sync.csv"), None)
    if label_path is None:
        raise FileNotFoundError(f"Label_corrected file not found for sample ID: {samp_id}")
    print(f"Processing Label_corrected file: {label_path}")

    # Output directory for the processed data and labels of this sample.
    save_samp_path = save_path / f"{samp_id}"
    save_samp_path.mkdir(parents=True, exist_ok=True)

    # Read the signal data.
    tho_data_raw, tho_length, tho_fs, tho_second = utils.read_signal_txt(tho_signal_path, dtype=float, verbose=True)
    # abd_data_raw, abd_length, abd_fs, abd_second = utils.read_signal_txt(abd_signal_path, dtype=float, verbose=True)
    # flowp_data_raw, flowp_length, flowp_fs, flowp_second = utils.read_signal_txt(flowp_signal_path, dtype=float, verbose=True)
    # flowt_data_raw, flowt_length, flowt_fs, flowt_second = utils.read_signal_txt(flowt_signal_path, dtype=float, verbose=True)
    # spo2_data_raw, spo2_length, spo2_fs, spo2_second = utils.read_signal_txt(spo2_signal_path, dtype=float, verbose=True)
    stage_data_raw, stage_length, stage_fs, stage_second = utils.read_signal_txt(stage_signal_path, dtype=str, verbose=True)

    # Preprocessing and filtering (currently disabled).
    # tho_data, tho_data_filt, tho_fs = signal_method.psg_effort_filter(conf=conf, effort_data_raw=tho_data_raw, effort_fs=tho_fs)
    # abd_data, abd_data_filt, abd_fs = signal_method.psg_effort_filter(conf=conf, effort_data_raw=abd_data_raw, effort_fs=abd_fs)
    # flowp_data, flowp_data_filt, flowp_fs = signal_method.psg_effort_filter(conf=conf, effort_data_raw=flowp_data_raw, effort_fs=flowp_fs)
    # flowt_data, flowt_data_filt, flowt_fs = signal_method.psg_effort_filter(conf=conf, effort_data_raw=flowt_data_raw, effort_fs=flowt_fs)

    # Downsampling (currently disabled).
    # old_tho_fs = tho_fs
    # tho_fs = conf["effort"]["downsample_fs"]
    # tho_data_filt = utils.downsample_signal_fast(original_signal=tho_data_filt, original_fs=old_tho_fs, target_fs=tho_fs)
    # old_abd_fs = abd_fs
    # abd_fs = conf["effort"]["downsample_fs"]
    # abd_data_filt = utils.downsample_signal_fast(original_signal=abd_data_filt, original_fs=old_abd_fs, target_fs=abd_fs)
    # old_flowp_fs = flowp_fs
    # flowp_fs = conf["effort"]["downsample_fs"]
    # flowp_data_filt = utils.downsample_signal_fast(original_signal=flowp_data_filt, original_fs=old_flowp_fs, target_fs=flowp_fs)
    # old_flowt_fs = flowt_fs
    # flowt_fs = conf["effort"]["downsample_fs"]
    # flowt_data_filt = utils.downsample_signal_fast(original_signal=flowt_data_filt, original_fs=old_flowt_fs, target_fs=flowt_fs)

    # SpO2 is not downsampled (currently disabled).
    # spo2_data_filt = spo2_data_raw
    # spo2_fs = spo2_fs

    label_data = utils.read_raw_psg_label(path=label_path)
    event_mask, score_mask = utils.generate_event_mask(signal_second=tho_second, event_df=label_data, use_correct=False, with_score=False)
    # Binarise: 1 wherever event_mask > 0, otherwise 0. This replaces the
    # score_mask returned by generate_event_mask above.
    score_mask = np.where(event_mask > 0, 1, 0)

    # Derive the unusable intervals from the sleep staging.
    wake_mask = utils.get_wake_mask(stage_data_raw)
    # Remove wake intervals shorter than 60 seconds.
    wake_mask = utils.remove_short_durations(wake_mask, time_points=np.arange(len(wake_mask) * stage_fs), min_duration_sec=60)
    # Merge wake intervals separated by short gaps (max_gap_sec=60).
    # NOTE(review): the original comment said 120 seconds while the code
    # passes 60 - confirm which threshold is intended.
    wake_mask = utils.merge_short_gaps(wake_mask, time_points=np.arange(len(wake_mask) * stage_fs), max_gap_sec=60)

    # Truncate the wake mask to the first tho_second entries.
    disable_label = wake_mask[:tho_second]

    # Copy the event (label) file into the sample's save directory.
    sa_label_save_name = f"{samp_id}_" + label_path.name
    shutil.copyfile(label_path, save_samp_path / sa_label_save_name)

    # Per-second label table: second index, SA label/score, disable label,
    # plus placeholder (all-zero) respiration and BCG quality labels.
    save_dict = {
        "Second": np.arange(tho_second),
        "SA_Label": event_mask,
        "SA_Score": score_mask,
        "Disable_Label": disable_label,
        "Resp_LowAmp_Label": np.zeros_like(event_mask),
        "Resp_Movement_Label": np.zeros_like(event_mask),
        "Resp_AmpChange_Label": np.zeros_like(event_mask),
        "BCG_LowAmp_Label": np.zeros_like(event_mask),
        "BCG_Movement_Label": np.zeros_like(event_mask),
        "BCG_AmpChange_Label": np.zeros_like(event_mask)
    }

    mask_label_save_name = f"{samp_id}_Processed_Labels.csv"
    utils.save_process_label(save_path=save_samp_path / mask_label_save_name, save_dict=save_dict)


if __name__ == '__main__':
    yaml_path = project_root_path / "dataset_config/HYS_PSG_config.yaml"
    # disable_df_path = project_root_path / "排除区间.xlsx"

    conf = utils.load_dataset_conf(yaml_path)

    root_path = Path(conf["root_path"])
    save_path = Path(conf["mask_save_path"])
    select_ids = conf["select_ids"]

    print(f"select_ids: {select_ids}")
    print(f"root_path: {root_path}")
    print(f"save_path: {save_path}")

    # Signals and labels are both read from the same "PSG_Aligned" directory.
    org_signal_root_path = root_path / "PSG_Aligned"
    label_root_path = root_path / "PSG_Aligned"

    # all_samp_disable_df = utils.read_disable_excel(disable_df_path)

    # process_one_signal(select_ids[0], show=True)

    for samp_id in select_ids:
        print(f"Processing sample ID: {samp_id}")
        process_one_signal(samp_id, show=False)
        print(f"Finished processing sample ID: {samp_id}\n\n")