441 lines
20 KiB
Python
441 lines
20 KiB
Python
from utils.operation_tools import timing_decorator
|
||
import numpy as np
|
||
from utils import merge_short_gaps, remove_short_durations, event_mask_2_list
|
||
|
||
|
||
@timing_decorator()
|
||
def detect_movement(signal_data, sampling_rate, window_size_sec=2, stride_sec=None,
|
||
std_median_multiplier=4.5, compare_intervals_sec=[30, 60],
|
||
interval_multiplier=2.5,
|
||
merge_gap_sec=10, min_duration_sec=5,
|
||
low_amplitude_periods=None):
|
||
"""
|
||
检测信号中的体动状态,结合两种方法:标准差比较和前后窗口幅值对比。
|
||
使用反射填充处理信号边界。
|
||
|
||
参数:
|
||
- signal_data: numpy array,输入的信号数据
|
||
- sampling_rate: int,信号的采样率(Hz)
|
||
- window_size_sec: float,分析窗口的时长(秒),默认值为 2 秒
|
||
- stride_sec: float,窗口滑动步长(秒),默认值为None(等于window_size_sec,无重叠)
|
||
- std_median_multiplier: float,标准差中位数的乘数阈值,默认值为 4.5
|
||
- compare_intervals_sec: list,用于比较的时间间隔列表(秒),默认为 [30, 60]
|
||
- interval_multiplier: float,间隔中位数的上限乘数,默认值为 2.5
|
||
- merge_gap_sec: float,要合并的体动状态之间的最大间隔(秒),默认值为 10 秒
|
||
- min_duration_sec: float,要保留的体动状态的最小持续时间(秒),默认值为 5 秒
|
||
- low_amplitude_periods: numpy array,低幅值期间的掩码(1表示低幅值期间),默认为None
|
||
|
||
返回:
|
||
- movement_mask: numpy array,体动状态的掩码(1表示体动,0表示睡眠)
|
||
"""
|
||
# 计算窗口大小(样本数)
|
||
window_samples = int(window_size_sec * sampling_rate)
|
||
|
||
# 如果未指定步长,设置为窗口大小(无重叠)
|
||
if stride_sec is None:
|
||
stride_sec = window_size_sec
|
||
|
||
# 计算步长(样本数)
|
||
stride_samples = int(stride_sec * sampling_rate)
|
||
|
||
# 确保步长至少为1
|
||
stride_samples = max(1, stride_samples)
|
||
|
||
# 计算需要的最大填充大小(基于比较间隔)
|
||
max_interval_samples = int(max(compare_intervals_sec) * sampling_rate)
|
||
|
||
# 应用反射填充以正确处理边界
|
||
# 填充大小为最大比较间隔的一半,以确保边界有足够的上下文
|
||
pad_size = max_interval_samples
|
||
padded_signal = np.pad(signal_data, pad_size, mode='reflect')
|
||
|
||
# 计算填充后的窗口数量
|
||
num_windows = max(1, (len(padded_signal) - window_samples) // stride_samples + 1)
|
||
|
||
# 初始化窗口标准差数组
|
||
window_std = np.zeros(num_windows)
|
||
# 计算每个窗口的标准差
|
||
# 分窗计算标准差
|
||
for i in range(num_windows):
|
||
start_idx = i * stride_samples
|
||
end_idx = min(start_idx + window_samples, len(padded_signal))
|
||
|
||
# 处理窗口,包括可能不完整的最后一个窗口
|
||
window_data = padded_signal[start_idx:end_idx]
|
||
if len(window_data) > 0:
|
||
window_std[i] = np.std(window_data, ddof=1)
|
||
else:
|
||
window_std[i] = 0
|
||
|
||
# 计算原始信号对应的窗口索引范围
|
||
# 填充后,原始信号从pad_size开始
|
||
orig_start_window = pad_size // stride_samples
|
||
if stride_sec == 1:
|
||
orig_end_window = orig_start_window + (len(signal_data) // stride_samples)
|
||
else:
|
||
orig_end_window = orig_start_window + (len(signal_data) // stride_samples) + 1
|
||
|
||
# 只保留原始信号对应的窗口标准差
|
||
original_window_std = window_std[orig_start_window:orig_end_window]
|
||
num_original_windows = len(original_window_std)
|
||
|
||
# 创建时间点数组(秒)
|
||
time_points = np.arange(num_original_windows) * stride_sec
|
||
|
||
# # 如果提供了低幅值期间的掩码,则在计算全局中位数时排除这些期间
|
||
# if low_amplitude_periods is not None and len(low_amplitude_periods) == num_original_windows:
|
||
# valid_std = original_window_std[low_amplitude_periods == 0]
|
||
# if len(valid_std) == 0: # 如果所有窗口都在低幅值期间
|
||
# valid_std = original_window_std # 使用全部窗口
|
||
# else:
|
||
# valid_std = original_window_std
|
||
|
||
valid_std = original_window_std ##20250418新修改
|
||
|
||
#---------------------- 方法一:基于STD的体动判定 ----------------------#
|
||
# 计算所有有效窗口标准差的中位数
|
||
median_std = np.median(valid_std)
|
||
|
||
# 当窗口标准差大于中位数的倍数,判定为体动状态
|
||
std_movement = np.where(original_window_std > median_std * std_median_multiplier, 1, 0)
|
||
|
||
#------------------ 方法二:基于前后信号幅值变化的体动判定 ------------------#
|
||
amplitude_movement = np.zeros(num_original_windows, dtype=int)
|
||
|
||
# 定义基于时间粒度的比较间隔索引
|
||
compare_intervals_idx = [int(interval // stride_sec) for interval in compare_intervals_sec]
|
||
|
||
# 逐窗口判断
|
||
for win_idx in range(num_original_windows):
|
||
# 全局索引(在填充后的窗口数组中)
|
||
global_win_idx = win_idx + orig_start_window
|
||
|
||
# 对每个比较间隔进行检查
|
||
for interval_idx in compare_intervals_idx:
|
||
# 确定比较范围的结束索引(在填充后的窗口数组中)
|
||
end_idx = min(global_win_idx + interval_idx, len(window_std))
|
||
|
||
# 提取相应时间范围内的标准差值
|
||
if global_win_idx < end_idx:
|
||
interval_std = window_std[global_win_idx:end_idx]
|
||
|
||
# 计算该间隔的中位数
|
||
interval_median = np.median(interval_std)
|
||
|
||
# 计算上下阈值
|
||
upper_threshold = interval_median * interval_multiplier
|
||
|
||
# 检查当前窗口是否超出阈值范围,如果超出则直接标记为体动
|
||
if window_std[global_win_idx] > upper_threshold:
|
||
amplitude_movement[win_idx] = 1
|
||
break # 一旦确定为体动就不需要继续检查其他间隔
|
||
|
||
# 将两种方法的结果合并:只要其中一种方法判定为体动,最终结果就是体动
|
||
movement_mask = np.logical_or(std_movement, amplitude_movement).astype(int)
|
||
raw_movement_mask = movement_mask
|
||
|
||
# 如果需要合并间隔小的体动状态
|
||
if merge_gap_sec > 0:
|
||
movement_mask = merge_short_gaps(movement_mask, time_points, merge_gap_sec)
|
||
|
||
# 如果需要移除短时体动状态
|
||
if min_duration_sec > 0:
|
||
movement_mask = remove_short_durations(movement_mask, time_points, min_duration_sec)
|
||
|
||
# raw_movement_mask, movement_mask恢复对应秒数,而不是点数
|
||
raw_movement_mask = raw_movement_mask.repeat(stride_sec)[:len(signal_data) // sampling_rate]
|
||
movement_mask = movement_mask.repeat(stride_sec)[:len(signal_data) // sampling_rate]
|
||
|
||
|
||
# 比较剔除的体动,如果被剔除的体动所在区域有高于3std的幅值,则不剔除
|
||
removed_movement_mask = (raw_movement_mask - movement_mask) > 0
|
||
removed_movement_start = np.where(np.diff(np.concatenate([[0], removed_movement_mask])) == 1)[0]
|
||
removed_movement_end = np.where(np.diff(np.concatenate([removed_movement_mask, [0]])) == -1)[0]
|
||
|
||
for start, end in zip(removed_movement_start, removed_movement_end):
|
||
# print(start ,end)
|
||
# 计算剔除的体动区域的幅值
|
||
if np.nanmax(signal_data[start*sampling_rate:(end+1)*sampling_rate]) > median_std * std_median_multiplier:
|
||
movement_mask[start:end+1] = 1
|
||
|
||
# raw体动起止位置 [[start, end], [start, end], ...]
|
||
raw_movement_position_list = event_mask_2_list(raw_movement_mask)
|
||
|
||
# merge体动起止位置 [[start, end], [start, end], ...]
|
||
movement_position_list = event_mask_2_list(movement_mask)
|
||
|
||
return raw_movement_mask, movement_mask, raw_movement_position_list, movement_position_list
|
||
|
||
|
||
|
||
@timing_decorator()
|
||
def detect_low_amplitude_signal(signal_data, sampling_rate, window_size_sec=1, stride_sec=None,
|
||
amplitude_threshold=50, merge_gap_sec=10, min_duration_sec=5):
|
||
"""
|
||
检测信号中的低幅值状态,通过计算RMS值判断信号强度是否低于设定阈值。
|
||
|
||
参数:
|
||
- signal_data: numpy array,输入的信号数据
|
||
- sampling_rate: int,信号的采样率(Hz)
|
||
- window_size_sec: float,分析窗口的时长(秒),默认值为 1 秒
|
||
- stride_sec: float,窗口滑动步长(秒),默认值为None(等于window_size_sec,无重叠)
|
||
- amplitude_threshold: float,RMS阈值,低于此值表示低幅值状态,默认值为 50
|
||
- merge_gap_sec: float,要合并的状态之间的最大间隔(秒),默认值为 10 秒
|
||
- min_duration_sec: float,要保留的状态的最小持续时间(秒),默认值为 5 秒
|
||
|
||
返回:
|
||
- low_amplitude_mask: numpy array,低幅值状态的掩码(1表示低幅值,0表示正常幅值)
|
||
"""
|
||
# 计算窗口大小(样本数)
|
||
window_samples = int(window_size_sec * sampling_rate)
|
||
|
||
# 如果未指定步长,设置为窗口大小(无重叠)
|
||
if stride_sec is None:
|
||
stride_sec = window_size_sec
|
||
|
||
# 计算步长(样本数)
|
||
stride_samples = int(stride_sec * sampling_rate)
|
||
|
||
# 确保步长至少为1
|
||
stride_samples = max(sampling_rate, stride_samples)
|
||
|
||
# 处理信号边界,使用反射填充
|
||
pad_size = window_samples // 2
|
||
padded_signal = np.pad(signal_data, pad_size, mode='reflect')
|
||
|
||
# 计算填充后的窗口数量
|
||
num_windows = max(1, (len(padded_signal) - window_samples) // stride_samples + 1)
|
||
|
||
# 初始化RMS值数组
|
||
rms_values = np.zeros(num_windows)
|
||
|
||
# 计算每个窗口的RMS值
|
||
for i in range(num_windows):
|
||
start_idx = i * stride_samples
|
||
end_idx = min(start_idx + window_samples, len(signal_data))
|
||
|
||
# 处理窗口,包括可能不完整的最后一个窗口
|
||
window_data = signal_data[start_idx:end_idx]
|
||
if len(window_data) > 0:
|
||
rms_values[i] = np.sqrt(np.mean(window_data ** 2))
|
||
else:
|
||
rms_values[i] = 0
|
||
|
||
# 生成初始低幅值掩码:RMS低于阈值的窗口标记为1(低幅值),其他为0
|
||
low_amplitude_mask = np.where(rms_values <= amplitude_threshold, 1, 0)
|
||
|
||
# 计算原始信号对应的窗口索引范围
|
||
orig_start_window = pad_size // stride_samples
|
||
if stride_sec == 1:
|
||
orig_end_window = orig_start_window + (len(signal_data) // stride_samples)
|
||
else:
|
||
orig_end_window = orig_start_window + (len(signal_data) // stride_samples) + 1
|
||
|
||
# 只保留原始信号对应的窗口低幅值掩码
|
||
low_amplitude_mask = low_amplitude_mask[orig_start_window:orig_end_window]
|
||
# print("low_amplitude_mask_length: ", len(low_amplitude_mask))
|
||
num_original_windows = len(low_amplitude_mask)
|
||
|
||
# 转换为时间轴上的状态序列
|
||
# 计算每个窗口对应的时间点(秒)
|
||
time_points = np.arange(num_original_windows) * stride_sec
|
||
|
||
# 如果需要合并间隔小的状态
|
||
if merge_gap_sec > 0:
|
||
low_amplitude_mask = merge_short_gaps(low_amplitude_mask, time_points, merge_gap_sec)
|
||
|
||
# 如果需要移除短时状态
|
||
if min_duration_sec > 0:
|
||
low_amplitude_mask = remove_short_durations(low_amplitude_mask, time_points, min_duration_sec)
|
||
|
||
low_amplitude_mask = low_amplitude_mask.repeat(stride_sec)[:len(signal_data) // sampling_rate]
|
||
|
||
# 低幅值状态起止位置 [[start, end], [start, end], ...]
|
||
low_amplitude_position_list = event_mask_2_list(low_amplitude_mask)
|
||
|
||
return low_amplitude_mask, low_amplitude_position_list
|
||
|
||
|
||
def get_typical_segment_for_continues_signal(signal_data, sampling_rate=100, window_size=30, step_size=1):
|
||
"""
|
||
获取十个片段
|
||
:param signal_data: 信号数据
|
||
:param sampling_rate: 采样率
|
||
:param window_size: 窗口大小(秒)
|
||
:param step_size: 步长(秒)
|
||
:return: 典型片段列表
|
||
"""
|
||
pass
|
||
|
||
|
||
# 基于体动位置和幅值的睡姿识别
|
||
# 主要是依靠体动mask,将整夜分割成多个有效片段,然后每个片段计算幅值指标,判断两个片段的幅值指标是否存在显著差异,如果存在显著差异,则认为存在睡姿变化
|
||
# 考虑到每个片段长度为10s,所以每个片段的幅值指标计算时间长度为10s,然后计算每个片段的幅值指标
|
||
# 仅对比相邻片段的幅值指标,如果存在显著差异,则认为存在睡姿变化,即每个体动相邻的30秒内存在睡姿变化,如果片段不足30秒,则按实际长度对比
|
||
|
||
@timing_decorator()
|
||
def position_based_sleep_recognition_v1(signal_data, movement_mask, sampling_rate=100, window_size_sec=30,
|
||
interval_to_movement=10):
|
||
mav_calc_window_sec = 2 # 计算mav的窗口大小,单位秒
|
||
# 获取有效片段起止位置
|
||
valid_mask = 1 - movement_mask
|
||
valid_starts = np.where(np.diff(np.concatenate([[0], valid_mask])) == 1)[0]
|
||
valid_ends = np.where(np.diff(np.concatenate([valid_mask, [0]])) == -1)[0]
|
||
|
||
movement_start = np.where(np.diff(np.concatenate([[0], movement_mask])) == 1)[0]
|
||
movement_end = np.where(np.diff(np.concatenate([movement_mask, [0]])) == -1)[0]
|
||
|
||
segment_left_average_amplitude = []
|
||
segment_right_average_amplitude = []
|
||
segment_left_average_energy = []
|
||
segment_right_average_energy = []
|
||
|
||
# window_samples = int(window_size_sec * sampling_rate)
|
||
# pad_size = window_samples // 2
|
||
# padded_signal = np.pad(signal_data, pad_size, mode='reflect')
|
||
|
||
for start, end in zip(valid_starts, valid_ends):
|
||
start *= sampling_rate
|
||
end *= sampling_rate
|
||
# 避免过短的片段
|
||
if end - start <= sampling_rate: # 小于1秒的片段不考虑
|
||
continue
|
||
# 获取当前片段数据
|
||
|
||
|
||
elif end - start < (window_size_sec * interval_to_movement + 1) * sampling_rate:
|
||
left_start = start
|
||
left_end = min(end, left_start + window_size_sec * sampling_rate)
|
||
right_start = max(start, end - window_size_sec * sampling_rate)
|
||
right_end = end
|
||
else:
|
||
left_start = start + interval_to_movement * sampling_rate
|
||
left_end = left_start + window_size_sec * sampling_rate
|
||
right_start = end - interval_to_movement * sampling_rate - window_size_sec * sampling_rate
|
||
right_end = end
|
||
|
||
# 新的end - start确保为200的整数倍
|
||
if (left_end - left_start) % (mav_calc_window_sec * sampling_rate) != 0:
|
||
left_end = left_start + ((left_end - left_start) // (mav_calc_window_sec * sampling_rate)) * (
|
||
mav_calc_window_sec * sampling_rate)
|
||
if (right_end - right_start) % (mav_calc_window_sec * sampling_rate) != 0:
|
||
right_end = right_start + ((right_end - right_start) // (mav_calc_window_sec * sampling_rate)) * (
|
||
mav_calc_window_sec * sampling_rate)
|
||
|
||
# 计算每个片段的幅值指标
|
||
left_mav = np.mean(np.max(signal_data[left_start:left_end].reshape(-1, mav_calc_window_sec * sampling_rate),
|
||
axis=0)) - np.mean(
|
||
np.min(signal_data[left_start:left_end].reshape(-1, mav_calc_window_sec * sampling_rate), axis=0))
|
||
right_mav = np.mean(
|
||
np.max(signal_data[right_start:right_end].reshape(-1, mav_calc_window_sec * sampling_rate),
|
||
axis=0)) - np.mean(
|
||
np.min(signal_data[right_start:right_end].reshape(-1, mav_calc_window_sec * sampling_rate), axis=0))
|
||
segment_left_average_amplitude.append(left_mav)
|
||
segment_right_average_amplitude.append(right_mav)
|
||
|
||
left_energy = np.sum(np.abs(signal_data[left_start:left_end] ** 2))
|
||
right_energy = np.sum(np.abs(signal_data[right_start:right_end] ** 2))
|
||
segment_left_average_energy.append(left_energy)
|
||
segment_right_average_energy.append(right_energy)
|
||
|
||
position_changes = []
|
||
position_change_times = []
|
||
# 判断是否存在显著变化 (可根据实际情况调整阈值)
|
||
threshold_amplitude = 0.1 # 幅值变化阈值
|
||
threshold_energy = 0.1 # 能量变化阈值
|
||
|
||
for i in range(1, len(segment_left_average_amplitude)):
|
||
# 计算幅值指标的变化率
|
||
left_amplitude_change = abs(segment_left_average_amplitude[i] - segment_left_average_amplitude[i - 1]) / max(
|
||
segment_left_average_amplitude[i - 1], 1e-6)
|
||
right_amplitude_change = abs(segment_right_average_amplitude[i] - segment_right_average_amplitude[i - 1]) / max(
|
||
segment_right_average_amplitude[i - 1], 1e-6)
|
||
|
||
# 计算能量指标的变化率
|
||
left_energy_change = abs(segment_left_average_energy[i] - segment_left_average_energy[i - 1]) / max(
|
||
segment_left_average_energy[i - 1], 1e-6)
|
||
right_energy_change = abs(segment_right_average_energy[i] - segment_right_average_energy[i - 1]) / max(
|
||
segment_right_average_energy[i - 1], 1e-6)
|
||
|
||
# 如果左右通道中的任一通道同时满足幅值和能量的变化阈值,则认为存在姿势变化
|
||
left_significant_change = (left_amplitude_change > threshold_amplitude) and (
|
||
left_energy_change > threshold_energy)
|
||
right_significant_change = (right_amplitude_change > threshold_amplitude) and (
|
||
right_energy_change > threshold_energy)
|
||
|
||
if left_significant_change or right_significant_change:
|
||
# 记录姿势变化发生的时间点 用当前分割的体动的起始位置和结束位置表示
|
||
position_changes.append(1)
|
||
position_change_times.append((movement_start[i - 1], movement_end[i - 1]))
|
||
else:
|
||
position_changes.append(0) # 0表示不存在姿势变化
|
||
|
||
return position_changes, position_change_times
|
||
|
||
|
||
def position_based_sleep_recognition_v2(signal_data, movement_mask, sampling_rate=100):
|
||
"""
|
||
|
||
:param signal_data:
|
||
:param movement_mask: mask的采样率为1Hz
|
||
:param sampling_rate:
|
||
:param window_size_sec:
|
||
:return:
|
||
"""
|
||
mav_calc_window_sec = 2 # 计算mav的窗口大小,单位秒
|
||
# 获取有效片段起止位置
|
||
valid_mask = 1 - movement_mask
|
||
valid_starts = np.where(np.diff(np.concatenate([[0], valid_mask])) == 1)[0]
|
||
valid_ends = np.where(np.diff(np.concatenate([valid_mask, [0]])) == -1)[0]
|
||
|
||
movement_start = np.where(np.diff(np.concatenate([[0], movement_mask])) == 1)[0]
|
||
movement_end = np.where(np.diff(np.concatenate([movement_mask, [0]])) == -1)[0]
|
||
|
||
segment_average_amplitude = []
|
||
segment_average_energy = []
|
||
|
||
for start, end in zip(valid_starts, valid_ends):
|
||
start *= sampling_rate
|
||
end *= sampling_rate
|
||
# 避免过短的片段
|
||
if end - start <= sampling_rate: # 小于1秒的片段不考虑
|
||
continue
|
||
|
||
# 新的end - start确保为200的整数倍
|
||
if (end - start) % (mav_calc_window_sec * sampling_rate) != 0:
|
||
end = start + ((end - start) // (mav_calc_window_sec * sampling_rate)) * (
|
||
mav_calc_window_sec * sampling_rate)
|
||
|
||
# 计算每个片段的幅值指标
|
||
mav = np.mean(
|
||
np.max(signal_data[start:end].reshape(-1, mav_calc_window_sec * sampling_rate), axis=0)) - np.mean(
|
||
np.min(signal_data[start:end].reshape(-1, mav_calc_window_sec * sampling_rate), axis=0))
|
||
segment_average_amplitude.append(mav)
|
||
|
||
energy = np.sum(np.abs(signal_data[start:end] ** 2))
|
||
segment_average_energy.append(energy)
|
||
|
||
position_changes = np.zeros(len(signal_data) // sampling_rate, dtype=int)
|
||
position_change_times = []
|
||
# 判断是否存在显著变化 (可根据实际情况调整阈值)
|
||
threshold_amplitude = 0.1 # 幅值变化阈值
|
||
threshold_energy = 0.1 # 能量变化阈值
|
||
|
||
for i in range(1, len(segment_average_amplitude)):
|
||
# 计算幅值指标的变化率
|
||
amplitude_change = abs(segment_average_amplitude[i] - segment_average_amplitude[i - 1]) / max(
|
||
segment_average_amplitude[i - 1], 1e-6)
|
||
|
||
# 计算能量指标的变化率
|
||
energy_change = abs(segment_average_energy[i] - segment_average_energy[i - 1]) / max(
|
||
segment_average_energy[i - 1], 1e-6)
|
||
|
||
significant_change = (amplitude_change > threshold_amplitude) and (energy_change > threshold_energy)
|
||
|
||
if significant_change:
|
||
# 记录姿势变化发生的时间点 用当前分割的体动的起始位置和结束位置表示
|
||
position_changes[movement_start[i - 1]:movement_end[i - 1]] = 1
|
||
position_change_times.append((movement_start[i - 1], movement_end[i - 1]))
|
||
|
||
return position_changes, position_change_times
|