From 071a59997dfafcdfcf05b6b715dacaa1af72204a Mon Sep 17 00:00:00 2001 From: marques Date: Wed, 19 Nov 2025 16:35:09 +0800 Subject: [PATCH] Implement oximetry FFT function for heart rate and SpO2 calculation --- func.py | 109 +++++++++++++++++++++++++++----------------------------- 1 file changed, 53 insertions(+), 56 deletions(-) diff --git a/func.py b/func.py index 803131c..7fc0a9a 100644 --- a/func.py +++ b/func.py @@ -172,70 +172,67 @@ def ppg2spo2_pipeline(red, ir, fs=25): +def oximetry_fft(red, ir, fs=25): -def func_1(red, ir, fs=25): - # Processing_PPG_Signal - # make a move window find min and max of ArrayIR - def movmin1(A, k): - x = A.rolling(k, min_periods=1, center=True).min().to_numpy() # - return x + fs = fs + n_fft = 128 + window_size = fs // 2 - def movmax1(A, k): - x = A.rolling(k, min_periods=1, center=True).max().to_numpy() - return x + ir_filtered = np.convolve(ir, np.ones(window_size)/window_size, mode='valid') - ArrayIR = pd.DataFrame(ir) - ArrayRed = pd.DataFrame(red) + segment_len = fs + peaks_idx = [] - # calculate ac/dc ir - max_ir = movmax1(ArrayIR, fs) - # print(f"max_ir: {max_ir}") - min_ir = movmin1(ArrayIR, fs) - # print(f"min_ir: {min_ir}") - baseline_data_ir = (max_ir + min_ir) / 2 - # print(f"baseline_data_ir: {baseline_data_ir}") - acDivDcIr = (max_ir - min_ir) / baseline_data_ir + for start in range(0, len(ir_filtered) - segment_len + 1, segment_len): + segment = ir_filtered[start:start + segment_len] + # 用 scipy.find_peaks 找局部最大值(这里只取最高峰) + pk, _ = signal.find_peaks(segment, height=None) + if len(pk) > 0: + # 取幅度最大的那个峰 + best_pk = pk[np.argmax(segment[pk])] + peaks_idx.append(start + best_pk) + peaks_idx = np.array(peaks_idx) - # calculate ac/dc red - max_red = movmax1(ArrayRed, fs) - min_red = movmin1(ArrayRed, fs) - baseline_data_red = (max_red + min_red) / 2 - acDivDcRed = (max_red - min_red) / baseline_data_red + peaks_idx = np.array(peaks_idx) - # Plot SPO2 = 110-25*(ac/dc_red)/(ac/dc_ir) - SPO2 = 110 - 25 * (acDivDcRed / acDivDcIr) - # plt.figure("SPO2") - timestamp = np.linspace(0, len(red) / fs, len(red)) - plt.figure(figsize=(10, 5)) - ax1 = plt.subplot(311) - plt.plot(timestamp, red, label='Red Signal', color='red', alpha=0.5) - plt.plot(timestamp, ir, label='IR Signal', color='blue', alpha=0.5) - plt.title('Raw PPG Signals') - plt.xlabel("seconds") - plt.ylabel('Amplitude') - plt.legend() + # 1.3 计算心率(跳过明显错误的相邻峰值间隔) + if len(peaks_idx) >= 2: + diffs = np.diff(peaks_idx) # 相邻峰之间的采样点数 + valid = diffs >= 10 # 过滤掉过于靠近的误检(原代码的 -10 条件) + beat_intervals_sec = diffs[valid] / fs # 转为秒 + HEART_RATE = 60.0 / beat_intervals_sec.mean() + else: + HEART_RATE = np.nan + + print(f"Heart Rate: {HEART_RATE:.1f} bpm") + + freqs = np.fft.rfftfreq(n_fft, d=1 / fs) + f_min = 0.7 + f_max = 2.0 + idx_range = np.where((freqs >= f_min) & (freqs <= f_max))[0] + + Y1 = np.fft.rfft(red - red.mean(), n=n_fft) # 去直流后再 FFT + mag1 = np.abs(Y1) + + Y2 = np.fft.rfft(ir - ir.mean(), n=n_fft) + mag2 = np.abs(Y2) + + peak_idx_red = idx_range[np.argmax(mag1[idx_range])] + peak_idx_ir = idx_range[np.argmax(mag2[idx_range])] + + AC_red = mag1[peak_idx_red] + DC_red = mag1[0] + AC_ir = mag2[peak_idx_ir] + DC_ir = mag2[0] + + R = (AC_red / DC_red) / (AC_ir / DC_ir) + SpO2 = 104 - 28 * R + + SpO2 = np.clip(SpO2, 0, 100) + + print(f"SpO2: {SpO2:.1f} %") - plt.subplot(312, sharex=ax1) - plt.plot(timestamp, red - baseline_data_red, label='Detrended Red Signal', color='red', alpha=0.5) - plt.plot(timestamp, ir - baseline_data_ir, label='Detrended IR Signal', color='blue', alpha=0.5) - plt.title('Detrended PPG Signals') - plt.xlabel("seconds") - plt.ylabel('Amplitude') - plt.legend() - plt.subplot(313, sharex=ax1) - plt.plot(timestamp,acDivDcRed, label='AC/DC Red', color='red', alpha=0.5) - plt.plot(timestamp,acDivDcIr, label='AC/DC IR', color='blue', alpha=0.5) - plt.title('AC/DC Ratios') - plt.xlabel("seconds") - plt.ylabel('Ratio') - plt.legend() - plt.show() - plt.xlabel("Samples") - plt.ylabel("SPO2") - plt.title("SPO2") - plt.plot(timestamp, SPO2) - plt.show()