Compare commits

...

3 Commits

Author SHA1 Message Date
a42a482e1c Merge remote-tracking branch 'origin/main' 2026-03-24 21:18:21 +08:00
0b87f36d17 feat: 添加 .gitignore 文件以排除不必要的文件和目录 2026-03-24 21:15:15 +08:00
8ee5980906 feat: Add utility functions for signal processing and event mapping
- Created a new module `utils/__init__.py` to consolidate utility imports.
- Added `event_map.py` for mapping apnea event types to numerical values and colors.
- Implemented various filtering functions in `filter_func.py`, including Butterworth, Bessel, downsampling, and notch filters.
- Developed `operation_tools.py` for dataset configuration loading, event mask generation, and signal processing utilities.
- Introduced `split_method.py` for segmenting data based on movement and amplitude criteria.
- Added `statistics_metrics.py` for calculating amplitude metrics and generating confusion matrices.
- Included a new Excel file for additional data storage.
2026-03-24 21:15:05 +08:00
8 changed files with 923 additions and 49 deletions

470
AGENT_BACKGROUND_CN.md Normal file
View File

@ -0,0 +1,470 @@
# 项目背景文档(供 Agent 快速上手)
## 1. 文档目的
这份文档用于帮助新进入仓库的 agent 快速建立项目心智模型,重点回答下面几个问题:
- 这个仓库在做什么
- 核心处理链路是什么
- 代码从哪里开始看
- 输入数据长什么样,输出产物长什么样
- 哪些模块已经可用,哪些模块还只是占位或半成品
---
## 2. 一句话理解项目
`DataPrepare` 是一个睡眠呼吸暂停数据预处理仓库,主要用于把原始的 BCG / PSG 信号与呼吸暂停标签整理成可训练的数据集。
当前主线可以概括为两步:
1. 在 `event_mask_process/` 中把原始标注和规则检测结果整理成逐秒标签掩码。
2. 在 `dataset_builder/` 中根据这些掩码切出固定长度窗口,并将信号和窗口索引保存为 `npz` 数据集。
---
## 3. 仓库总览
| 目录 | 作用 | 备注 |
| --- | --- | --- |
| `dataset_config/` | 各数据集的 YAML 配置 | 包含路径、采样率、阈值、切窗参数 |
| `event_mask_process/` | 生成逐秒标签掩码 | 主入口之一 |
| `dataset_builder/` | 将信号切成窗口并保存数据集 | 主入口之二 |
| `signal_method/` | 信号预处理、规则检测、特征计算 | 规则逻辑核心 |
| `utils/` | 读文件、标签转换、切窗、滤波、通用工具 | 底层支撑 |
| `draw_tools/` | 全夜信号图、分段图、统计图 | 调试与可视化 |
| `dataset_tools/` | 辅助脚本 | 包括配对拷贝、SHHS 标注检查 |
| `output/` | 中间结果样例 | 仓库内已有若干处理结果示例 |
---
## 4. 项目主线流程
### 4.1 HYSBCG 主线)
对应文件:
- `event_mask_process/HYS_process.py`
- `dataset_builder/HYS_dataset.py`
处理过程:
1. 从 `root_path/OrgBCG_Aligned/<id>/OrgBCG_Sync_*.txt` 读取原始 BCG 同步信号。
2. 从 `root_path/Label/<id>/SA Label_corrected.csv` 读取人工修正后的呼吸暂停标签。
3. 对原始 BCG 进行 50Hz 陷波,再拆出:
- 呼吸分量 `resp_data`
- BCG 分量 `bcg_data`
4. 根据配置做规则检测,生成逐秒掩码:
- `Resp_LowAmp_Label`
- `Resp_Movement_Label`
- `Resp_AmpChange_Label`
- `BCG_LowAmp_Label`
- `BCG_Movement_Label`
- `BCG_AmpChange_Label`
- `Disable_Label`(来自 `排除区间.xlsx`
- `SA_Label` / `SA_Score`(来自人工标签)
5. 将这些结果保存为 `output/HYS/<id>/<id>_Processed_Labels.csv`
6. 在数据集构建阶段读取上述逐秒标签,按 `window_sec``stride_sec` 切成窗口。
7. 保存处理后的信号 `Signals/*.npz`、窗口索引 `Segments_List/*.npz`、标签副本 `Labels/*.csv`
### 4.2 HYS_PSGPSG 主线)
对应文件:
- `event_mask_process/HYS_PSG_process.py`
- `dataset_builder/HYS_PSG_dataset.py`
处理过程:
1. 从 `root_path/PSG_Aligned/<id>/` 读取 PSG 通道,包括:
- `Rpeak`
- `ECG_Sync`
- `Effort Tho`
- `Effort Abd`
- `Flow P`
- `Flow T`
- `SpO2`
- `5_class`
2. 从同目录读取 `SA Label_Sync.csv`
3. 在 `HYS_PSG_process.py` 中生成较简化的逐秒标签:
- `SA_Label`
- `SA_Score`
- `Disable_Label`(主要由睡眠分期里的清醒期生成)
- 其余 `Resp_*` / `BCG_*` 掩码目前全部置零
4. 在 `HYS_PSG_dataset.py` 中对胸腹带、流量、SpO2、RRI 做统一重采样与长度对齐。
5. 对 SpO2 进行异常填补或置空,再切成窗口,保存为 PSG 数据集。
### 4.3 设计上的共性
无论是 HYS 还是 HYS_PSG核心设计都是
1. 先把整夜记录整理成逐秒掩码。
2. 再根据掩码和切窗规则提取训练片段。
这意味着以后如果要改“可用性判断”或“切窗逻辑”,优先看:
- `event_mask_process/`
- `utils/split_method.py`
---
## 5. 数据与目录约定
### 5.1 外部数据目录
这个仓库本身不包含原始数据,真正的数据目录由 YAML 中的绝对路径指定,例如:
- `/mnt/disk_wd/marques_dataset/DataCombine2023/HYS`
- `/mnt/disk_wd/marques_dataset/shhs/polysomnography/shhs1`
因此在新环境运行时,第一件事通常不是改代码,而是先改 `dataset_config/*.yaml` 里的绝对路径。
### 5.2 HYS 数据目录约定
代码默认外部目录至少包含:
- `OrgBCG_Aligned/<id>/OrgBCG_Sync_<fs>.txt`
- `PSG_Aligned/<id>/...`
- `Label/<id>/SA Label_corrected.csv`
### 5.3 HYS_PSG 数据目录约定
代码默认 `PSG_Aligned/<id>/` 内文件命名符合下面的模式:
- `Rpeak*.txt`
- `ECG_Sync*.txt`
- `Effort Tho*.txt`
- `Effort Abd*.txt`
- `Flow P*.txt`
- `Flow T*.txt`
- `SpO2*.txt`
- `5_class*.txt`
- `SA Label_Sync.csv`
### 5.4 处理中间结果目录
仓库内 `output/` 保存的是“中间标签与图像结果”,不是最终训练数据集。
最终数据集通常会写到 YAML 中 `dataset_save_path` 指向的外部目录。
---
## 6. 核心入口脚本
### 6.1 主入口
| 场景 | 脚本 |
| --- | --- |
| HYS 逐秒标签生成 | `event_mask_process/HYS_process.py` |
| HYS 数据集构建 | `dataset_builder/HYS_dataset.py` |
| HYS_PSG 逐秒标签生成 | `event_mask_process/HYS_PSG_process.py` |
| HYS_PSG 数据集构建 | `dataset_builder/HYS_PSG_dataset.py` |
### 6.2 辅助脚本
| 脚本 | 作用 |
| --- | --- |
| `dataset_tools/resp_pair_copy.py` | 将 HYS/ZD5Y 的 BCG 与 PSG 原始文件拷贝为配对数据集 |
| `dataset_tools/shhs_annotations_check.py` | 统计 SHHS XML 标注中的事件类型组合 |
| `event_mask_process/SHHS1_process.py` | SHHS1 处理入口占位,目前未实现 |
### 6.3 入口脚本的共同特点
- 大多数脚本没有命令行参数接口。
- 配置文件路径通常直接写在 `if __name__ == '__main__':` 中。
- 运行逻辑依赖 YAML 中的绝对路径。
所以如果 agent 要“跑脚本”,通常需要先确认:
1. 当前机器是否存在对应外部数据目录。
2. YAML 路径是否匹配当前环境。
---
## 7. 关键模块说明
### 7.1 `utils/`
#### `utils/HYS_FileReader.py`
负责各种输入文件读取,是仓库最重要的底层模块之一:
- `read_signal_txt`:读取单通道 txt并根据文件名中的采样率推断 `fs`
- `read_label_csv`:读取人工修正的 HYS 标签
- `read_raw_psg_label`:读取 PSG 原始同步标签
- `read_disable_excel`:读取 `排除区间.xlsx`
- `read_mask_execl`:读取处理后的逐秒标签 CSV并生成事件片段列表
- `read_psg_channel`:按通道名读取 PSG 文件夹里的多通道数据
#### `utils/operation_tools.py`
负责标签转换、片段提取和通用处理:
- `load_dataset_conf`:读取 YAML
- `generate_event_mask`:把事件表转换成逐秒 `SA_Label`
- `generate_disable_mask`:把 Excel 中的排除区间转换成逐秒 `Disable_Label`
- `event_mask_2_list`:把 0/1 掩码转为 `[start, end]` 列表
- `merge_short_gaps` / `remove_short_durations`:对逐秒掩码做时长后处理
- `fill_spo2_anomaly`:修补 SpO2 异常段
#### `utils/split_method.py`
真正的切窗规则在这里:
- 默认按 `window_sec` / `stride_sec` 滑窗
- 只在 `EnableSegment` 内生成可用窗口
- 如果一个窗口中 `Resp_Movement_Label | Resp_LowAmp_Label` 超过窗口时长的 2/3则该窗口转入 `disable_segment_list`
#### `utils/filter_func.py`
提供滤波和采样率处理:
- Butterworth / Bessel
- 陷波
- 整数倍降采样
- 自动升降采样
- 移动平均去趋势
### 7.2 `signal_method/`
#### `signal_method/signal_process.py`
负责信号预处理:
- `signal_filter_split`:把原始 OrgBCG 信号拆成呼吸分量和 BCG 分量
- `psg_effort_filter`:处理 PSG 努力带 / 流量信号
- `rpeak2hr` / `rpeak2rri_interpolation`:由 R 峰生成 HR / RRI
#### `signal_method/rule_base_event.py`
规则检测主逻辑:
- `detect_movement`:基于滑窗标准差和局部幅值比较检测体动
- `movement_revise`:对体动掩码做二次修正
- `detect_low_amplitude_signal`:检测低幅值
- `position_based_sleep_recognition_v2/v3`:根据体动前后幅值变化标记姿势/幅值变化段
#### `signal_method/normalize_method.py`
`normalize_resp_signal_by_segment` 会按“可用片段”做分段 z-score 标准化。
HYS 中通常按 `Resp_AmpChange_Label` 的反向片段来归一化,目的是减少整夜幅值漂移带来的影响。
### 7.3 `draw_tools/`
主要用于人工检查处理质量:
- `draw_signal_with_mask`:画 HYS 全夜原始信号 + 规则掩码
- `draw_psg_signal`:画 HYS_PSG 全夜 PSG 信号
- `draw_psg_label` / `draw_psg_bcg_label`:按窗口导出分段图
### 7.4 `dataset_builder/`
核心职责是把“整夜记录 + 逐秒掩码”转换成训练样本:
- 保存处理后的多通道信号到 `npz`
- 保存窗口起止列表到 `npz`
- 把标签 CSV 一并拷贝到数据集目录
---
## 8. 标签、通道和编码约定
### 8.1 呼吸暂停事件编码
定义于 `utils/event_map.py`
| 事件 | 编码 |
| --- | --- |
| `Hypopnea` | 1 |
| `Central apnea` | 2 |
| `Obstructive apnea` | 3 |
| `Mixed apnea` | 4 |
### 8.2 PSG 通道编号映射
同样定义于 `utils/event_map.py`
| 编号 | 通道名 |
| --- | --- |
| 1 | `Rpeak` |
| 2 | `ECG_Sync` |
| 3 | `Effort Tho` |
| 4 | `Effort Abd` |
| 5 | `Flow P` |
| 6 | `Flow T` |
| 7 | `SpO2` |
| 8 | `5_class` |
### 8.3 睡眠分期编码
`5_class` 在读取时会转成整数:
| 分期 | 编码 |
| --- | --- |
| `N3` | 1 |
| `N2` | 2 |
| `N1` | 3 |
| `R` | 4 |
| `W` | 5 |
### 8.4 逐秒标签 CSV 字段
`Processed_Labels.csv` 的核心字段为:
- `Second`
- `SA_Label`
- `SA_Score`
- `Disable_Label`
- `Resp_LowAmp_Label`
- `Resp_Movement_Label`
- `Resp_AmpChange_Label`
- `BCG_LowAmp_Label`
- `BCG_Movement_Label`
- `BCG_AmpChange_Label`
样例可见仓库内:
- `output/HYS/220/220_Processed_Labels.csv`
- `output/HYS_PSG/220/220_Processed_Labels.csv`
---
## 9. 主要配置文件
### 9.1 HYS
`dataset_config/HYS_config.yaml` 主要控制:
- 样本 ID 列表
- 原始数据根目录
- 中间标签保存目录
- 呼吸 / BCG 的滤波和降采样参数
- 低幅值、体动、幅值变化检测阈值
- 数据集窗口长度和步长
### 9.2 HYS_PSG
`dataset_config/HYS_PSG_config.yaml` 主要控制:
- 样本 ID 列表
- 目标统一采样率 `target_fs`
- 努力带 / 流量滤波参数
- SpO2 异常填补参数
- 数据集输出路径
### 9.3 其他配置
- `dataset_config/ZD5Y_config.yaml`:另一套 BCG 规则配置
- `dataset_config/SHHS1_config.yaml`SHHS1 预留配置
- `dataset_config/RESP_PAIR_HYS_config.yaml`HYS 配对原始数据拷贝
- `dataset_config/RESP_PAIR_ZD5Y_config.yaml`ZD5Y 配对原始数据拷贝
---
## 10. 输出产物说明
### 10.1 中间输出
典型位置:
- `output/HYS/<id>/<id>_Processed_Labels.csv`
- `output/HYS/<id>/<id>_Signal_Plots.png`
- `output/HYS_PSG/<id>/<id>_Processed_Labels.csv`
- `output/HYS_PSG/<id>/<id>_Signal_Plots.png`
- `output/HYS_PSG/<id>/<id>_Signal_Plots_fill.png`
### 10.2 最终数据集输出
`dataset_builder/*` 保存到 YAML 指定的外部目录,结构通常是:
- `Signals/`
- `Segments_List/`
- `Labels/`
HYS 的 `Signals/*.npz` 里主要保存:
- `bcg_signal_notch`
- `bcg_signal`
- `resp_signal`
HYS_PSG 的 `Signals/*.npz` 里主要保存:
- `Effort Tho`
- `Effort Abd`
- `Effort`
- `Flow P`
- `Flow T`
- `SpO2`
- `HR`
- `RRI`
- `5_class`
`Segments_List/*.npz` 中主要保存:
- `segment_list`
- `disable_segment_list`
---
## 11. 推荐阅读顺序
如果 agent 是第一次接触这个仓库,建议按下面顺序阅读:
1. `dataset_config/HYS_config.yaml`
2. `event_mask_process/HYS_process.py`
3. `utils/operation_tools.py`
4. `utils/split_method.py`
5. `dataset_builder/HYS_dataset.py`
6. `signal_method/signal_process.py`
7. `signal_method/rule_base_event.py`
8. `utils/HYS_FileReader.py`
如果要看 PSG 主线,再继续读:
1. `dataset_config/HYS_PSG_config.yaml`
2. `event_mask_process/HYS_PSG_process.py`
3. `dataset_builder/HYS_PSG_dataset.py`
4. `draw_tools/draw_label.py`
---
## 12. 常见修改入口
如果你要改不同类型的问题,可以优先从这些文件入手:
| 需求 | 优先看哪里 |
| --- | --- |
| 调整阈值、采样率、窗口长度 | `dataset_config/*.yaml` |
| 修改逐秒标签生成逻辑 | `event_mask_process/*.py` |
| 修改体动 / 低幅值 / 幅值变化规则 | `signal_method/rule_base_event.py` |
| 修改滤波与重采样 | `signal_method/signal_process.py`、`utils/filter_func.py` |
| 修改切窗规则 | `utils/split_method.py` |
| 修改输入文件解析 | `utils/HYS_FileReader.py` |
| 修改事件编码或通道映射 | `utils/event_map.py` |
| 修改图像输出样式 | `draw_tools/*.py` |
---
## 13. 当前实现状态与注意事项
下面这些点对 agent 很重要:
1. 仓库没有依赖清单文件(如 `requirements.txt` / `pyproject.toml`),依赖需要从源码导入中反推,当前至少涉及 `numpy`、`pandas`、`scipy`、`matplotlib`、`seaborn`、`yaml`、`tqdm`、`rich`、`polars`、`lxml`、`mne`。
2. 大部分脚本依赖绝对路径,迁移环境时优先修改 YAML。
3. `event_mask_process/SHHS1_process.py` 目前基本为空,占位多于实现。
4. `event_mask_process/HYS_PSG_process.py` 当前是“简化版标签生成”,核心只用了 SA 标签和睡眠分期,`Resp_*` / `BCG_*` 掩码还没有真正实现。
5. `dataset_builder/HYS_dataset.py` 里虽然保留了分段可视化入口,但实际绘图调用被注释掉了,因此默认不会导出分段图。
6. `utils/signal_process.py``utils/filter_func.py` 有部分重复实现;当前实际被 `utils/__init__.py` 导出并广泛使用的是 `filter_func.py` 中的版本。
7. `README.md` 目前非常简略,真正的项目逻辑主要还是要靠源码理解。
---
## 14. 对 Agent 最有价值的结论
如果只能记住几件事,请记住下面这些:
1. 这是一个“先做逐秒掩码,再做固定窗口切片”的数据准备仓库。
2. HYS 主线比 HYS_PSG 更完整,规则检测主要服务于 HYS。
3. 切窗逻辑集中在 `utils/split_method.py`,不是分散在各个 builder 里。
4. 原始数据不在仓库里,仓库只是代码和部分中间结果样例。
5. 大多数运行问题都不是代码逻辑错,而是路径、文件命名、采样率和外部数据结构不匹配。

194
README.md
View File

@ -1,5 +1,191 @@
# DataPrepare
## 操作步骤
1. 信号预处理
2. 数据集构建
3. 数据可视化(可选)
`DataPrepare` 是一个面向睡眠呼吸暂停数据的预处理仓库,用于把原始 BCG / PSG 信号与事件标签整理成可训练的数据集。
当前仓库的主线是:
1. 生成逐秒标签掩码
2. 按固定窗口切分数据集
3. 输出可视化结果用于人工检查
更详细的项目背景说明见 [AGENT_BACKGROUND_CN.md](./AGENT_BACKGROUND_CN.md)。
## 项目在做什么
仓库主要服务两类数据:
- `HYS`:以 `OrgBCG` 为核心结合人工修正的呼吸暂停标签提取呼吸分量、BCG 分量并生成逐秒可用性掩码
- `HYS_PSG`:以 PSG 多通道信号为核心整理胸腹带、流量、SpO2、RRI、睡眠分期与同步呼吸暂停标签
两条主线都采用同一个设计:
1. 先把整夜记录整理成逐秒标签
2. 再根据标签切出固定长度窗口
## 仓库结构
| 目录 | 作用 |
| --- | --- |
| `dataset_config/` | 数据集配置文件,包含路径、采样率、阈值、切窗参数 |
| `event_mask_process/` | 逐秒标签掩码生成脚本 |
| `dataset_builder/` | 数据集切片与保存脚本 |
| `signal_method/` | 信号预处理、规则检测、特征计算 |
| `utils/` | 文件读取、标签转换、切窗、滤波等通用工具 |
| `draw_tools/` | 全夜图、分段图和统计图绘制 |
| `dataset_tools/` | 辅助脚本如配对数据拷贝、SHHS 标注检查 |
| `output/` | 仓库内的中间结果样例 |
## 核心流程
### HYS
对应脚本:
- `event_mask_process/HYS_process.py`
- `dataset_builder/HYS_dataset.py`
流程概要:
1. 读取 `OrgBCG_Aligned/<id>/OrgBCG_Sync_*.txt`
2. 读取 `Label/<id>/SA Label_corrected.csv`
3. 对原始信号做陷波、呼吸分量提取和 BCG 分量提取
4. 检测低幅值、体动、幅值变化,并结合 `排除区间.xlsx` 生成逐秒标签
5. 保存 `Processed_Labels.csv`
6. 根据 `window_sec``stride_sec` 切分训练窗口并保存 `npz`
### HYS_PSG
对应脚本:
- `event_mask_process/HYS_PSG_process.py`
- `dataset_builder/HYS_PSG_dataset.py`
流程概要:
1. 读取 `PSG_Aligned/<id>/` 下的多通道信号
2. 读取 `SA Label_Sync.csv`
3. 根据同步标签与睡眠分期生成逐秒标签
4. 对努力带、流量、SpO2、RRI 做统一重采样和长度对齐
5. 切分窗口并保存 PSG 数据集
## 快速开始
### 1. 先检查配置
所有主脚本都依赖 `dataset_config/*.yaml` 中的绝对路径。
在新机器或新数据目录下,优先修改这些配置文件:
- `dataset_config/HYS_config.yaml`
- `dataset_config/HYS_PSG_config.yaml`
- `dataset_config/ZD5Y_config.yaml`
- `dataset_config/SHHS1_config.yaml`
重点检查:
- `root_path`
- `mask_save_path``save_path`
- `dataset_save_path`
- `dataset_visual_path`
- `select_ids`
### 2. 生成逐秒标签
HYS
```bash
python event_mask_process/HYS_process.py
```
HYS_PSG
```bash
python event_mask_process/HYS_PSG_process.py
```
执行后通常会在 `output/` 下生成:
- `*_Processed_Labels.csv`
- `*_Signal_Plots.png`
### 3. 构建数据集
HYS
```bash
python dataset_builder/HYS_dataset.py
```
HYS_PSG
```bash
python dataset_builder/HYS_PSG_dataset.py
```
输出目录由对应 YAML 中的 `dataset_save_path` 控制,通常包含:
- `Signals/`
- `Segments_List/`
- `Labels/`
### 4. 可视化与辅助工具
配对拷贝原始数据:
```bash
python dataset_tools/resp_pair_copy.py
```
检查 SHHS XML 标注:
```bash
python dataset_tools/shhs_annotations_check.py
```
## 输入与输出约定
### 输入
仓库本身不包含原始数据,原始数据目录由 YAML 指定。代码默认外部数据目录中至少存在:
- `OrgBCG_Aligned/<id>/OrgBCG_Sync_*.txt`
- `PSG_Aligned/<id>/...`
- `Label/<id>/SA Label_corrected.csv`
- `PSG_Aligned/<id>/SA Label_Sync.csv`
### 中间输出
仓库内 `output/` 保存的是中间标签与图像样例,不是最终训练数据集。例如:
- `output/HYS/<id>/<id>_Processed_Labels.csv`
- `output/HYS_PSG/<id>/<id>_Processed_Labels.csv`
### 最终输出
`dataset_builder/` 写入 YAML 中配置的外部目录。典型结构为:
- `Signals/*.npz`
- `Segments_List/*.npz`
- `Labels/*.csv`
## 关键文件
如果你是第一次阅读这个仓库,推荐优先看:
1. `dataset_config/HYS_config.yaml`
2. `event_mask_process/HYS_process.py`
3. `utils/operation_tools.py`
4. `utils/split_method.py`
5. `dataset_builder/HYS_dataset.py`
6. `signal_method/rule_base_event.py`
## 当前状态与注意事项
- 仓库目前没有依赖清单文件,常见依赖包括 `numpy`、`pandas`、`scipy`、`matplotlib`、`seaborn`、`yaml`、`tqdm`、`rich`、`polars`、`lxml`、`mne`
- 大多数脚本没有命令行参数接口,配置文件路径直接写在脚本 `__main__`
- `event_mask_process/SHHS1_process.py` 目前基本还是占位
- `event_mask_process/HYS_PSG_process.py` 当前实现偏简化,`Resp_*` / `BCG_*` 掩码尚未真正展开
- `output/` 里的文件更适合拿来理解格式与结果,不代表完整数据集
## 相关文档
- 详细项目背景:[AGENT_BACKGROUND_CN.md](./AGENT_BACKGROUND_CN.md)

View File

@ -165,51 +165,6 @@ def multiprocess_entry(_progress, task_id, _id):
build_HYS_dataset_segment(samp_id=_id, show=False, draw_segment=True, verbose=False, multi_p=_progress, multi_task_id=task_id)
def multiprocess_with_tqdm(args_list, n_processes):
from concurrent.futures import ProcessPoolExecutor
from rich import progress
with progress.Progress(
"[progress.description]{task.description}",
progress.BarColumn(),
"[progress.percentage]{task.percentage:>3.0f}%",
progress.MofNCompleteColumn(),
progress.TimeRemainingColumn(),
progress.TimeElapsedColumn(),
refresh_per_second=1, # bit slower updates
transient=False
) as progress:
futures = []
with multiprocessing.Manager() as manager:
_progress = manager.dict()
overall_progress_task = progress.add_task("[green]All jobs progress:")
with ProcessPoolExecutor(max_workers=n_processes) as executor:
for i_args in range(len(args_list)):
args = args_list[i_args]
task_id = progress.add_task(f"task {i_args}", visible=True)
futures.append(executor.submit(multiprocess_entry, _progress, task_id, args_list[i_args]))
# monitor the progress:
while (n_finished := sum([future.done() for future in futures])) < len(
futures
):
progress.update(
overall_progress_task, completed=n_finished, total=len(futures)
)
for task_id, update_data in _progress.items():
desc = update_data.get("desc", "")
# update the progress bar for this task:
progress.update(
task_id,
completed=update_data.get("progress", 0),
total=update_data.get("total", 0),
description=desc
)
# raise any errors:
for future in futures:
future.result()
def multiprocess_with_pool(args_list, n_processes):
"""使用Pool每个worker处理固定数量任务后重启"""
from multiprocessing import Pool

View File

@ -0,0 +1,56 @@
select_ids:
- 1000
- 1004
- 1006
- 1009
- 1010
- 1300
- 1301
- 1302
- 1308
- 1314
- 1354
- 1374
- 1378
- 1478
- 220
- 221
- 229
- 282
- 285
- 286
- 54
- 541
- 579
- 582
- 670
- 671
- 683
- 684
- 686
- 703
- 704
- 726
- 735
- 736
- 88
- 893
- 933
- 935
- 939
- 950
- 952
- 954
- 955
- 956
- 960
- 961
- 962
- 967
- 969
- 971
- 972
root_path: /mnt/disk_wd/marques_dataset/DataCombine2023/HYS
pair_file_path: /mnt/disk_wd/marques_dataset/Resp_Pair_Dataset/HYS/Raw

View File

@ -0,0 +1,32 @@
select_ids:
- 3103
- 3105
- 3106
- 3107
- 3108
- 3110
- 3203
- 3204
- 3205
- 3209
- 3211
- 3301
- 3303
- 3304
- 3307
- 3308
- 3309
- 3403
- 3405
- 3406
- 3407
- 3408
- 3504
root_path: /mnt/disk_wd/marques_dataset/DataCombine2023/ZD5Y
pair_file_path: /mnt/disk_wd/marques_dataset/Resp_Pair_Dataset/ZD5Y/Raw

View File

@ -0,0 +1,67 @@
from pathlib import Path
import sys
sys.path.append(str(Path(__file__).resolve().parent.parent))
project_root_path = Path(__file__).resolve().parent.parent
import utils
import shutil
def copy_one_resp_pair(one_id):
sync_type = "Sync"
org_bcg_file_path = sync_bcg_path / f"{one_id}"
dest_bcg_file_path = pair_file_path / f"{one_id}"
dest_bcg_file_path.mkdir(parents=True, exist_ok=True)
if not list(org_bcg_file_path.glob("OrgBCG_Sync_*.txt")):
if not list(org_bcg_file_path.glob("OrgBCG_RoughCut_*.txt")):
print(f"No OrgBCG files found for ID {one_id}.")
return
else:
sync_type = "RoughCut"
print(f"Using RoughCut files for ID {one_id}.")
for file in org_bcg_file_path.glob(f"OrgBCG_{sync_type}_*.txt"):
shutil.copyfile(file, dest_bcg_file_path / f"{one_id}_{file.name}".replace("_RoughCut", "").replace("_Sync", ""))
psg_file_path = sync_psg_path / f"{one_id}"
dest_psg_file_path = pair_file_path / f"{one_id}"
dest_psg_file_path.mkdir(parents=True, exist_ok=True)
# 检查上面的文件是否存在
psg_file_patterns = [
f"5_class_{sync_type}_*.txt",
f"Effort Abd_{sync_type}_*.txt",
f"Effort Tho_{sync_type}_*.txt",
f"Flow P_{sync_type}_*.txt",
f"Flow T_{sync_type}_*.txt",
f"SA Label_Sync.csv",
f"SpO2_{sync_type}_*.txt"
]
for pattern in psg_file_patterns:
if not list(psg_file_path.glob(pattern)):
print(f"No PSG files found for ID {one_id} with pattern {pattern}.")
return
for pattern in psg_file_patterns:
for file in psg_file_path.glob(pattern):
shutil.copyfile(file, dest_psg_file_path / f"{one_id}_{file.name.replace('_RoughCut', '').replace('_Sync', '')}")
if __name__ == '__main__':
yaml_path = project_root_path / "dataset_config/RESP_PAIR_ZD5Y_config.yaml"
conf = utils.load_dataset_conf(yaml_path)
select_ids = conf["select_ids"]
root_path = Path(conf["root_path"])
sync_bcg_path = root_path / "OrgBCG_Aligned"
sync_psg_path = root_path / "PSG_Aligned"
pair_file_path = Path(conf["pair_file_path"])
# copy_one_resp_pair(961)
for samp_id in select_ids:
print(f"Processing {samp_id}...")
copy_one_resp_pair(samp_id)

108
utils/signal_process.py Normal file
View File

@ -0,0 +1,108 @@
from utils.operation_tools import timing_decorator
import numpy as np
from scipy import signal, ndimage
@timing_decorator()
def butterworth(data, _type, low_cut=0.0, high_cut=0.0, order=10, sample_rate=1000):
if _type == "lowpass": # 低通滤波处理
sos = signal.butter(order, low_cut / (sample_rate * 0.5), btype='lowpass', output='sos')
return signal.sosfiltfilt(sos, np.array(data))
elif _type == "bandpass": # 带通滤波处理
low = low_cut / (sample_rate * 0.5)
high = high_cut / (sample_rate * 0.5)
sos = signal.butter(order, [low, high], btype='bandpass', output='sos')
return signal.sosfiltfilt(sos, np.array(data))
elif _type == "highpass": # 高通滤波处理
sos = signal.butter(order, high_cut / (sample_rate * 0.5), btype='highpass', output='sos')
return signal.sosfiltfilt(sos, np.array(data))
else: # 警告,滤波器类型必须有
raise ValueError("Please choose a type of fliter")
def bessel(data, _type, low_cut=0.0, high_cut=0.0, order=4, sample_rate=1000):
if _type == "lowpass": # 低通滤波处理
b, a = signal.bessel(order, low_cut / (sample_rate * 0.5), btype='lowpass', analog=False, norm='mag')
return signal.filtfilt(b, a, np.array(data))
elif _type == "bandpass": # 带通滤波处理
low = low_cut / (sample_rate * 0.5)
high = high_cut / (sample_rate * 0.5)
b, a = signal.bessel(order, [low, high], btype='bandpass', analog=False, norm='mag')
return signal.filtfilt(b, a, np.array(data))
elif _type == "highpass": # 高通滤波处理
b, a = signal.bessel(order, high_cut / (sample_rate * 0.5), btype='highpass', analog=False, norm='mag')
return signal.filtfilt(b, a, np.array(data))
else: # 警告,滤波器类型必须有
raise ValueError("Please choose a type of fliter")
@timing_decorator()
def downsample_signal_fast(original_signal, original_fs, target_fs, chunk_size=100000):
"""
高效整数倍降采样长信号分段处理以优化内存和速度
参数:
original_signal : array-like, 原始信号数组
original_fs : float, 原始采样率 (Hz)
target_fs : float, 目标采样率 (Hz)
chunk_size : int, 每段处理的样本数默认100000
返回:
downsampled_signal : array-like, 降采样后的信号
"""
# 输入验证
if not isinstance(original_signal, np.ndarray):
original_signal = np.array(original_signal)
if original_fs <= target_fs:
raise ValueError("目标采样率必须小于原始采样率")
if target_fs <= 0 or original_fs <= 0:
raise ValueError("采样率必须为正数")
# 计算降采样因子(必须为整数)
downsample_factor = original_fs / target_fs
if not downsample_factor.is_integer():
raise ValueError("降采样因子必须为整数倍")
downsample_factor = int(downsample_factor)
# 计算总输出长度
total_length = len(original_signal)
output_length = total_length // downsample_factor
# 初始化输出数组
downsampled_signal = np.zeros(output_length)
# 分段处理
for start in range(0, total_length, chunk_size):
end = min(start + chunk_size, total_length)
chunk = original_signal[start:end]
# 使用decimate进行整数倍降采样
chunk_downsampled = signal.decimate(chunk, downsample_factor, ftype='iir', zero_phase=True)
# 计算输出位置
out_start = start // downsample_factor
out_end = out_start + len(chunk_downsampled)
if out_end > output_length:
chunk_downsampled = chunk_downsampled[:output_length - out_start]
downsampled_signal[out_start:out_end] = chunk_downsampled
return downsampled_signal
@timing_decorator()
def average_filter(raw_data, sample_rate, window_size_sec=20):
kernel = np.ones(window_size_sec * sample_rate) / (window_size_sec * sample_rate)
filtered = ndimage.convolve1d(raw_data, kernel, mode='reflect')
convolve_filter_signal = raw_data - filtered
return convolve_filter_signal
# 陷波滤波器
@timing_decorator()
def notch_filter(data, notch_freq=50.0, quality_factor=30.0, sample_rate=1000):
nyquist = 0.5 * sample_rate
norm_notch_freq = notch_freq / nyquist
b, a = signal.iirnotch(norm_notch_freq, quality_factor)
filtered_data = signal.filtfilt(b, a, data)
return filtered_data

BIN
排除区间.xlsx Normal file

Binary file not shown.