from pathlib import Path
from typing import Union

import utils
from .event_map import N2Chn

import numpy as np
import pandas as pd

from .operation_tools import event_mask_2_list

# Polars is an optional dependency; fall back to pandas when it is missing.
try:
    import polars as pl
    HAS_POLARS = True
except ImportError:
    HAS_POLARS = False


# (printed row label, event name as stored in the label CSV), in print order.
_EVENT_TYPES = (
    ("Hyp", "Hypopnea"),
    ("CSA", "Central apnea"),
    ("OSA", "Obstructive apnea"),
    ("MSA", "Mixed apnea"),
)


def read_signal_txt(path: Union[str, Path], dtype, verbose=True, is_peak=False):
    """
    Read the first column of a headerless text file as a numpy array.

    The sampling rate is taken from the file name: the stem is split on "_"
    and the last token is converted to int.

    Args:
        path (str | Path): Path to the text file.
        dtype: Target dtype of the returned array (passed to numpy / pandas).
        verbose (bool): Print a short summary of what was read.
        is_peak (bool): When True the data is returned untouched and
            ``signal_length`` / ``signal_second`` are None. When False the
            data is truncated to a whole number of seconds.

    Returns:
        tuple: ``(signal_data_raw, signal_length, signal_fs, signal_second)``
            where ``signal_data_raw`` is the np.ndarray, ``signal_length`` is
            its length after truncation, ``signal_fs`` is the sampling rate
            parsed from the file name and ``signal_second`` is the number of
            whole seconds kept.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        ValueError: If the sampling rate cannot be parsed from the file name,
            or is not positive while truncation is requested.
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"File not found: {path}")

    if HAS_POLARS:
        # infer_schema_length=0 makes Polars read every column as text;
        # the conversion to ``dtype`` is done by numpy afterwards.
        df = pl.read_csv(path, has_header=False, infer_schema_length=0)
        signal_data_raw = df[:, 0].to_numpy().astype(dtype)
    else:
        df = pd.read_csv(path, header=None, dtype=dtype)
        signal_data_raw = df.iloc[:, 0].to_numpy()

    signal_original_length = len(signal_data_raw)

    fs_token = path.stem.split("_")[-1]
    try:
        signal_fs = int(fs_token)
    except ValueError as err:
        raise ValueError(
            f"Cannot parse sampling rate from file name {path.name!r}: "
            f"last '_'-separated token is {fs_token!r}"
        ) from err

    if is_peak:
        signal_second = None
        signal_length = None
    else:
        if signal_fs <= 0:
            raise ValueError(
                f"Sampling rate parsed from {path.name!r} must be positive, got {signal_fs}"
            )
        signal_second = signal_original_length // signal_fs
        # Truncate to a whole number of seconds according to the sampling rate.
        signal_data_raw = signal_data_raw[:signal_second * signal_fs]
        signal_length = len(signal_data_raw)

    if verbose:
        print(f"Signal file read from {path}")
        print(f"signal_fs: {signal_fs}")
        print(f"signal_original_length: {signal_original_length}")
        print(f"signal_after_cut_off_length: {signal_length}")
        print(f"signal_second: {signal_second}")

    return signal_data_raw, signal_length, signal_fs, signal_second


def _count(mask) -> int:
    """Return the number of True entries in a boolean mask as a plain int."""
    return int(np.sum(mask))


def _print_label_statistics(df: pd.DataFrame) -> None:
    """
    Print per-event-type and per-score counts for a label DataFrame.

    Conventions used by the label file (taken from the original notes):
      * isLabeled == 1 means the row has been labelled, -1 is counted as unlabelled.
      * "Event type" is filled for events exported from PSG and NaN for
        manually created events.
      * score == 1: significant event, score == 2: disturbed event,
        score == 3: non-significant event that should be deleted.
      * The confirmed event type is stored in "correct_EventsType".

    Each row is printed as Total / From PSG / Manual / Deleted / Unlabeled.
    """
    psg_type = df["Event type"]
    confirmed_type = df["correct_EventsType"]
    score = df["score"]

    is_manual = psg_type.isna()
    is_deleted = score == 3
    not_deleted = score != 3
    is_unlabeled = df["isLabeled"] == -1

    event_rows = []
    score_rows = []
    for label, event_name in _EVENT_TYPES:
        is_confirmed = confirmed_type == event_name
        is_psg = psg_type == event_name
        total = _count(is_confirmed & not_deleted)
        event_rows.append((
            label,
            total,
            _count(is_psg),
            _count(is_manual & is_confirmed),
            _count(is_deleted & is_confirmed),
            _count(is_unlabeled & is_psg),
        ))
        score_rows.append((
            label,
            total,
            _count(is_confirmed & (score == 1)),
            _count(is_confirmed & (score == 2)),
            _count(is_confirmed & (score == 3)),
        ))

    num_total = _count((df["isLabeled"] == 1) & not_deleted)
    event_rows.append((
        "All",
        num_total,
        _count(psg_type.notna()),
        _count(is_manual),
        _count(is_deleted),
        _count(is_unlabeled),
    ))
    score_rows.append((
        "All",
        num_total,
        _count(score == 1),
        _count(score == 2),
        _count(score == 3),
    ))

    print("Event Statistics:")
    # Fixed-width output: Total / From PSG / Manual / Deleted / Unlabeled
    print(f"Type {'Total':^8s} / {'From PSG':^8s} / {'Manual':^8s} / {'Deleted':^8s} / {'Unlabeled':^8s}")
    for label, total, from_psg, manual, deleted, unlabeled in event_rows:
        print(f"{label}: {total:^8d} / {from_psg:^8d} / {manual:^8d} / {deleted:^8d} / {unlabeled:^8d}")

    print("Score Statistics (only for non-deleted events and manual created events):")
    print(f"Type {'Total':^8s} / {'Score 1':^8s} / {'Score 2':^8s} / {'Score 3':^8s}")
    for label, total, score_1, score_2, score_3 in score_rows:
        print(f"{label}: {total:^8d} / {score_1:^8d} / {score_2:^8d} / {score_3:^8d}")


def read_label_csv(path: Union[str, Path], verbose=True) -> pd.DataFrame:
    """
    Read a label CSV file and return it as a pandas DataFrame.

    The "Start" and "End" columns are converted to int. When ``verbose`` is
    True a summary of the labelling state is printed; the statistics are only
    computed in that case.

    Columns referenced by the original notes:
        Index, Event type, Stage, Time, Epoch, Date, Duration, HR bef.,
        HR extr., HR delta, O2 bef., O2 min., O2 delta, Body Position,
        Validation, Start, End, score, remark, correct_Start, correct_End,
        correct_EventsType, isLabeled

    Args:
        path (str | Path): Path to the CSV file.
        verbose (bool): Print the row count and the event / score statistics.

    Returns:
        pd.DataFrame: The content of the CSV file.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"File not found: {path}")

    # Read directly with pandas; the file contains Chinese text, hence the explicit encoding.
    df = pd.read_csv(path, encoding="gbk")

    if verbose:
        print(f"Label file read from {path}, number of rows: {len(df)}")
        _print_label_statistics(df)

    df["Start"] = df["Start"].astype(int)
    df["End"] = df["End"].astype(int)
    return df


def read_disable_excel(path: Union[str, Path]) -> pd.DataFrame:
    """
    Read an Excel file and return it as a pandas DataFrame.

    The "id", "start" and "end" columns are converted to int.

    Args:
        path (str | Path): Path to the Excel file.

    Returns:
        pd.DataFrame: The content of the Excel file.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"File not found: {path}")

    # Read directly with pandas.
    df = pd.read_excel(path)
    df["id"] = df["id"].astype(int)
    df["start"] = df["start"].astype(int)
    df["end"] = df["end"].astype(int)
    return df


def read_mask_execl(path: Union[str, Path]):
    """
    Read a mask table and return the per-column masks plus derived segment lists.

    NOTE(review): despite the function name, the file is parsed with
    ``pd.read_csv``; confirm that the mask files on disk are CSV text.

    Args:
        path (str | Path): Path to the mask file.

    Returns:
        tuple: ``(event_mask, event_list)``.
            ``event_mask`` maps every column name to a np.ndarray of that column.
            ``event_list`` holds the result of ``event_mask_2_list`` applied to
            ``1 - mask`` for the "Resp_AmpChange_Label", "BCG_AmpChange_Label"
            and "Disable_Label" columns, under the keys
            "RespAmpChangeSegment", "BCGAmpChangeSegment" and "EnableSegment".

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"File not found: {path}")

    df = pd.read_csv(path)
    event_mask = df.to_dict(orient="list")
    for key in event_mask:
        event_mask[key] = np.array(event_mask[key])

    event_list = {
        "RespAmpChangeSegment": event_mask_2_list(1 - event_mask["Resp_AmpChange_Label"]),
        "BCGAmpChangeSegment": event_mask_2_list(1 - event_mask["BCG_AmpChange_Label"]),
        "EnableSegment": event_mask_2_list(1 - event_mask["Disable_Label"]),
    }
    return event_mask, event_list


def read_psg_channel(path_str: Union[str, Path], channel_number: list[int], verbose=True):
    """
    Read the requested channels from a directory of PSG text files.

    Each channel id is mapped to a channel name through ``N2Chn`` and the file
    is located with the glob pattern ``<channel name>*.txt``. When several
    files match, a warning is printed and the first one in sorted order is used.

    Args:
        path_str (str | Path): Directory that holds the PSG channel files.
        channel_number (list[int]): Channel ids (keys of ``N2Chn``) to read.
        verbose (bool): Forwarded to ``read_signal_txt``.

    Returns:
        dict: Maps each channel name to a dict with the keys
            "name", "path", "data", "length", "fs" and "second".

    Raises:
        FileNotFoundError: If the directory or a channel file does not exist.
        NotADirectoryError: If ``path_str`` is not a directory.
    """
    path = Path(path_str)
    if not path.exists():
        raise FileNotFoundError(f"PSG Dir not found: {path}")
    if not path.is_dir():
        raise NotADirectoryError(f"PSG path is not a directory: {path}")

    channel_data = {}
    # Walk through the requested channels and make sure each file exists.
    for ch_id in channel_number:
        ch_name = N2Chn[ch_id]
        pattern = f"{ch_name}*.txt"
        # Sorted so that the file picked below does not depend on directory order.
        ch_path = sorted(path.glob(pattern))
        if not ch_path:
            raise FileNotFoundError(f"PSG Channel file not found: {path / pattern}")
        if len(ch_path) > 1:
            print(f"Warning!!! PSG Channel file more than one: {ch_path}")

        if ch_id == 8:
            # Sleep stage special case: read as text, then map to integers.
            ch_signal, ch_length, ch_fs, ch_second = read_signal_txt(ch_path[0], dtype=str, verbose=verbose)
            # Replace every stage string with its number from utils.Stage2N.
            for stage_str, stage_number in utils.Stage2N.items():
                np.place(ch_signal, ch_signal == stage_str, stage_number)
            ch_signal = ch_signal.astype(int)
        elif ch_id == 1:
            # Rpeak special case: read as integers without truncation.
            ch_signal, ch_length, ch_fs, ch_second = read_signal_txt(ch_path[0], dtype=int, verbose=verbose, is_peak=True)
        else:
            ch_signal, ch_length, ch_fs, ch_second = read_signal_txt(ch_path[0], dtype=float, verbose=verbose)

        channel_data[ch_name] = {
            "name": ch_name,
            "path": ch_path[0],
            "data": ch_signal,
            "length": ch_length,
            "fs": ch_fs,
            "second": ch_second
        }

    return channel_data


def read_psg_label(path: Union[str, Path], verbose=True):
    """
    Read a PSG label CSV file and drop the rows without an event type.

    Args:
        path (str | Path): Path to the CSV file.
        verbose (bool): Print the number of rows read (before rows are dropped).

    Returns:
        pd.DataFrame: Rows whose "Event type" is not empty, with a fresh index.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"File not found: {path}")

    # Read directly with pandas; the file contains Chinese text, hence the explicit encoding.
    df = pd.read_csv(path, encoding="gbk")
    if verbose:
        print(f"Label file read from {path}, number of rows: {len(df)}")

    # Drop the rows whose "Event type" is empty.
    df = df.dropna(subset=["Event type"], how='all').reset_index(drop=True)
    return df