DataPrepare/signal_method/time_metrics.py

41 lines
2.7 KiB
Python

from utils.operation_tools import calculate_by_slide_windows
from utils.operation_tools import timing_decorator
import numpy as np
@timing_decorator()
def calc_mav(signal_data, movement_mask, low_amp_mask, sampling_rate=100, window_second=10, step_second=1, inner_window_second=2):
assert len(movement_mask) * sampling_rate == len(signal_data), f"movement_mask 长度与 signal_data 长度不一致, {len(movement_mask) * sampling_rate} != {len(signal_data)}"
assert len(movement_mask) == len(low_amp_mask), f"movement_mask 和 low_amp_mask 长度不一致, {len(movement_mask)} != {len(low_amp_mask)}"
# print(f"movement_mask_length: {len(movement_mask)}, signal_data_length: {len(signal_data)}")
processed_mask = movement_mask.copy()
def mav_func(x):
return np.mean(np.nanmax(x.reshape(-1, inner_window_second*sampling_rate), axis=1) - np.nanmin(x.reshape(-1, inner_window_second*sampling_rate), axis=1)) / 2
mav_nan, mav = calculate_by_slide_windows(mav_func, signal_data, processed_mask, sampling_rate=sampling_rate,
window_second=window_second, step_second=step_second)
return mav_nan, mav
@timing_decorator()
def calc_wavefactor(signal_data, movement_mask, low_amp_mask, sampling_rate=100):
assert len(movement_mask) * sampling_rate == len(signal_data), f"movement_mask 长度与 signal_data 长度不一致, {len(movement_mask) * sampling_rate} != {len(signal_data)}"
assert len(movement_mask) == len(low_amp_mask), f"movement_mask 和 low_amp_mask 长度不一致, {len(movement_mask)} != {len(low_amp_mask)}"
processed_mask = movement_mask.copy()
processed_mask = processed_mask.repeat(sampling_rate)
wavefactor_nan, wavefactor = calculate_by_slide_windows(lambda x: np.sqrt(np.mean(x ** 2)) / np.mean(np.abs(x)),
signal_data, processed_mask, sampling_rate=sampling_rate, window_second=2, step_second=1)
return wavefactor_nan, wavefactor
@timing_decorator()
def calc_peakfactor(signal_data, movement_mask, low_amp_mask, sampling_rate=100):
assert len(movement_mask) * sampling_rate == len(signal_data), f"movement_mask 长度与 signal_data 长度不一致, {len(movement_mask) * sampling_rate} != {len(signal_data)}"
assert len(movement_mask) == len(low_amp_mask), f"movement_mask 和 low_amp_mask 长度不一致, {len(movement_mask)} != {len(low_amp_mask)}"
processed_mask = movement_mask.copy()
processed_mask = processed_mask.repeat(sampling_rate)
peakfactor_nan, peakfactor = calculate_by_slide_windows(lambda x: np.max(np.abs(x)) / np.sqrt(np.mean(x ** 2)),
signal_data, processed_mask, sampling_rate=sampling_rate, window_second=2, step_second=1)
return peakfactor_nan, peakfactor