import pandas as pd import numpy as np from pathlib import Path def find_TPeak(data,peaks,th=50): """ 找出真实的J峰或R峰 :param data: BCG或ECG数据 :param peaks: 初步峰值(从label中导出的location_R) :param th: 范围阈值 :return: 真实峰值 """ return_peak = [] for peak in peaks: if peak>len(data):continue min_win,max_win = max(0,int(peak-th)),min(len(data),int(peak+th)) return_peak.append(np.argmax(data[min_win:max_win])+min_win) return np.array(return_peak) def Rpeak_Detection(input_dir, fs, th1): raw_ecg = pd.read_csv(Path(input_dir) / "filter_ecg.txt").to_numpy().reshape(-1) # raw_ecg = raw_ecg[200*sample_rate:] #######################限制幅值处理############################################ # for i in range(len(raw_ecg)): # if raw_ecg[i] > 300 or raw_ecg[i] < -300: # raw_ecg[i] = 0 ############################################################################## all_R_peak = np.array([]) ##############################切割处理########################################## for file_num in range(1, len(list((Path(input_dir) / "label").rglob("*location_J.txt"))) + 1): R_peak_dir = Path(input_dir) / "label" / (str(file_num) + 'location_J.txt') R_peak = np.array(pd.read_csv(R_peak_dir, header=None)).reshape(-1) R_peak = R_peak + fs * 3600 * (file_num - 1) all_R_peak = np.append(all_R_peak, R_peak) ############################################################################## all_R_peak = find_TPeak(raw_ecg, all_R_peak, th=int(th1 * fs / 1000)) RR_Interval = np.full(len(all_R_peak) - 1, np.nan) for i in range(len(all_R_peak) - 1): RR_Interval[i] = all_R_peak[i + 1] - all_R_peak[i] RRIV = np.full(len(RR_Interval) - 1, np.nan) for i in range(len(RR_Interval) - 1): RRIV[i] = RR_Interval[i + 1] - RR_Interval[i] Interval = np.full(len(raw_ecg), np.nan) for i in range(len(all_R_peak) - 1): Interval[all_R_peak[i]: all_R_peak[i + 1]] = all_R_peak[i + 1] - all_R_peak[i] return all_R_peak.reshape(-1)