import numpy as np from scipy.interpolate import interp1d import utils def signal_filter_split(conf, signal_data_raw, signal_fs, verbose=True): # 滤波 # 50Hz陷波滤波器 # signal_data = utils.butterworth(data=signal_data, _type="bandpass", low_cut=0.5, high_cut=45, order=10, sample_rate=signal_fs) if verbose: print("Applying 50Hz notch filter...") signal_data = utils.notch_filter(data=signal_data_raw, notch_freq=50.0, quality_factor=30.0, sample_rate=signal_fs) resp_data_0 = utils.butterworth(data=signal_data, _type="lowpass", low_cut=50, order=10, sample_rate=signal_fs) resp_fs = conf["resp"]["downsample_fs_1"] resp_data_1 = utils.downsample_signal_fast(original_signal=resp_data_0, original_fs=signal_fs, target_fs=resp_fs) resp_data_1 = utils.average_filter(raw_data=resp_data_1, sample_rate=resp_fs, window_size_sec=20) resp_data_2 = utils.butterworth(data=resp_data_1, _type=conf["resp_filter"]["filter_type"], low_cut=conf["resp_filter"]["low_cut"], high_cut=conf["resp_filter"]["high_cut"], order=conf["resp_filter"]["order"], sample_rate=resp_fs) if verbose: print("Begin plotting signal data...") # fig = plt.figure(figsize=(12, 8)) # # 绘制三个图raw_data、resp_data_1、resp_data_2 # ax0 = fig.add_subplot(3, 1, 1) # ax0.plot(np.linspace(0, len(signal_data) // signal_fs, len(signal_data)), signal_data, color='blue') # ax0.set_title('Raw Signal Data') # ax1 = fig.add_subplot(3, 1, 2, sharex=ax0) # ax1.plot(np.linspace(0, len(resp_data_1) // resp_fs, len(resp_data_1)), resp_data_1, color='orange') # ax1.set_title('Resp Data after Average Filtering') # ax2 = fig.add_subplot(3, 1, 3, sharex=ax0) # ax2.plot(np.linspace(0, len(resp_data_1) // resp_fs, len(resp_data_2)), resp_data_2, color='green') # ax2.set_title('Resp Data after Butterworth Filtering') # plt.tight_layout() # plt.show() bcg_data = utils.butterworth(data=signal_data, _type=conf["bcg_filter"]["filter_type"], low_cut=conf["bcg_filter"]["low_cut"], high_cut=conf["bcg_filter"]["high_cut"], order=conf["bcg_filter"]["order"], sample_rate=signal_fs) return signal_data, resp_data_2, resp_fs, bcg_data, signal_fs def psg_effort_filter(conf, effort_data_raw, effort_fs): # 滤波 effort_data_1 = utils.bessel(data=effort_data_raw, _type=conf["effort_filter"]["filter_type"], low_cut=conf["effort_filter"]["low_cut"], high_cut=conf["effort_filter"]["high_cut"], order=conf["effort_filter"]["order"], sample_rate=effort_fs) # 移动平均 effort_data_2 = utils.average_filter(raw_data=effort_data_1, sample_rate=effort_fs, window_size_sec=20) return effort_data_raw, effort_data_2, effort_fs def rpeak2hr(rpeak_indices, signal_length, ecg_fs): hr_signal = np.zeros(signal_length) for i in range(1, len(rpeak_indices)): rri = rpeak_indices[i] - rpeak_indices[i - 1] if rri == 0: continue hr = 60 * ecg_fs / rri # 心率,单位:bpm if hr > 120: hr = 120 elif hr < 30: hr = 30 hr_signal[rpeak_indices[i - 1]:rpeak_indices[i]] = hr # 填充最后一个R峰之后的心率值 if len(rpeak_indices) > 1: hr_signal[rpeak_indices[-1]:] = hr_signal[rpeak_indices[-2]] return hr_signal def rpeak2rri_repeat(rpeak_indices, signal_length, ecg_fs): rri_signal = np.zeros(signal_length) for i in range(1, len(rpeak_indices)): rri = rpeak_indices[i] - rpeak_indices[i - 1] rri_signal[rpeak_indices[i - 1]:rpeak_indices[i]] = rri # 填充最后一个R峰之后的RRI值 if len(rpeak_indices) > 1: rri_signal[rpeak_indices[-1]:] = rri_signal[rpeak_indices[-2]] # 遍历异常值 for i in range(1, len(rpeak_indices)): rri = rpeak_indices[i] - rpeak_indices[i - 1] if rri < 0.3 * ecg_fs or rri > 2 * ecg_fs: rri_signal[rpeak_indices[i - 1]:rpeak_indices[i]] = 0 return rri_signal def rpeak2rri_interpolation(rpeak_indices, ecg_fs, rri_fs): r_peak_time = np.asarray(rpeak_indices) / ecg_fs rri = np.diff(r_peak_time) t_rri = r_peak_time[1:] mask = (rri > 0.3) & (rri < 2.0) rri_clean = rri[mask] t_rri_clean = t_rri[mask] t_uniform = np.arange(t_rri_clean[0], t_rri_clean[-1], 1/rri_fs) f = interp1d(t_rri_clean, rri_clean, kind='linear', fill_value="extrapolate") rri_uniform = f(t_uniform) return rri_uniform, rri_fs