241 lines
7.3 KiB
Python
241 lines
7.3 KiB
Python
import numpy as np
|
||
import pandas as pd
|
||
from scipy import signal
|
||
from matplotlib import pyplot as plt
|
||
|
||
def calculate_hr_spo2_zhihu(X, fs, FFT_size=512):
|
||
"""
|
||
X : ndarray, shape (N, 2) # 第0列 RED,第1列 IR
|
||
fs : 采样率 (Hz)
|
||
FFT_size : FFT 点数(建议 2 的幂)
|
||
返回值 : (Heart_Rate, SpO2_Level) 已经取整
|
||
https://zhuanlan.zhihu.com/p/658858641
|
||
https://github.com/thinkng/ppgprocessing/blob/master/HR_SpO2_Estimation.m
|
||
"""
|
||
X = np.asarray(X)
|
||
if X.ndim != 2 or X.shape[1] != 2:
|
||
raise ValueError("X 必须是 (样本数, 2) 的数组,列0=RED,列1=IR")
|
||
|
||
# 计算能处理的完整窗口数量(每个窗口长度 = FFT_size)
|
||
step = fs # MATLAB 中是 n*fs 开始,步长为 fs(1 秒)
|
||
n_windows = int((len(X) / (2 * fs)) - 2) # 与原 MATLAB 一致
|
||
|
||
HEART_RATE = np.zeros(n_windows)
|
||
SpO2 = np.zeros(n_windows)
|
||
|
||
for n in range(n_windows):
|
||
start_idx = n * step
|
||
end_idx = start_idx + FFT_size
|
||
y1 = X[start_idx:end_idx, 0] # RED
|
||
y2 = X[start_idx:end_idx, 1] # IR
|
||
|
||
# ---------- FFT RED ----------
|
||
Y1 = np.fft.fft(y1, n=FFT_size)
|
||
Y1_abs = np.abs(Y1[:FFT_size//2 + 1])
|
||
f1 = fs / 2 * np.linspace(0, 1, FFT_size//2 + 1)
|
||
|
||
# ---------- FFT IR ----------
|
||
Y2 = np.fft.fft(y2, n=FFT_size)
|
||
Y2_abs = np.abs(Y2[:FFT_size//2 + 1])
|
||
f2 = f1.copy() # 与 f1 完全相同
|
||
|
||
# ---------- 在 0.5~2.5 Hz(对应心率 30~150 bpm)范围内找局部最大 ----------
|
||
# MATLAB 中索引 6:12 对应频率大约 0.6~1.4 Hz(取决于 FFT_size/fs)
|
||
# 这里统一取频率 0.5~2.5 Hz 对应的索引
|
||
idx_range = np.where((f1 >= 0.5) & (f1 <= 2.5))[0]
|
||
|
||
# RED 峰值索引
|
||
segment_red = Y1_abs[idx_range]
|
||
local_max_i = np.argmax(segment_red)
|
||
pk_RED_i = idx_range[local_max_i]
|
||
|
||
# IR 峰值索引
|
||
segment_ir = Y2_abs[idx_range]
|
||
local_max_i = np.argmax(segment_ir)
|
||
pk_IR_i = idx_range[local_max_i]
|
||
|
||
# ---------- 心率 ----------
|
||
heart_rate_bpm = f2[pk_IR_i] * 60
|
||
HEART_RATE[n] = heart_rate_bpm
|
||
|
||
# ---------- SpO2 ----------
|
||
R_RED = Y1_abs[pk_RED_i] / (Y1_abs[0] + 1e-12) # 防止除以 0
|
||
R_IR = Y2_abs[pk_IR_i] / (Y2_abs[0] + 1e-12)
|
||
R = R_RED / R_IR
|
||
spo2 = 104 - 28 * R
|
||
SpO2[n] = spo2
|
||
|
||
# 去掉首尾(与原 MATLAB 相同)
|
||
if len(HEART_RATE) > 2:
|
||
HR_mean = np.mean(HEART_RATE[1:-1])
|
||
SpO2_mean = np.mean(SpO2[1:-1])
|
||
else:
|
||
HR_mean = np.mean(HEART_RATE)
|
||
SpO2_mean = np.mean(SpO2)
|
||
|
||
Heart_Rate = round(HR_mean)
|
||
SpO2_Level = round(SpO2_mean)
|
||
|
||
return Heart_Rate, SpO2_Level
|
||
|
||
|
||
|
||
def _culculate_spo2(ir_list_data, red_list_data):
|
||
ir_dc = min(ir_list_data)
|
||
red_dc = min(red_list_data)
|
||
ir_ac = max(ir_list_data) - ir_dc
|
||
red_ac = max(red_list_data) - red_dc
|
||
temp1 = ir_ac * red_dc
|
||
if temp1 < 1:
|
||
temp1 = 1
|
||
R2 = (red_ac * ir_dc) / temp1
|
||
SPO2 = -45.060 * R2 * R2 + 30.354 * R2 + 94.845
|
||
if SPO2 > 100 or SPO2 < 0:
|
||
SPO2 = 0
|
||
return SPO2
|
||
|
||
def _culculate_HR(ir_list_data_filtered, data_list_time):
|
||
HR_num = signal.find_peaks(ir_list_data_filtered, distance=10)[0]
|
||
time = data_list_time[-1] -data_list_time[0]
|
||
HR = len(HR_num) / (time / 1000) * 60
|
||
return HR
|
||
|
||
|
||
def process_signal(signal_segment, fs, highpass=True):
|
||
if highpass:
|
||
h_b, h_a = signal.butter(N=8, Wn=1/(fs/2), btype="highpass", output="ba")
|
||
data = signal.filtfilt(h_b, h_a, signal_segment, axis=0)
|
||
else:
|
||
data = signal_segment
|
||
data = signal.detrend(data,
|
||
axis=0,
|
||
type='linear',
|
||
bp=0,
|
||
overwrite_data=False)
|
||
return data
|
||
|
||
|
||
def ppg2spo2_pipeline(red, ir, fs=25):
|
||
"""
|
||
采用滑窗分析法计算心率和血氧饱和度
|
||
每秒中输出结果
|
||
|
||
red : ndarray, 红光信号
|
||
ir : ndarray, 红外光信号
|
||
fs : 采样率 (Hz)
|
||
"""
|
||
|
||
red = np.asarray(red).reshape(-1)
|
||
ir = np.asarray(ir).reshape(-1)
|
||
|
||
|
||
if len(red) != len(ir):
|
||
raise ValueError("红光和红外光信号长度必须相同")
|
||
|
||
red_filtered = process_signal(red, fs, highpass=False)
|
||
ir_filtered = process_signal(ir, fs, highpass=True)
|
||
|
||
bpm_list_data = []
|
||
spo2_list_data = []
|
||
temp_bpm_list_data = []
|
||
temp_spo2_list_data = []
|
||
|
||
for i in range(len(red)//fs - 1):
|
||
red_segment = red_filtered[i*fs:(i+1)*fs]
|
||
ir_segment = ir_filtered[i*fs:(i+1)*fs]
|
||
spo2 = _culculate_spo2(ir_segment, red_segment)
|
||
bpm = _culculate_HR(ir_segment, np.arange(i*fs, (i+1)*fs) * (1000/fs))
|
||
temp_bpm_list_data.append(bpm)
|
||
temp_spo2_list_data.append(spo2)
|
||
# matlab
|
||
|
||
# python
|
||
|
||
|
||
plt.figure(figsize=(10, 5))
|
||
timestamp = np.linspace(0, len(red_filtered)/fs, len(red_filtered))
|
||
ax1 = plt.subplot(211)
|
||
plt.plot( timestamp, red, label='Red Signal', color='red', alpha=0.5)
|
||
plt.plot(timestamp,ir, label='IR Signal', color='blue', alpha=0.5)
|
||
plt.title('Raw PPG Signals')
|
||
plt.xlabel("seconds")
|
||
plt.ylabel('Amplitude')
|
||
plt.legend()
|
||
plt.subplot(212, sharex=ax1)
|
||
plt.plot(timestamp,red_filtered, label='Filtered Red Signal', color='red', alpha=0.5)
|
||
plt.plot(timestamp,ir_filtered, label='Filtered IR Signal', color='blue', alpha=0.5)
|
||
plt.title('Filtered PPG Signals')
|
||
plt.xlabel("seconds")
|
||
plt.ylabel('Amplitude')
|
||
plt.legend()
|
||
plt.show()
|
||
|
||
|
||
|
||
def oximetry_fft(red, ir, fs=25):
|
||
|
||
fs = fs
|
||
n_fft = 128
|
||
window_size = fs // 2
|
||
|
||
ir_filtered = np.convolve(ir, np.ones(window_size)/window_size, mode='valid')
|
||
|
||
segment_len = fs
|
||
peaks_idx = []
|
||
|
||
for start in range(0, len(ir_filtered) - segment_len + 1, segment_len):
|
||
segment = ir_filtered[start:start + segment_len]
|
||
# 用 scipy.find_peaks 找局部最大值(这里只取最高峰)
|
||
pk, _ = signal.find_peaks(segment, height=None)
|
||
if len(pk) > 0:
|
||
# 取幅度最大的那个峰
|
||
best_pk = pk[np.argmax(segment[pk])]
|
||
peaks_idx.append(start + best_pk)
|
||
peaks_idx = np.array(peaks_idx)
|
||
|
||
peaks_idx = np.array(peaks_idx)
|
||
|
||
# 1.3 计算心率(跳过明显错误的相邻峰值间隔)
|
||
if len(peaks_idx) >= 2:
|
||
diffs = np.diff(peaks_idx) # 相邻峰之间的采样点数
|
||
valid = diffs >= 10 # 过滤掉过于靠近的误检(原代码的 -10 条件)
|
||
beat_intervals_sec = diffs[valid] / fs # 转为秒
|
||
HEART_RATE = 60.0 / beat_intervals_sec.mean()
|
||
else:
|
||
HEART_RATE = np.nan
|
||
|
||
print(f"Heart Rate: {HEART_RATE:.1f} bpm")
|
||
|
||
freqs = np.fft.rfftfreq(n_fft, d=1 / fs)
|
||
f_min = 0.7
|
||
f_max = 2.0
|
||
idx_range = np.where((freqs >= f_min) & (freqs <= f_max))[0]
|
||
|
||
Y1 = np.fft.rfft(red - red.mean(), n=n_fft) # 去直流后再 FFT
|
||
mag1 = np.abs(Y1)
|
||
|
||
Y2 = np.fft.rfft(ir - ir.mean(), n=n_fft)
|
||
mag2 = np.abs(Y2)
|
||
|
||
peak_idx_red = idx_range[np.argmax(mag1[idx_range])]
|
||
peak_idx_ir = idx_range[np.argmax(mag2[idx_range])]
|
||
|
||
AC_red = mag1[peak_idx_red]
|
||
DC_red = mag1[0]
|
||
AC_ir = mag2[peak_idx_ir]
|
||
DC_ir = mag2[0]
|
||
|
||
R = (AC_red / DC_red) / (AC_ir / DC_ir)
|
||
SpO2 = 104 - 28 * R
|
||
|
||
SpO2 = np.clip(SpO2, 0, 100)
|
||
|
||
print(f"SpO2: {SpO2:.1f} %")
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|