Heartbeat_Annotation/detect_Rpeak.py
2025-02-21 20:40:04 +08:00

100 lines
3.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import ecgdetectors
from ecgdetectors import Detectors
from scipy import fftpack
import os
from BCGDataset import BCGDataset,BCG_Operation,read_all_data
import math
def refinement( data, peak):
if len(data) == 0 or len(peak) <=2 : return None
firstPeak = peak[0]
lastPeak = peak[-1]
meanPeak = np.quantile( data[peak[1:-1]], 0.2 )
if data[firstPeak] < meanPeak * 0.6 :
peak = np.delete(peak, 0)
if data[lastPeak] < meanPeak * 0.6 :
peak = np.delete(peak, -1)
return np.array(peak)
def find_TPeak(data,peaks,th=50):
"""
找出真实的J峰或R峰
:param data: BCG或ECG数据
:param peaks: 初步峰值从label中导出的location_R
:param th: 范围阈值
:return: 真实峰值
"""
return_peak = []
for peak in peaks:
if peak>len(data):continue
min_win,max_win = max(0,int(peak-th)),min(len(data),int(peak+th))
return_peak.append(np.argmax(data[min_win:max_win])+min_win)
return np.array(return_peak)
def Rpeak_Detection(raw_ecg,fs,low_cut,high_cut,th1,detector_method):
detectors = Detectors(sampling_frequency=fs)
method_dic = {'pt': detectors.pan_tompkins_detector,
'ta': detectors.two_average_detector,
"Engzee": detectors.engzee_detector,
"Wt": detectors.swt_detector,
"Christov": detectors.christov_detector,
"Hamilton": detectors.hamilton_detector
}
detectormethods = method_dic[detector_method]
# raw_ecg = raw_ecg[200*sample_rate:]
preprocessing = BCG_Operation(sample_rate=fs) # 对ECG做了降采样处理
raw_ecg = preprocessing.Butterworth(raw_ecg, "bandpass", low_cut=low_cut, high_cut=high_cut, order=3) * 4
#######################限制幅值处理############################################
# for i in range(len(raw_ecg)):
# if raw_ecg[i] > 300 or raw_ecg[i] < -300:
# raw_ecg[i] = 0
##############################################################################
##############################切割处理##########################################
all_file_num = math.ceil((len(raw_ecg) / fs / 60 / 60))
ecg_seq = np.array(np.arange(all_file_num))
ecg_seq = ecg_seq.astype(np.ndarray)
R_peak_seq = np.array(np.arange(all_file_num))
R_peak_seq = R_peak_seq.astype(np.ndarray)
Interval_seq = np.array(np.arange(all_file_num))
Interval_seq = Interval_seq.astype(np.ndarray)
RRIV_seq = np.array(np.arange(all_file_num))
RRIV_seq = RRIV_seq.astype(np.ndarray)
for file_num in range(1,all_file_num+1):
if file_num != all_file_num:
new_ecg = raw_ecg[fs*3600*(file_num - 1) : fs*3600*file_num]
else:
new_ecg = raw_ecg[fs*3600*(file_num - 1):]
##############################################################################
R_peak = np.array(detectormethods(new_ecg)) - 100
# R_peak = np.array(detectors.pan_tompkins_detector(raw_ecg))-100
R_peak = find_TPeak(new_ecg, R_peak, th=int(th1 * fs / 1000))
R_peak = refinement(new_ecg, R_peak)
RR_Interval = np.full(len(R_peak) - 1, np.nan)
for i in range(len(R_peak) - 1):
RR_Interval[i] = R_peak[i + 1] - R_peak[i]
RRIV = np.full(len(RR_Interval) - 1, np.nan)
for i in range(len(RR_Interval) - 1):
RRIV[i] = RR_Interval[i + 1] - RR_Interval[i]
Interval = np.full(len(new_ecg), np.nan)
for i in range(len(R_peak) - 1):
Interval[R_peak[i]: R_peak[i + 1]] = R_peak[i + 1] - R_peak[i]
ecg_seq[file_num - 1] = new_ecg
R_peak_seq[file_num - 1] = R_peak
Interval_seq[file_num - 1] = Interval
RRIV_seq[file_num - 1] = RRIV
return ecg_seq, R_peak_seq, Interval_seq, RRIV_seq