PPG_SpO2/spo2_pipeline.py

115 lines
3.6 KiB
Python

import numpy as np
import pandas as pd
from scipy.signal import butter, filtfilt, find_peaks
def bandpass_filter(x, fs, low=0.5, high=5.0, order=3):
nyq = 0.5 * fs
lown = low / nyq
highn = high / nyq
b, a = butter(order, [lown, highn], btype='band')
y = filtfilt(b, a, x)
return y
def detrend(x):
return x - np.mean(x)
def detect_peaks(signal, fs, hr_min=40, hr_max=220):
min_dist = int(np.floor(fs * 60.0 / hr_max))
prominence = 0.3 * np.std(signal)
peaks, props = find_peaks(signal, distance=min_dist, prominence=prominence)
return peaks, props
def compute_ac_dc_per_beat(signal, peaks, fs):
n = len(signal)
AC, DC, times, beat_windows = [], [], [], []
for i, p in enumerate(peaks):
if i == 0:
start = 0
else:
start = (peaks[i-1] + p) // 2
if i == len(peaks)-1:
end = n-1
else:
end = (p + peaks[i+1]) // 2
segment = signal[start:end+1]
if segment.size < 3:
AC.append(np.nan); DC.append(np.nan)
times.append(p / fs)
beat_windows.append((start, end))
continue
seg_max = np.max(segment)
seg_min = np.min(segment)
ac = (seg_max - seg_min) / 2.0
dc = np.mean(segment)
AC.append(ac); DC.append(dc)
times.append(p / fs)
beat_windows.append((start, end))
return np.array(AC), np.array(DC), np.array(times), beat_windows
def compute_spo2_from_R(R, A=104.0, B=17.0):
return A - B * R
def calibrate_linear(R_vals, spo2_ref):
mask = ~np.isnan(R_vals) & ~np.isnan(spo2_ref)
if mask.sum() < 2:
return None
coeffs = np.polyfit(R_vals[mask], spo2_ref[mask], 1)
c1, c0 = coeffs[0], coeffs[1]
A = c0; B = -c1
return float(A), float(B)
def spo2_pipeline(red, ir, fs=25, calibrate_with_ref=None, A=104.0, B=17.0):
assert red.shape == ir.shape, "red and ir must have same shape"
red_f = bandpass_filter(red, fs)
ir_f = bandpass_filter(ir, fs)
red_d = detrend(red_f)
ir_d = detrend(ir_f)
peaks, props = detect_peaks(ir_d, fs)
ac_red, dc_red, times, _ = compute_ac_dc_per_beat(red_d, peaks, fs)
ac_ir, dc_ir, _, _ = compute_ac_dc_per_beat(ir_d, peaks, fs)
with np.errstate(divide='ignore', invalid='ignore'):
ratio_red = ac_red / dc_red
ratio_ir = ac_ir / dc_ir
R = ratio_red / ratio_ir
spo2_vals = compute_spo2_from_R(R, A=A, B=B)
used_A, used_B = A, B
# if calibrate_with_ref is not None:
# if len(calibrate_with_ref) != len(R):
# raise ValueError("Reference SpO2 length mismatch with beats.")
# ref = np.array(calibrate_with_ref)
# fit = calibrate_linear(R, ref)
# if fit is not None:
# used_A, used_B = fit
# spo2_vals = compute_spo2_from_R(R, A=used_A, B=used_B)
df = pd.DataFrame({
"time_s": times,
"AC_red": ac_red,
"DC_red": dc_red,
"AC_ir": ac_ir,
"DC_ir": dc_ir,
"R": R,
"SpO2": spo2_vals
})
df['valid'] = (
(~np.isnan(df['R'])) &
(df['AC_red']>0) & (df['AC_ir']>0) &
(df['DC_red']>0) & (df['DC_ir']>0) &
(df['SpO2']>50) & (df['SpO2']<100)
)
summary = {
"n_beats": len(df),
"n_valid_beats": int(df['valid'].sum()),
"mean_spo2": float(np.nanmean(df['SpO2'][df['valid']])) if df['valid'].any() else float('nan'),
"median_spo2": float(np.nanmedian(df['SpO2'][df['valid']])) if df['valid'].any() else float('nan'),
"used_A": used_A,
"used_B": used_B
}
return df, summary