import numpy as np

from utils.operation_tools import calculate_by_slide_windows, timing_decorator


def _assert_mask_lengths(signal_data, movement_mask, low_amp_mask, sampling_rate):
    """Check that both masks agree in length and match the signal length.

    ``len(movement_mask) * sampling_rate`` must equal ``len(signal_data)``,
    and ``movement_mask`` must be as long as ``low_amp_mask``.

    Raises:
        AssertionError: if either length relation does not hold.
    """
    assert len(movement_mask) * sampling_rate == len(signal_data), f"movement_mask 长度与 signal_data 长度不一致, {len(movement_mask) * sampling_rate} != {len(signal_data)}"
    assert len(movement_mask) == len(low_amp_mask), f"movement_mask 和 low_amp_mask 长度不一致, {len(movement_mask)} != {len(low_amp_mask)}"


@timing_decorator()
def calc_mav(signal_data, movement_mask, low_amp_mask, sampling_rate=100,
             window_second=10, step_second=1, inner_window_second=2):
    """Compute the MAV feature of ``signal_data`` over sliding windows.

    Each window handed to the per-window function is reshaped into rows of
    ``inner_window_second * sampling_rate`` samples; the feature is half of
    the mean of the per-row (NaN-ignoring) max-minus-min range.

    Args:
        signal_data: signal samples; its length must equal
            ``len(movement_mask) * sampling_rate``.
        movement_mask: mask whose copy is forwarded to
            ``calculate_by_slide_windows``.
        low_amp_mask: only used for the length consistency check.
        sampling_rate: samples per mask element.
        window_second: forwarded to ``calculate_by_slide_windows``.
        step_second: forwarded to ``calculate_by_slide_windows``.
        inner_window_second: length, in seconds, of the sub-segments a window
            is split into.

    Returns:
        The ``(mav_nan, mav)`` pair produced by ``calculate_by_slide_windows``.

    Raises:
        AssertionError: if the mask/signal lengths are inconsistent.
    """
    _assert_mask_lengths(signal_data, movement_mask, low_amp_mask, sampling_rate)

    # NOTE(review): unlike calc_wavefactor / calc_peakfactor, the mask is NOT
    # repeated by sampling_rate here -- confirm this is what
    # calculate_by_slide_windows expects for this feature.
    mask_copy = movement_mask.copy()

    samples_per_segment = inner_window_second * sampling_rate

    def mav_func(window):
        # One row per inner segment; reshape raises if the window length is
        # not a multiple of the segment length.
        segments = window.reshape(-1, samples_per_segment)
        segment_range = np.nanmax(segments, axis=1) - np.nanmin(segments, axis=1)
        return np.mean(segment_range) / 2

    mav_nan, mav = calculate_by_slide_windows(
        mav_func,
        signal_data,
        mask_copy,
        sampling_rate=sampling_rate,
        window_second=window_second,
        step_second=step_second,
    )
    return mav_nan, mav


@timing_decorator()
def calc_wavefactor(signal_data, movement_mask, low_amp_mask, sampling_rate=100):
    """Compute the wave factor (RMS / mean absolute value) over sliding windows.

    The movement mask is copied and each element repeated ``sampling_rate``
    times before being passed to ``calculate_by_slide_windows`` with a
    2-second window and 1-second step.

    Args:
        signal_data: signal samples; its length must equal
            ``len(movement_mask) * sampling_rate``.
        movement_mask: mask to expand and forward.
        low_amp_mask: only used for the length consistency check.
        sampling_rate: samples per mask element.

    Returns:
        The ``(wavefactor_nan, wavefactor)`` pair produced by
        ``calculate_by_slide_windows``.

    Raises:
        AssertionError: if the mask/signal lengths are inconsistent.
    """
    _assert_mask_lengths(signal_data, movement_mask, low_amp_mask, sampling_rate)

    expanded_mask = movement_mask.copy().repeat(sampling_rate)

    def wave_factor(window):
        rms = np.sqrt(np.mean(window ** 2))
        mean_abs = np.mean(np.abs(window))
        return rms / mean_abs

    wavefactor_nan, wavefactor = calculate_by_slide_windows(
        wave_factor,
        signal_data,
        expanded_mask,
        sampling_rate=sampling_rate,
        window_second=2,
        step_second=1,
    )
    return wavefactor_nan, wavefactor


@timing_decorator()
def calc_peakfactor(signal_data, movement_mask, low_amp_mask, sampling_rate=100):
    """Compute the peak factor (max absolute value / RMS) over sliding windows.

    The movement mask is copied and each element repeated ``sampling_rate``
    times before being passed to ``calculate_by_slide_windows`` with a
    2-second window and 1-second step.

    Args:
        signal_data: signal samples; its length must equal
            ``len(movement_mask) * sampling_rate``.
        movement_mask: mask to expand and forward.
        low_amp_mask: only used for the length consistency check.
        sampling_rate: samples per mask element.

    Returns:
        The ``(peakfactor_nan, peakfactor)`` pair produced by
        ``calculate_by_slide_windows``.

    Raises:
        AssertionError: if the mask/signal lengths are inconsistent.
    """
    _assert_mask_lengths(signal_data, movement_mask, low_amp_mask, sampling_rate)

    expanded_mask = movement_mask.copy().repeat(sampling_rate)

    def peak_factor(window):
        peak = np.max(np.abs(window))
        rms = np.sqrt(np.mean(window ** 2))
        return peak / rms

    peakfactor_nan, peakfactor = calculate_by_slide_windows(
        peak_factor,
        signal_data,
        expanded_mask,
        sampling_rate=sampling_rate,
        window_second=2,
        step_second=1,
    )
    return peakfactor_nan, peakfactor