ecg_label_check/append.py

52 lines
2.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import numpy as np
from pathlib import Path
def find_TPeak(data, peaks, th=50):
    """
    Find the true J peaks or R peaks by snapping candidates to a local maximum.

    For every candidate position the samples in ``[peak - th, peak + th)``
    (clipped to the bounds of ``data``) are searched, and the index of the
    largest sample in that window is kept.

    :param data: BCG or ECG samples (1-D sequence)
    :param peaks: preliminary peak positions (sample indices), e.g. the
        locations exported from the label files
    :param th: half-width of the search window, in samples
    :return: integer array of refined peak indices; candidates located past
        the end of ``data`` or whose search window is empty are skipped
    """
    n_samples = len(data)
    refined = []
    for peak in peaks:
        # Candidates located past the end of the signal are dropped.
        if peak > n_samples:
            continue
        start = max(0, int(peak - th))
        stop = min(n_samples, int(peak + th))
        # Skip empty windows (th <= 0, empty data) and windows whose stop is
        # negative: a negative stop would be read by Python slicing as an
        # offset from the end and yield an unrelated maximum, while an empty
        # slice would make np.argmax raise ValueError.
        if start >= stop:
            continue
        refined.append(int(np.argmax(data[start:stop])) + start)
    # Force an integer dtype so that even an empty result is usable as indices.
    return np.array(refined, dtype=np.intp)
def Rpeak_Detection(input_dir, fs, th1):
    """
    Load labelled peak locations and refine them against the filtered ECG.

    Reads the signal from ``<input_dir>/filter_ecg.txt`` and the candidate
    positions from ``<input_dir>/label/<k>location_J.txt`` for k = 1..N, where
    N is the number of ``*location_J.txt`` files found under ``label``.
    Positions from file k are shifted by ``fs * 3600 * (k - 1)`` samples,
    concatenated, and passed to ``find_TPeak`` to be snapped to the local
    maximum of the signal.

    :param input_dir: directory containing ``filter_ecg.txt`` and ``label/``
    :param fs: sampling rate; presumably in Hz, so that each label file
        covers one hour of signal (TODO confirm)
    :param th1: search half-window, converted to samples as
        ``int(th1 * fs / 1000)``; presumably milliseconds (TODO confirm)
    :return: 1-D array of refined peak sample indices
    """
    input_dir = Path(input_dir)
    # NOTE(review): unlike the label files below, this read does not pass
    # header=None, so the first line of filter_ecg.txt is consumed as a
    # header. Confirm the file really has one; otherwise the first sample
    # is dropped and every index is shifted by one.
    raw_ecg = pd.read_csv(input_dir / "filter_ecg.txt").to_numpy().reshape(-1)

    label_dir = input_dir / "label"
    n_files = len(list(label_dir.rglob("*location_J.txt")))
    segments = []
    for file_num in range(1, n_files + 1):
        label_path = label_dir / (str(file_num) + 'location_J.txt')
        positions = np.array(pd.read_csv(label_path, header=None)).reshape(-1)
        # Convert positions relative to this file into global sample indices.
        segments.append(positions + fs * 3600 * (file_num - 1))
    # Concatenate once instead of growing the array on every iteration.
    all_R_peak = np.concatenate(segments) if segments else np.array([])

    all_R_peak = find_TPeak(raw_ecg, all_R_peak, th=int(th1 * fs / 1000))
    return all_R_peak.reshape(-1)