Add event mask generation and sliding window segmentation for signal analysis

This commit is contained in:
marques 2025-10-16 19:21:05 +08:00
parent 180d872cd7
commit 0900a9b489
5 changed files with 57 additions and 9 deletions

View File

@ -52,6 +52,7 @@ def process_one_signal(samp_id):
label_data = utils.read_label_csv(label_path)
label_mask = utils.generate_event_mask(signal_second, label_data)
manual_disable_mask = utils.generate_disable_mask(signal_second, all_samp_disable_df[all_samp_disable_df["id"] == samp_id])
print(f"disable_mask_shape: {manual_disable_mask.shape}, num_disable: {np.sum(manual_disable_mask == 0)}")
@ -60,6 +61,10 @@ def process_one_signal(samp_id):
if __name__ == '__main__':
yaml_path = Path("./dataset_config/HYS_config.yaml")
disable_df_path = Path("./排除区间.xlsx")

View File

@ -201,7 +201,5 @@ def position_based_sleep_recognition(signal_data, movement_mask, sampling_rate=1
else:
position_changes.append(0) # 0表示不存在姿势变化
# print(i,movement_start[i], movement_end[i], round(left_amplitude_change, 2), round(right_amplitude_change, 2), round(left_energy_change, 2), round(right_energy_change, 2))
return position_changes, position_change_times

View File

@ -1,2 +1,3 @@
from utils.HYS_FileReader import read_label_csv, read_signal_txt, read_disable_excel
from utils.operation_tools import load_dataset_info, generate_disable_mask
from utils.operation_tools import load_dataset_info, generate_disable_mask, generate_event_mask
from utils.event_map import E2N

7
utils/event_map.py Normal file
View File

@ -0,0 +1,7 @@
# apnea event type to number mapping
E2N = {
"Hypopnea": 1,
"Central apnea": 2,
"Obstructive apnea": 3,
"Mixed apnea": 4
}

View File

@ -5,7 +5,7 @@ import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import yaml
from utils.event_map import E2N
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
@ -272,14 +272,51 @@ def generate_disable_mask(signal_second: int, disable_df) -> np.ndarray:
return disable_mask
def generate_event_mask(signal_second: int, event_df) -> np.ndarray:
def generate_event_mask(signal_second: int, event_df):
event_mask = np.zeros(signal_second, dtype=int)
score_mask = np.zeros(signal_second, dtype=int)
# 剔除start = -1 的行
event_df = event_df[event_df["correct_Start"] >= 0]
for _, row in event_df.iterrows():
start = row["start"]
end = row["end"]
event_mask[start:end] = 1
return event_mask
start = row["correct_Start"]
end = row["correct_End"] + 1
event_mask[start:end] = E2N[row["correct_EventsType"]]
score_mask[start:end] = row["score"]
return event_mask, score_mask
def slide_window_segment(signal_second: int, window_second, step_second, event_mask, score_mask, disable_mask, ):
# 避开不可用区域进行滑窗分割
# 滑动到不可用区域时如果窗口内一侧的不可用区域不超过1/2 windows_second则继续滑动, 用reflect填充
# 如果不可用区间大于1/2的window_second则跳过该不可用区间继续滑动
# TODO 对于短时强体动区间 考虑填充或者掩码覆盖
#
half_window_second = window_second // 2
for start_second in range(0, signal_second - window_second + 1, step_second):
end_second = start_second + window_second
# 检查当前窗口是否包含不可用区域
windows_middle_second = (start_second + end_second) // 2
if np.sum(disable_mask[start_second:end_second] > 1) > half_window_second:
# 如果窗口内不可用区域超过一半,跳过该窗口
continue
if disable_mask[start_second:end_second] > half_window_second:
# 确保新的起始位置不超过信号长度
if start_second + window_second > signal_second:
break
window_event = event_mask[start_second:end_second]
window_score = score_mask[start_second:end_second]
window_disable = disable_mask[start_second:end_second]
yield start_second, end_second, window_event, window_score, window_disable