52 lines
2.1 KiB
Python
52 lines
2.1 KiB
Python
import pandas as pd
|
||
import numpy as np
|
||
from pathlib import Path
|
||
|
||
def find_TPeak(data, peaks, th=50):
    """
    Refine preliminary peak locations to the true J-peak or R-peak.

    For every candidate, the half-open window ``[peak - th, peak + th)``,
    clipped to the bounds of ``data``, is searched and the index of its
    largest sample is taken as the refined peak.

    :param data: BCG or ECG data (sequence of samples)
    :param peaks: preliminary peaks (location_R exported from the label)
    :param th: range threshold, i.e. half-width of the search window in samples
    :return: integer array of refined peak indices. Candidates greater than
             ``len(data)`` and candidates whose window does not overlap
             ``data`` at all are dropped.
    """
    n_samples = len(data)
    refined = []
    for peak in peaks:
        # Candidates past the end of the signal cannot be refined.
        if peak > n_samples:
            continue

        lo = max(0, int(peak - th))
        hi = min(n_samples, int(peak + th))

        if hi <= lo:
            # Empty window: either th is 0 (nothing to search, so the
            # candidate itself is the answer) or the window lies entirely
            # outside the signal (drop it). Without this guard np.argmax
            # raises on an empty slice, and a negative ``hi`` would slice
            # the wrong region of ``data``.
            if 0 <= peak < n_samples:
                refined.append(int(peak))
            continue

        refined.append(lo + int(np.argmax(data[lo:hi])))

    # Fixed integer dtype so an empty result is still a valid index array.
    return np.array(refined, dtype=np.intp)
|
||
|
||
def Rpeak_Detection(input_dir, fs, th1):
    """
    Load the filtered ECG and its labelled peak locations, then refine the peaks.

    Reads ``filter_ecg.txt`` from ``input_dir`` and the numbered
    ``<k>location_J.txt`` files from ``input_dir/label``. The locations in
    the k-th file are shifted by ``fs * 3600 * (k - 1)`` samples, all files
    are merged, and every location is snapped to the local signal maximum
    with ``find_TPeak``.

    :param input_dir: directory containing ``filter_ecg.txt`` and a ``label`` folder
    :param fs: sampling rate used to compute the per-file offset
               (presumably Hz, so each label file spans one hour; confirm)
    :param th1: search half-width, converted to samples as ``th1 * fs / 1000``
                (presumably milliseconds; confirm)
    :return: flat array of refined peak indices
    """
    input_dir = Path(input_dir)
    label_dir = input_dir / "label"

    # NOTE(review): no ``header=None`` here, so pandas treats the first row of
    # filter_ecg.txt as a header; the label files below are read with
    # ``header=None``. Confirm the signal file really has a header line.
    raw_ecg = pd.read_csv(input_dir / "filter_ecg.txt").to_numpy().reshape(-1)

    # Label files are numbered 1..N; the count is taken from a recursive match.
    n_files = len(list(label_dir.rglob("*location_J.txt")))

    segments = []
    for file_num in range(1, n_files + 1):
        label_path = label_dir / (str(file_num) + 'location_J.txt')
        locations = np.array(pd.read_csv(label_path, header=None)).reshape(-1)
        # Shift file-local positions to positions in the full recording.
        segments.append(locations + fs * 3600 * (file_num - 1))

    # Merge once instead of growing an array with np.append on every file.
    candidates = np.concatenate(segments) if segments else np.array([])

    all_R_peak = find_TPeak(raw_ecg, candidates, th=int(th1 * fs / 1000))

    # The former RR-interval / RRIV / per-sample interval arrays were never
    # used or returned, and building them raised ValueError whenever fewer
    # than two peaks were found, so they are no longer computed.
    return all_R_peak.reshape(-1)