# DataPrepare/signal_method/rule_base_event.py
import numpy as np

from utils.operation_tools import merge_short_gaps, remove_short_durations, timing_decorator

@timing_decorator()
def detect_low_amplitude_signal(signal_data, sampling_rate, window_size_sec=1, stride_sec=None,
                                amplitude_threshold=50, merge_gap_sec=10, min_duration_sec=5):
    """
    Detect low-amplitude states in a signal by checking whether the windowed RMS falls below a threshold.

    Parameters:
    - signal_data: numpy array, input signal
    - sampling_rate: int, sampling rate of the signal in Hz
    - window_size_sec: float, length of the analysis window in seconds (default 1)
    - stride_sec: float, window stride in seconds (default None, i.e. equal to window_size_sec, no overlap)
    - amplitude_threshold: float, RMS threshold at or below which a window counts as low amplitude (default 50)
    - merge_gap_sec: float, maximum gap between states that gets merged (default 10 seconds)
    - min_duration_sec: float, minimum duration a state must last to be kept (default 5 seconds)

    Returns:
    - low_amplitude_mask: numpy array, per-second mask of low-amplitude states (1 = low amplitude, 0 = normal)
    - low_amplitude_position_list: list of [start, end] second indices of the low-amplitude states
    """
    # Window size in samples
    window_samples = int(window_size_sec * sampling_rate)
    # If no stride is given, use the window size (non-overlapping windows)
    if stride_sec is None:
        stride_sec = window_size_sec
    # Stride in samples, at least 1
    stride_samples = int(stride_sec * sampling_rate)
    stride_samples = max(1, stride_samples)
    # Handle the signal boundaries with reflection padding
    pad_size = window_samples // 2
    padded_signal = np.pad(signal_data, pad_size, mode='reflect')
    # Number of windows over the padded signal
    num_windows = max(1, (len(padded_signal) - window_samples) // stride_samples + 1)
    # RMS value of each window
    rms_values = np.zeros(num_windows)
    for i in range(num_windows):
        start_idx = i * stride_samples
        end_idx = min(start_idx + window_samples, len(padded_signal))
        # Take the window from the padded signal (so the indices match the window count above)
        # and handle a possibly incomplete last window
        window_data = padded_signal[start_idx:end_idx]
        if len(window_data) > 0:
            rms_values[i] = np.sqrt(np.mean(window_data ** 2))
        else:
            rms_values[i] = 0
    # Initial low-amplitude mask: windows whose RMS is at or below the threshold are 1, the rest 0
    low_amplitude_mask = np.where(rms_values <= amplitude_threshold, 1, 0)
    # Window index range that corresponds to the original (unpadded) signal
    orig_start_window = pad_size // stride_samples
    if stride_sec == 1:
        orig_end_window = orig_start_window + (len(signal_data) // stride_samples)
    else:
        orig_end_window = orig_start_window + (len(signal_data) // stride_samples) + 1
    # Keep only the windows that belong to the original signal
    low_amplitude_mask = low_amplitude_mask[orig_start_window:orig_end_window]
    num_original_windows = len(low_amplitude_mask)
    # Time point (in seconds) of each window on the time axis
    time_points = np.arange(num_original_windows) * stride_sec
    # Merge low-amplitude states separated by short gaps
    if merge_gap_sec > 0:
        low_amplitude_mask = merge_short_gaps(low_amplitude_mask, time_points, merge_gap_sec)
    # Remove low-amplitude states that are too short
    if min_duration_sec > 0:
        low_amplitude_mask = remove_short_durations(low_amplitude_mask, time_points, min_duration_sec)
    # Expand the per-window mask to a per-second mask (stride_sec is expected to be a whole number of seconds)
    low_amplitude_mask = low_amplitude_mask.repeat(stride_sec)[:len(signal_data) // sampling_rate]
    # Start/end positions of the low-amplitude states: [[start, end], [start, end], ...]
    low_amplitude_start = np.where(np.diff(np.concatenate([[0], low_amplitude_mask])) == 1)[0]
    low_amplitude_end = np.where(np.diff(np.concatenate([low_amplitude_mask, [0]])) == -1)[0]
    low_amplitude_position_list = [[start, end] for start, end in zip(low_amplitude_start, low_amplitude_end)]
    return low_amplitude_mask, low_amplitude_position_list
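

# Usage sketch (not part of the original module): how detect_low_amplitude_signal might be called on a
# synthetic trace. The 100 Hz rate, the 2-minute duration, the noise scale of 200 and the quiet stretch
# between 30 s and 60 s are illustrative assumptions, as is the helper name _example_detect_low_amplitude.
def _example_detect_low_amplitude():
    rng = np.random.default_rng(0)
    sampling_rate = 100
    signal = rng.normal(0, 200, 120 * sampling_rate)      # two minutes of "normal" amplitude (RMS ~200)
    signal[30 * sampling_rate:60 * sampling_rate] *= 0.1   # quiet stretch whose RMS drops well below 50
    mask, positions = detect_low_amplitude_signal(signal, sampling_rate,
                                                  amplitude_threshold=50,
                                                  merge_gap_sec=10, min_duration_sec=5)
    # mask is a per-second 0/1 array; positions lists the [start, end] second indices of each quiet stretch
    return mask, positions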


def get_typical_segment_for_continues_signal(signal_data, sampling_rate=100, window_size=30, step_size=1):
    """
    Get ten typical segments of a continuous signal (not implemented yet).

    :param signal_data: signal data
    :param sampling_rate: sampling rate in Hz
    :param window_size: window size in seconds
    :param step_size: step size in seconds
    :return: list of typical segments
    """
    pass
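

# Hedged sketch, not the author's implementation: get_typical_segment_for_continues_signal above is left
# as a stub, so the intended selection rule is unknown. One plausible reading of "typical segments" is to
# keep the windows whose RMS is closest to the median RMS of the whole recording. The helper name
# _sketch_typical_segments, the RMS criterion and the default of ten segments are all assumptions.
def _sketch_typical_segments(signal_data, sampling_rate=100, window_size=30, step_size=1, n_segments=10):
    window_samples = window_size * sampling_rate
    step_samples = step_size * sampling_rate
    # Candidate window start positions over the whole recording
    starts = np.arange(0, len(signal_data) - window_samples + 1, step_samples)
    # RMS of every candidate window
    rms = np.array([np.sqrt(np.mean(signal_data[s:s + window_samples] ** 2)) for s in starts])
    # Rank the windows by closeness to the median RMS and keep the n_segments most "typical" ones
    order = np.argsort(np.abs(rms - np.median(rms)))[:n_segments]
    # Return (start, end) sample indices sorted by position; selected windows may overlap
    return [(int(starts[i]), int(starts[i] + window_samples)) for i in sorted(order)]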


# Sleep-posture recognition based on body-movement positions and signal amplitude.
# The body-movement mask splits the whole night into several valid segments; an amplitude metric is
# computed for each segment, and a significant difference between the metrics of two segments is taken
# as evidence of a posture change.
# Since each segment is at least 10 s long, the amplitude metric of each segment is computed over a 10 s stretch.
# Only adjacent segments are compared: a significant difference means a posture change occurred within the
# 30 s around the corresponding body movement; if a segment is shorter than 30 s, its actual length is used.
@timing_decorator()
def position_based_sleep_recognition(signal_data, movement_mask, sampling_rate=100, window_size_sec=30,
                                     interval_to_movement=10):
    """
    Detect sleep-posture changes from body-movement positions and amplitude/energy changes of the signal.

    :param signal_data: numpy array, the raw signal
    :param movement_mask: numpy array, per-second body-movement mask (1 = movement, 0 = no movement)
    :param sampling_rate: sampling rate in Hz
    :param window_size_sec: length of the comparison windows in seconds
    :param interval_to_movement: gap kept between a comparison window and the adjacent movement, in seconds
    :return: position_changes (0/1 flag per pair of adjacent valid segments) and
             position_change_times ((movement_start, movement_end) second indices of detected changes)
    """
    # Start/end positions (in seconds) of the valid, movement-free segments
    valid_mask = 1 - movement_mask
    valid_starts = np.where(np.diff(np.concatenate([[0], valid_mask])) == 1)[0]
    valid_ends = np.where(np.diff(np.concatenate([valid_mask, [0]])) == -1)[0]
    movement_start = np.where(np.diff(np.concatenate([[0], movement_mask])) == 1)[0]
    movement_end = np.where(np.diff(np.concatenate([movement_mask, [0]])) == -1)[0]
    segment_left_average_amplitude = []
    segment_right_average_amplitude = []
    segment_left_average_energy = []
    segment_right_average_energy = []
    for start, end in zip(valid_starts, valid_ends):
        # Convert from seconds to sample indices
        start *= sampling_rate
        end *= sampling_rate
        # Skip overly short segments (1 second or less)
        if end - start <= sampling_rate:
            continue
        # Segment too short to leave a full gap next to the movement: use its head and tail directly
        elif end - start < (window_size_sec * interval_to_movement + 1) * sampling_rate:
            left_start = start
            left_end = min(end, left_start + window_size_sec * sampling_rate)
            right_start = max(start, end - window_size_sec * sampling_rate)
            right_end = end
        else:
            left_start = start + interval_to_movement * sampling_rate
            left_end = left_start + window_size_sec * sampling_rate
            right_start = end - interval_to_movement * sampling_rate - window_size_sec * sampling_rate
            right_end = end
        # Trim the windows so their length is a multiple of 2 * sampling_rate (200 samples at 100 Hz)
        if (left_end - left_start) % (2 * sampling_rate) != 0:
            left_end = left_start + ((left_end - left_start) // (2 * sampling_rate)) * (2 * sampling_rate)
        if (right_end - right_start) % (2 * sampling_rate) != 0:
            right_end = right_start + ((right_end - right_start) // (2 * sampling_rate)) * (2 * sampling_rate)
        # Amplitude metric of each window: reshape into 2-second chunks and take the mean of the
        # column-wise maxima minus the mean of the column-wise minima
        left_mav = np.mean(np.max(signal_data[left_start:left_end].reshape(-1, 2 * sampling_rate), axis=0)) - np.mean(
            np.min(signal_data[left_start:left_end].reshape(-1, 2 * sampling_rate), axis=0))
        right_mav = np.mean(
            np.max(signal_data[right_start:right_end].reshape(-1, 2 * sampling_rate), axis=0)) - np.mean(
            np.min(signal_data[right_start:right_end].reshape(-1, 2 * sampling_rate), axis=0))
        segment_left_average_amplitude.append(left_mav)
        segment_right_average_amplitude.append(right_mav)
        # Energy metric of each window
        left_energy = np.sum(np.abs(signal_data[left_start:left_end] ** 2))
        right_energy = np.sum(np.abs(signal_data[right_start:right_end] ** 2))
        segment_left_average_energy.append(left_energy)
        segment_right_average_energy.append(right_energy)
    position_changes = []
    position_change_times = []
    for i in range(1, len(segment_left_average_amplitude)):
        # Relative change of the amplitude metric between adjacent segments
        left_amplitude_change = abs(segment_left_average_amplitude[i] - segment_left_average_amplitude[i - 1]) / max(
            segment_left_average_amplitude[i - 1], 1e-6)
        right_amplitude_change = abs(segment_right_average_amplitude[i] - segment_right_average_amplitude[i - 1]) / max(
            segment_right_average_amplitude[i - 1], 1e-6)
        # Relative change of the energy metric between adjacent segments
        left_energy_change = abs(segment_left_average_energy[i] - segment_left_average_energy[i - 1]) / max(
            segment_left_average_energy[i - 1], 1e-6)
        right_energy_change = abs(segment_right_average_energy[i] - segment_right_average_energy[i - 1]) / max(
            segment_right_average_energy[i - 1], 1e-6)
        # Thresholds for a significant change (tune to the data if needed)
        threshold_amplitude = 0.1  # amplitude-change threshold
        threshold_energy = 0.1  # energy-change threshold
        # A posture change is assumed if either the left or the right window exceeds both the
        # amplitude and the energy change threshold
        left_significant_change = (left_amplitude_change > threshold_amplitude) and (
                left_energy_change > threshold_energy)
        right_significant_change = (right_amplitude_change > threshold_amplitude) and (
                right_energy_change > threshold_energy)
        if left_significant_change or right_significant_change:
            # Record when the posture change happened, expressed as the start and end (in seconds)
            # of the body movement separating the two segments
            position_changes.append(1)
            position_change_times.append((movement_start[i - 1], movement_end[i - 1]))
        else:
            position_changes.append(0)  # 0 means no posture change
    return position_changes, position_change_times
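

# Usage sketch (illustration only): running the posture-change detector on synthetic data. The 100 Hz rate,
# the 10-minute duration, the per-second movement mask with two short "body movement" bursts and the
# amplitude step after the second burst are all assumed test values, not data from the original project.
if __name__ == "__main__":
    rng = np.random.default_rng(1)
    fs = 100
    n_seconds = 600
    demo_signal = rng.normal(0, 100, n_seconds * fs)
    demo_signal[400 * fs:] *= 2.0                   # amplitude/energy step resembling a posture change
    demo_movement_mask = np.zeros(n_seconds, dtype=int)
    demo_movement_mask[200:210] = 1                 # first movement burst (seconds 200-209)
    demo_movement_mask[395:400] = 1                 # second movement burst, just before the amplitude step
    changes, change_times = position_based_sleep_recognition(demo_signal, demo_movement_mask, sampling_rate=fs)
    print("posture-change flags per adjacent segment pair:", changes)
    print("posture-change movement intervals (s):", change_times)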