DataPrepare/signal_method/time_metrics.py
marques 8ee5980906 feat: Add utility functions for signal processing and event mapping
- Created a new module `utils/__init__.py` to consolidate utility imports.
- Added `event_map.py` for mapping apnea event types to numerical values and colors.
- Implemented various filtering functions in `filter_func.py`, including Butterworth, Bessel, downsampling, and notch filters.
- Developed `operation_tools.py` for dataset configuration loading, event mask generation, and signal processing utilities.
- Introduced `split_method.py` for segmenting data based on movement and amplitude criteria.
- Added `statistics_metrics.py` for calculating amplitude metrics and generating confusion matrices.
- Included a new Excel file for additional data storage.
2026-03-24 21:15:05 +08:00

45 lines
2.9 KiB
Python

from utils.operation_tools import calculate_by_slide_windows
from utils.operation_tools import timing_decorator
import numpy as np
@timing_decorator()
def calc_mav_by_slide_windows(signal_data, movement_mask, low_amp_mask, sampling_rate=100, window_second=10, step_second=1, inner_window_second=2):
if movement_mask is not None:
assert len(movement_mask) * sampling_rate == len(signal_data), f"movement_mask 长度与 signal_data 长度不一致, {len(movement_mask) * sampling_rate} != {len(signal_data)}"
# assert len(movement_mask) == len(low_amp_mask), f"movement_mask 和 low_amp_mask 长度不一致, {len(movement_mask)} != {len(low_amp_mask)}"
# print(f"movement_mask_length: {len(movement_mask)}, signal_data_length: {len(signal_data)}")
processed_mask = movement_mask.copy()
else:
processed_mask = None
def mav_func(x):
return np.mean(np.nanmax(x.reshape(-1, inner_window_second*sampling_rate), axis=1) - np.nanmin(x.reshape(-1, inner_window_second*sampling_rate), axis=1)) / 2
mav_nan, mav = calculate_by_slide_windows(mav_func, signal_data, processed_mask, sampling_rate=sampling_rate,
window_second=window_second, step_second=step_second)
return mav_nan, mav
@timing_decorator()
def calc_wavefactor_by_slide_windows(signal_data, movement_mask, low_amp_mask, sampling_rate=100):
assert len(movement_mask) * sampling_rate == len(signal_data), f"movement_mask 长度与 signal_data 长度不一致, {len(movement_mask) * sampling_rate} != {len(signal_data)}"
assert len(movement_mask) == len(low_amp_mask), f"movement_mask 和 low_amp_mask 长度不一致, {len(movement_mask)} != {len(low_amp_mask)}"
processed_mask = movement_mask.copy()
processed_mask = processed_mask.repeat(sampling_rate)
wavefactor_nan, wavefactor = calculate_by_slide_windows(lambda x: np.sqrt(np.mean(x ** 2)) / np.mean(np.abs(x)),
signal_data, processed_mask, sampling_rate=sampling_rate, window_second=2, step_second=1)
return wavefactor_nan, wavefactor
@timing_decorator()
def calc_peakfactor_by_slide_windows(signal_data, movement_mask, low_amp_mask, sampling_rate=100):
assert len(movement_mask) * sampling_rate == len(signal_data), f"movement_mask 长度与 signal_data 长度不一致, {len(movement_mask) * sampling_rate} != {len(signal_data)}"
assert len(movement_mask) == len(low_amp_mask), f"movement_mask 和 low_amp_mask 长度不一致, {len(movement_mask)} != {len(low_amp_mask)}"
processed_mask = movement_mask.copy()
processed_mask = processed_mask.repeat(sampling_rate)
peakfactor_nan, peakfactor = calculate_by_slide_windows(lambda x: np.max(np.abs(x)) / np.sqrt(np.mean(x ** 2)),
signal_data, processed_mask, sampling_rate=sampling_rate, window_second=2, step_second=1)
return peakfactor_nan, peakfactor