# ecg_label_check/append.py

import pandas as pd
import numpy as np
from pathlib import Path
def find_TPeak(data, peaks, th=50):
    """
    Refine preliminary peak positions to the true J-peak or R-peak locations.

    For every candidate the half-open window ``[peak - th, peak + th)``,
    clipped to the bounds of ``data``, is searched and the index of its
    maximum sample is taken as the refined peak.

    :param data: BCG or ECG samples (1-D sequence or array)
    :param peaks: preliminary peak indices, e.g. the location_R values
        exported from the label files (ints or floats)
    :param th: half-width of the search window, in samples
    :return: integer array holding the refined index of every retained
        candidate; candidates located beyond ``len(data)`` and candidates
        whose clipped window is empty (e.g. far below zero, or ``th <= 0``)
        are dropped
    """
    n_samples = len(data)
    refined = []
    for peak in peaks:
        if peak > n_samples:
            continue
        lo = max(0, int(peak - th))
        hi = min(n_samples, int(peak + th))
        # An empty window would make argmax raise, and a negative upper
        # bound would silently wrap around in the slice and yield an
        # unrelated index, so such candidates are skipped instead.
        if hi <= lo:
            continue
        refined.append(lo + int(np.argmax(data[lo:hi])))
    # Force an integer dtype so that an empty result is still usable as
    # an index array (np.array([]) would default to float64).
    return np.array(refined, dtype=np.intp)
def Rpeak_Detection(input_dir, fs, th1):
    """
    Load the labelled peak positions of a recording and snap them to ECG maxima.

    Reads the signal from ``<input_dir>/filter_ecg.txt`` and the label files
    ``<input_dir>/label/<k>location_J.txt`` for k = 1, 2, ...; the positions
    of file ``k`` are shifted by ``fs * 3600 * (k - 1)`` samples before all
    positions are refined with ``find_TPeak``.

    :param input_dir: directory containing ``filter_ecg.txt`` and ``label/``
    :param fs: sampling rate, used as samples per second
    :param th1: search tolerance, converted to samples as
        ``int(th1 * fs / 1000)`` (presumably milliseconds - confirm with callers)
    :return: 1-D array of refined peak indices into the ECG signal
    """
    base_dir = Path(input_dir)
    # NOTE(review): read_csv is called with its default header handling, so
    # the first line of filter_ecg.txt is consumed as a header row, whereas
    # the label files below are read with header=None - confirm intended.
    raw_ecg = pd.read_csv(base_dir / "filter_ecg.txt").to_numpy().reshape(-1)

    label_dir = base_dir / "label"
    # The count is taken recursively, but the files are opened directly
    # under label/ and must be numbered consecutively starting at 1.
    n_label_files = len(list(label_dir.rglob("*location_J.txt")))

    # Each label file is offset by 3600 seconds' worth of samples relative
    # to the previous one (presumably one file per recorded hour).
    samples_per_file = fs * 3600

    # Seed with an empty float array so the result keeps the same dtype as
    # before and concatenation still works when no label file exists.
    segments = [np.array([])]
    for file_num in range(1, n_label_files + 1):
        label_path = label_dir / (str(file_num) + 'location_J.txt')
        local_peaks = np.array(pd.read_csv(label_path, header=None)).reshape(-1)
        segments.append(local_peaks + samples_per_file * (file_num - 1))
    all_R_peak = np.concatenate(segments)

    all_R_peak = find_TPeak(raw_ecg, all_R_peak, th=int(th1 * fs / 1000))
    return all_R_peak.reshape(-1)