DataPrepare/utils/statistics_metrics.py

106 lines
4.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from utils.operation_tools import timing_decorator
import numpy as np
import pandas as pd
@timing_decorator()
def statistic_amplitude_metrics(data, aml_interval=None, time_interval=None):
    """
    Summarise the NaN-separated segments of a signal by mean amplitude and duration.

    Every maximal run of consecutive non-NaN samples in ``data`` is treated as
    one segment. A segment is described by its length (number of samples) and
    its mean value, and is assigned to one amplitude bin and one duration bin.
    Bins are right-closed: a value ``v`` lands in the first bin whose upper
    boundary satisfies ``v <= boundary``, and in the trailing ``"<last>+"`` bin
    when it exceeds every boundary.

    Args:
        data: One-dimensional sequence of samples in which excluded regions
            are marked with NaN. It is converted to a float array, so ``None``
            entries are treated as NaN as well. Durations are counted in
            samples (the original documentation states a 1 Hz sampling rate,
            i.e. one sample per second, with body-movement regions set to NaN).
        aml_interval: Ascending amplitude bin boundaries.
            Defaults to ``[200, 500, 1000, 2000]``.
        time_interval: Ascending duration bin boundaries, in samples.
            Defaults to ``[60, 300, 1800, 3600]``.

    Returns:
        A tuple ``(summary, details)``.

        ``summary`` is a dict with the keys:
            ``total_duration``: ``len(data)``, NaN samples included.
            ``total_valid_signal``: number of non-NaN samples.
            ``amplitude_distribution``: dict, amplitude label -> summed duration.
            ``amplitude_percent``: pandas Series, the same totals as a
                percentage of ``total_duration``.
            ``time_distribution``: dict, duration label -> summed duration;
                it also contains the ``'总计'`` entry because the column sums
                are taken after the row-total column has been appended.
            ``time_percent``: pandas Series, the same sums as a percentage of
                ``total_duration``.

        ``details`` is the tuple ``(confusion_matrix, segment_count_matrix,
        confusion_matrix_percent, valid_signal_length, total_duration,
        time_labels, amp_labels)`` where ``confusion_matrix`` is a DataFrame
        of summed durations (amplitude rows x duration columns plus a
        ``'总计'`` row-total column), ``segment_count_matrix`` is a NumPy array
        holding the number of segments per cell, and
        ``confusion_matrix_percent`` is ``confusion_matrix`` expressed as a
        percentage of ``total_duration``.
    """
    if aml_interval is None:
        aml_interval = [200, 500, 1000, 2000]
    if time_interval is None:
        time_interval = [60, 300, 1800, 3600]

    # Coerce to float so lists, integer arrays and object arrays containing
    # None are all accepted by np.isnan.
    data = np.asarray(data, dtype=float)

    # Length of the whole recording, NaN samples included.
    total_duration = len(data)
    nan_mask = np.isnan(data)
    # Number of usable (non-NaN) samples.
    valid_signal_length = np.sum(~nan_mask)

    amp_labels = _interval_labels(aml_interval)
    time_labels = _interval_labels(time_interval)

    # Pad both ends with "NaN" so segments touching either edge of the
    # recording still produce a start and an end transition.
    padded_mask = np.concatenate(([True], nan_mask, [True])).astype(int)
    transitions = np.diff(padded_mask)
    segment_starts = np.flatnonzero(transitions == -1)  # NaN -> value
    segment_ends = np.flatnonzero(transitions == 1)  # value -> NaN (exclusive end)

    durations = segment_ends - segment_starts
    # Segments contain no NaN by construction, so a plain mean is sufficient.
    mean_amplitudes = np.array(
        [data[start:end].mean() for start, end in zip(segment_starts, segment_ends)],
        dtype=float,
    )

    # Left-sided searchsorted already yields 0 for values <= the first
    # boundary and len(boundaries) for values above the last one, so no
    # special-casing of the outer bins is needed.
    amp_idx = np.searchsorted(aml_interval, mean_amplitudes, side="left")
    time_idx = np.searchsorted(time_interval, durations, side="left")

    matrix_shape = (len(amp_labels), len(time_labels))
    result_matrix = np.zeros(matrix_shape)  # summed duration per cell
    segment_count_matrix = np.zeros(matrix_shape)  # number of segments per cell
    # np.add.at accumulates correctly when several segments share a cell.
    np.add.at(result_matrix, (amp_idx, time_idx), durations)
    np.add.at(segment_count_matrix, (amp_idx, time_idx), 1)

    confusion_matrix = pd.DataFrame(result_matrix, index=amp_labels, columns=time_labels)
    confusion_matrix['总计'] = confusion_matrix.sum(axis=1)
    row_totals = confusion_matrix['总计'].copy()
    column_totals = confusion_matrix.sum(axis=0)

    # Percentages are relative to the full recording length (NaN samples
    # included), not to the valid-sample count. For an empty recording every
    # numerator is 0, so dividing by 1 reports 0 % instead of NaN.
    denominator = total_duration if total_duration else 1
    confusion_matrix_percent = confusion_matrix.div(denominator) * 100

    summary = {
        'total_duration': total_duration,
        'total_valid_signal': valid_signal_length,
        'amplitude_distribution': row_totals.to_dict(),
        'amplitude_percent': row_totals.div(denominator) * 100,
        'time_distribution': column_totals.to_dict(),
        'time_percent': column_totals.div(denominator) * 100,
    }
    return summary, (confusion_matrix, segment_count_matrix, confusion_matrix_percent, valid_signal_length,
                     total_duration, time_labels, amp_labels)


# Defined below the entry point on purpose: the decorator line that precedes
# this block in the file must keep binding to statistic_amplitude_metrics.
def _interval_labels(boundaries):
    """Build the bin labels "0-b0", "b0-b1", ..., "b_last+" for ascending boundaries."""
    inner = [f"{low}-{high}" for low, high in zip(boundaries[:-1], boundaries[1:])]
    return [f"0-{boundaries[0]}", *inner, f"{boundaries[-1]}+"]