Enhance position-based sleep recognition with version updates and amplitude calculations

This commit is contained in:
marques 2025-10-16 19:58:13 +08:00
parent 0900a9b489
commit ddedfbf2cf
2 changed files with 92 additions and 18 deletions

View File

@ -2,6 +2,7 @@ from utils.operation_tools import timing_decorator
import numpy as np import numpy as np
from utils.operation_tools import merge_short_gaps, remove_short_durations from utils.operation_tools import merge_short_gaps, remove_short_durations
@timing_decorator() @timing_decorator()
def detect_low_amplitude_signal(signal_data, sampling_rate, window_size_sec=1, stride_sec=None, def detect_low_amplitude_signal(signal_data, sampling_rate, window_size_sec=1, stride_sec=None,
amplitude_threshold=50, merge_gap_sec=10, min_duration_sec=5): amplitude_threshold=50, merge_gap_sec=10, min_duration_sec=5):
@ -110,8 +111,9 @@ def get_typical_segment_for_continues_signal(signal_data, sampling_rate=100, win
# 仅对比相邻片段的幅值指标如果存在显著差异则认为存在睡姿变化即每个体动相邻的30秒内存在睡姿变化如果片段不足30秒则按实际长度对比 # 仅对比相邻片段的幅值指标如果存在显著差异则认为存在睡姿变化即每个体动相邻的30秒内存在睡姿变化如果片段不足30秒则按实际长度对比
@timing_decorator() @timing_decorator()
def position_based_sleep_recognition(signal_data, movement_mask, sampling_rate=100, window_size_sec=30, def position_based_sleep_recognition_v1(signal_data, movement_mask, sampling_rate=100, window_size_sec=30,
interval_to_movement=10): interval_to_movement=10):
mav_calc_window_sec = 2 # 计算mav的窗口大小单位秒
# 获取有效片段起止位置 # 获取有效片段起止位置
valid_mask = 1 - movement_mask valid_mask = 1 - movement_mask
valid_starts = np.where(np.diff(np.concatenate([[0], valid_mask])) == 1)[0] valid_starts = np.where(np.diff(np.concatenate([[0], valid_mask])) == 1)[0]
@ -150,17 +152,21 @@ def position_based_sleep_recognition(signal_data, movement_mask, sampling_rate=1
right_end = end right_end = end
# 新的end - start确保为200的整数倍 # 新的end - start确保为200的整数倍
if (left_end - left_start) % (2 * sampling_rate) != 0: if (left_end - left_start) % (mav_calc_window_sec * sampling_rate) != 0:
left_end = left_start + ((left_end - left_start) // (2 * sampling_rate)) * (2 * sampling_rate) left_end = left_start + ((left_end - left_start) // (mav_calc_window_sec * sampling_rate)) * (
if (right_end - right_start) % (2 * sampling_rate) != 0: mav_calc_window_sec * sampling_rate)
right_end = right_start + ((right_end - right_start) // (2 * sampling_rate)) * (2 * sampling_rate) if (right_end - right_start) % (mav_calc_window_sec * sampling_rate) != 0:
right_end = right_start + ((right_end - right_start) // (mav_calc_window_sec * sampling_rate)) * (
mav_calc_window_sec * sampling_rate)
# 计算每个片段的幅值指标 # 计算每个片段的幅值指标
left_mav = np.mean(np.max(signal_data[left_start:left_end].reshape(-1, 2 * sampling_rate), axis=0)) - np.mean( left_mav = np.mean(np.max(signal_data[left_start:left_end].reshape(-1, mav_calc_window_sec * sampling_rate),
np.min(signal_data[left_start:left_end].reshape(-1, 2 * sampling_rate), axis=0)) axis=0)) - np.mean(
np.min(signal_data[left_start:left_end].reshape(-1, mav_calc_window_sec * sampling_rate), axis=0))
right_mav = np.mean( right_mav = np.mean(
np.max(signal_data[right_start:right_end].reshape(-1, 2 * sampling_rate), axis=0)) - np.mean( np.max(signal_data[right_start:right_end].reshape(-1, mav_calc_window_sec * sampling_rate),
np.min(signal_data[right_start:right_end].reshape(-1, 2 * sampling_rate), axis=0)) axis=0)) - np.mean(
np.min(signal_data[right_start:right_end].reshape(-1, mav_calc_window_sec * sampling_rate), axis=0))
segment_left_average_amplitude.append(left_mav) segment_left_average_amplitude.append(left_mav)
segment_right_average_amplitude.append(right_mav) segment_right_average_amplitude.append(right_mav)
@ -171,6 +177,10 @@ def position_based_sleep_recognition(signal_data, movement_mask, sampling_rate=1
position_changes = [] position_changes = []
position_change_times = [] position_change_times = []
# 判断是否存在显著变化 (可根据实际情况调整阈值)
threshold_amplitude = 0.1 # 幅值变化阈值
threshold_energy = 0.1 # 能量变化阈值
for i in range(1, len(segment_left_average_amplitude)): for i in range(1, len(segment_left_average_amplitude)):
# 计算幅值指标的变化率 # 计算幅值指标的变化率
left_amplitude_change = abs(segment_left_average_amplitude[i] - segment_left_average_amplitude[i - 1]) / max( left_amplitude_change = abs(segment_left_average_amplitude[i] - segment_left_average_amplitude[i - 1]) / max(
@ -184,10 +194,6 @@ def position_based_sleep_recognition(signal_data, movement_mask, sampling_rate=1
right_energy_change = abs(segment_right_average_energy[i] - segment_right_average_energy[i - 1]) / max( right_energy_change = abs(segment_right_average_energy[i] - segment_right_average_energy[i - 1]) / max(
segment_right_average_energy[i - 1], 1e-6) segment_right_average_energy[i - 1], 1e-6)
# 判断是否存在显著变化 (可根据实际情况调整阈值)
threshold_amplitude = 0.1 # 幅值变化阈值
threshold_energy = 0.1 # 能量变化阈值
# 如果左右通道中的任一通道同时满足幅值和能量的变化阈值,则认为存在姿势变化 # 如果左右通道中的任一通道同时满足幅值和能量的变化阈值,则认为存在姿势变化
left_significant_change = (left_amplitude_change > threshold_amplitude) and ( left_significant_change = (left_amplitude_change > threshold_amplitude) and (
left_energy_change > threshold_energy) left_energy_change > threshold_energy)
@ -203,3 +209,73 @@ def position_based_sleep_recognition(signal_data, movement_mask, sampling_rate=1
return position_changes, position_change_times return position_changes, position_change_times
def position_based_sleep_recognition_v2(signal_data, movement_mask, sampling_rate=100, window_size_sec=30):
"""
:param signal_data:
:param movement_mask: mask的采样率为1Hz
:param sampling_rate:
:param window_size_sec:
:return:
"""
mav_calc_window_sec = 2 # 计算mav的窗口大小单位秒
# 获取有效片段起止位置
valid_mask = 1 - movement_mask
valid_starts = np.where(np.diff(np.concatenate([[0], valid_mask])) == 1)[0]
valid_ends = np.where(np.diff(np.concatenate([valid_mask, [0]])) == -1)[0]
# 对于有效区间大于12分钟的拆成多个5分钟
movement_start = np.where(np.diff(np.concatenate([[0], movement_mask])) == 1)[0]
movement_end = np.where(np.diff(np.concatenate([movement_mask, [0]])) == -1)[0]
segment_average_amplitude = []
segment_average_energy = []
for start, end in zip(valid_starts, valid_ends):
start *= sampling_rate
end *= sampling_rate
# 避免过短的片段
if end - start <= sampling_rate: # 小于1秒的片段不考虑
continue
# 新的end - start确保为200的整数倍
if (end - start) % (mav_calc_window_sec * sampling_rate) != 0:
end = start + ((end - start) // (mav_calc_window_sec * sampling_rate)) * (
mav_calc_window_sec * sampling_rate)
# 计算每个片段的幅值指标
mav = np.mean(
np.max(signal_data[start:end].reshape(-1, mav_calc_window_sec * sampling_rate), axis=0)) - np.mean(
np.min(signal_data[start:end].reshape(-1, mav_calc_window_sec * sampling_rate), axis=0))
segment_average_amplitude.append(mav)
energy = np.sum(np.abs(signal_data[start:end] ** 2))
segment_average_energy.append(energy)
position_changes = []
position_change_times = []
# 判断是否存在显著变化 (可根据实际情况调整阈值)
threshold_amplitude = 0.1 # 幅值变化阈值
threshold_energy = 0.1 # 能量变化阈值
for i in range(1, len(segment_average_amplitude)):
# 计算幅值指标的变化率
amplitude_change = abs(segment_average_amplitude[i] - segment_average_amplitude[i - 1]) / max(
segment_average_amplitude[i - 1], 1e-6)
# 计算能量指标的变化率
energy_change = abs(segment_average_energy[i] - segment_average_energy[i - 1]) / max(
segment_average_energy[i - 1], 1e-6)
significant_change = (amplitude_change > threshold_amplitude) and (energy_change > threshold_energy)
if significant_change:
# 记录姿势变化发生的时间点 用当前分割的体动的起始位置和结束位置表示
position_changes.append(1)
position_change_times.append((movement_start[i - 1], movement_end[i - 1]))
else:
position_changes.append(0) # 0表示不存在姿势变化
return position_changes, position_change_times

View File

@ -318,5 +318,3 @@ def slide_window_segment(signal_second: int, window_second, step_second, event_m
yield start_second, end_second, window_event, window_score, window_disable yield start_second, end_second, window_event, window_score, window_disable