import numpy as np import pandas as pd from scipy.signal import butter, filtfilt, find_peaks def bandpass_filter(x, fs, low=0.5, high=5.0, order=3): nyq = 0.5 * fs lown = low / nyq highn = high / nyq b, a = butter(order, [lown, highn], btype='band') y = filtfilt(b, a, x) return y def detrend(x): return x - np.mean(x) def detect_peaks(signal, fs, hr_min=40, hr_max=220): min_dist = int(np.floor(fs * 60.0 / hr_max)) prominence = 0.3 * np.std(signal) peaks, props = find_peaks(signal, distance=min_dist, prominence=prominence) return peaks, props def compute_ac_dc_per_beat(signal, peaks, fs): n = len(signal) AC, DC, times, beat_windows = [], [], [], [] for i, p in enumerate(peaks): if i == 0: start = 0 else: start = (peaks[i-1] + p) // 2 if i == len(peaks)-1: end = n-1 else: end = (p + peaks[i+1]) // 2 segment = signal[start:end+1] if segment.size < 3: AC.append(np.nan); DC.append(np.nan) times.append(p / fs) beat_windows.append((start, end)) continue seg_max = np.max(segment) seg_min = np.min(segment) ac = (seg_max - seg_min) / 2.0 dc = np.mean(segment) AC.append(ac); DC.append(dc) times.append(p / fs) beat_windows.append((start, end)) return np.array(AC), np.array(DC), np.array(times), beat_windows def compute_spo2_from_R(R, A=104.0, B=17.0): return A - B * R def calibrate_linear(R_vals, spo2_ref): mask = ~np.isnan(R_vals) & ~np.isnan(spo2_ref) if mask.sum() < 2: return None coeffs = np.polyfit(R_vals[mask], spo2_ref[mask], 1) c1, c0 = coeffs[0], coeffs[1] A = c0; B = -c1 return float(A), float(B) def spo2_pipeline(red, ir, fs=25, calibrate_with_ref=None, A=104.0, B=17.0): assert red.shape == ir.shape, "red and ir must have same shape" red_f = bandpass_filter(red, fs) ir_f = bandpass_filter(ir, fs) red_d = detrend(red_f) ir_d = detrend(ir_f) peaks, props = detect_peaks(ir_d, fs) ac_red, dc_red, times, _ = compute_ac_dc_per_beat(red_d, peaks, fs) ac_ir, dc_ir, _, _ = compute_ac_dc_per_beat(ir_d, peaks, fs) with np.errstate(divide='ignore', invalid='ignore'): ratio_red = ac_red / dc_red ratio_ir = ac_ir / dc_ir R = ratio_red / ratio_ir spo2_vals = compute_spo2_from_R(R, A=A, B=B) used_A, used_B = A, B # if calibrate_with_ref is not None: # if len(calibrate_with_ref) != len(R): # raise ValueError("Reference SpO2 length mismatch with beats.") # ref = np.array(calibrate_with_ref) # fit = calibrate_linear(R, ref) # if fit is not None: # used_A, used_B = fit # spo2_vals = compute_spo2_from_R(R, A=used_A, B=used_B) df = pd.DataFrame({ "time_s": times, "AC_red": ac_red, "DC_red": dc_red, "AC_ir": ac_ir, "DC_ir": dc_ir, "R": R, "SpO2": spo2_vals }) df['valid'] = ( (~np.isnan(df['R'])) & (df['AC_red']>0) & (df['AC_ir']>0) & (df['DC_red']>0) & (df['DC_ir']>0) & (df['SpO2']>50) & (df['SpO2']<100) ) summary = { "n_beats": len(df), "n_valid_beats": int(df['valid'].sum()), "mean_spo2": float(np.nanmean(df['SpO2'][df['valid']])) if df['valid'].any() else float('nan'), "median_spo2": float(np.nanmedian(df['SpO2'][df['valid']])) if df['valid'].any() else float('nan'), "used_A": used_A, "used_B": used_B } return df, summary