Heartbeat_Annotation/heartbeat_annotation.py

428 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
@author:Yosa
@file:heartbeat_annotation.py
@email:2023025086@m.scnu.edu.cn
@time:2025/2/18
"""
import sys
from logging import NOTSET, getLogger, FileHandler, Formatter, StreamHandler, info, error, debug
from time import time, strftime, localtime
import numpy as np
from PyQt5.QtGui import QFont, QDoubleValidator, QIntValidator
from matplotlib.pyplot import title
from pandas import DataFrame, read_csv, read_excel, Series, concat
from matplotlib.ticker import FuncFormatter
from numpy import load, nan, zeros, append, linspace, place
from matplotlib import use
from matplotlib import pyplot as plt, gridspec
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg, NavigationToolbar2QT
from datetime import datetime
from pathlib import Path
from PyQt5.QtCore import QCoreApplication, QTimer
from PyQt5.QtWidgets import QFileDialog, QMainWindow, QMessageBox, QButtonGroup, QApplication, QTableWidgetItem, \
QLineEdit, QAction, QTableWidget
from scipy import signal
from scipy.signal import butter, filtfilt, find_peaks
import detect_Rpeak2
import resample_1000hz
from MainWindow import Ui_MainWindow
use("Qt5Agg")  # Select the Qt5 backend for matplotlib.

# Logging setup: the root logger writes every record both to a per-day file
# under ./logs and to the console.
logger = getLogger()
# NOTSET on the root logger means that records of every level are processed.
logger.setLevel(NOTSET)
realtime = strftime('%Y%m%d', localtime(time()))
log_dir = Path("logs")
# exist_ok=True already tolerates an existing directory, so no prior
# exists() check is needed.
log_dir.mkdir(exist_ok=True)
log_formatter = Formatter("%(asctime)s: %(message)s")
# Log messages contain non-ASCII text, so the file encoding is pinned to
# UTF-8 instead of depending on the platform's locale encoding.
fh = FileHandler(log_dir / (realtime + ".log"), mode='a', encoding='utf-8')
fh.setLevel(NOTSET)
fh.setFormatter(log_formatter)
logger.addHandler(fh)
ch = StreamHandler()
ch.setLevel(NOTSET)
ch.setFormatter(log_formatter)
logger.addHandler(ch)
# Silence the matplotlib font manager's logger.
getLogger('matplotlib.font_manager').disabled = True
info("------------------------------------")
info("------heartbeat_annotation.py-------")
class MainWindow(QMainWindow, Ui_MainWindow):
    """Main window of the heartbeat annotation tool.

    The window offers a function menu and, per function, a parameter panel,
    an information log and a matplotlib canvas. Two functions are implemented
    here: resampling of ``DSbcg_sig.txt`` and R-peak detection on
    ``filter_ecg.txt``. J-peak detection is a placeholder.
    """

    # Root directory that holds the input files; replaced by the directory the
    # user picks in slot_btn_rootpath_open.
    root_path = Path("")
    # Generic data slots reused by whichever function page is active.
    data1 = None
    data2 = None
    data3 = None
    # Per-segment results returned by detect_Rpeak2.Rpeak_Detection.
    ecg_seq = None
    R_peak_seq = None
    Interval_seq = None
    RRIV_seq = None
    # Zero-based index of the segment currently plotted (None before detection).
    temp = None

    def __init__(self):
        """Build the UI, embed the matplotlib canvas and connect all slots."""
        super(MainWindow, self).__init__()
        self.setupUi(self)

        # Embed the matplotlib figure and its navigation toolbar.
        self.figure = plt.figure(figsize=(12, 6), dpi=150)
        self.canvas = FigureCanvasQTAgg(self.figure)
        self.figToolbar = NavigationToolbar2QT(self.canvas)
        self.verticalLayout_canvas.addWidget(self.canvas)
        self.verticalLayout_canvas.addWidget(self.figToolbar)

        # Initial UI state: only the function menu is visible.
        self.groupBox_func_select.setVisible(True)
        self.groupBox_resample1000Hz.setVisible(False)
        self.groupBox_detect_Rpeaks.setVisible(False)
        self.groupBox_detect_Jpeaks.setVisible(False)
        self.groupBox_info.setVisible(False)
        self.groupBox_plot.setVisible(False)
        self.lineEdit_resample1000Hz_raw_org_path.setEnabled(False)
        self.lineEdit_resample1000Hz_DSbcg_sig_path.setEnabled(False)
        self.lineEdit_detect_Rpeaks_filter_ecg_path.setEnabled(False)
        self.groupBox_detect_Rpeaks_signal_parts_list.setEnabled(False)

        # Segment table: read-only, single stretched column.
        parts_table = self.tableWidget_detect_Rpeaks_signal_parts_list
        parts_table.setHorizontalHeaderLabels(['信号片段'])
        parts_table.setEditTriggers(QTableWidget.NoEditTriggers)
        parts_table.horizontalHeader().setStretchLastSection(True)
        # 1 is the integer value of QHeaderView.Stretch.
        parts_table.horizontalHeader().setSectionResizeMode(1)

        # Signal/slot connections.
        self.pushButton_rootpath_open.clicked.connect(self.slot_btn_rootpath_open)
        self.pushButton_resample1000Hz.clicked.connect(self.slot_btn_resample1000Hz)
        self.pushButton_detect_Rpeaks.clicked.connect(self.slot_btn_detect_Rpeaks)
        self.pushButton_detect_Jpeaks.clicked.connect(self.slot_btn_detect_Jpeaks)
        self.pushButton_backToMenu.clicked.connect(self.slot_btn_backToMenu)
        self.pushButton_resample1000Hz_view.clicked.connect(self.slot_btn_resample1000Hz_view)
        self.pushButton_resample1000Hz_save.clicked.connect(self.slot_btn_resample1000Hz_save)
        self.pushButton_detect_Rpeaks_view.clicked.connect(self.slot_btn_detect_Rpeaks_view)
        self.pushButton_detect_Rpeaks_save.clicked.connect(self.slot_btn_detect_Rpeaks_save)
        self.pushButton_detect_Rpeaks_left.clicked.connect(self.slot_btn_detect_Rpeaks_left)
        self.pushButton_detect_Rpeaks_right.clicked.connect(self.slot_btn_detect_Rpeaks_right)
        parts_table.cellDoubleClicked.connect(
            self.slot_tableWidget_detect_Rpeaks_signal_parts_list_on_cell_double_clicked)

        # Reusable message box.
        self.msgBox = QMessageBox()
        self.msgBox.setWindowTitle("消息")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _show_message(self, text, icon):
        """Show ``text`` in the shared modal message box with the given icon."""
        self.msgBox.setText(text)
        self.msgBox.setIcon(icon)
        self.msgBox.exec()

    def _setup_figure_layout(self, n_rows):
        """Create an ``n_rows`` x 1 GridSpec and remove the figure margins."""
        self.gs = gridspec.GridSpec(n_rows, 1, height_ratios=[1] * n_rows)
        self.figure.subplots_adjust(top=1, bottom=0, right=1, left=0, hspace=0, wspace=0)
        # These pyplot calls act on pyplot's current figure/axes; they are kept
        # in their original order.
        plt.margins(0, 0)
        plt.tight_layout()
        plt.xticks([])
        plt.yticks([])

    def _add_axes(self, row, sharex=None, hide_x_labels=False):
        """Add a gridded subplot in GridSpec row ``row`` and return it.

        ``hide_x_labels`` colours the x ticks white, as done for the upper
        R-peak plot. Figure.add_subplot already returns the new Axes, so no
        plt.gca() lookup is needed.
        """
        axes = self.figure.add_subplot(self.gs[row], sharex=sharex)
        axes.grid(True)
        if hide_x_labels:
            axes.tick_params(axis='x', colors='white')
        axes.xaxis.set_major_formatter(FuncFormatter(lambda x, p: f"{x:.0f}"))
        return axes

    def _reset_Rpeaks_axes(self):
        """Replace both R-peak axes with fresh, empty ones."""
        self.ax0.remove()
        self.ax0 = self._add_axes(0, hide_x_labels=True)
        self.ax1.remove()
        self.ax1 = self._add_axes(1, sharex=self.ax0)

    def _plot_Rpeaks_part(self, part_index):
        """Draw segment ``part_index`` onto the existing R-peak axes."""
        ecg = self.ecg_seq[part_index]
        r_peaks = self.R_peak_seq[part_index]
        # RRIV values are plotted against the R peaks from the third one on.
        self.ax0.plot(r_peaks[2:], self.RRIV_seq[part_index], 'r.', label="RRIV")
        self.ax0.legend(loc='upper right')
        self.ax1.plot(ecg, 'r', label="ECG")
        self.ax1.plot(r_peaks, ecg[r_peaks], 'b*', label="R_peaks")
        self.ax1.plot(self.Interval_seq[part_index], 'g', label="Interval")
        self.ax1.legend(loc='upper right')
        self.canvas.draw()

    def _show_Rpeaks_part(self, part_index):
        """Make ``part_index`` the current segment, redraw it and log it."""
        self.temp = part_index
        self.detect_Rpeaks_update_plot(self.temp)
        info(f"Finished Data Part {self.temp + 1} Plot.")
        self.textBrowser_update(f"提示:完成绘制信号第{self.temp + 1}")

    def _has_Rpeaks_result(self):
        """Return True if a detection result is available; report otherwise."""
        if self.temp is not None and self.ecg_seq is not None:
            return True
        error("R peaks result is None.")
        self.textBrowser_update("错误:尚未完成R峰提取")
        self._show_message("尚未完成R峰提取", QMessageBox.Critical)
        return False

    # ------------------------------------------------------------------
    # Root path selection
    # ------------------------------------------------------------------
    def slot_btn_rootpath_open(self):
        """Let the user pick the root directory and store it as a Path."""
        fileDialog = QFileDialog()
        if self.sender() == self.pushButton_rootpath_open:
            fileDialog.setFileMode(QFileDialog.Directory)
            fileDialog.setOption(QFileDialog.ShowDirsOnly, True)

        if fileDialog.exec_() != QFileDialog.Accepted:
            info("Canceled Loading Root Path.")
            self.textBrowser_update("提示:根目录路径选择取消")
            self._show_message("根目录路径选择取消", QMessageBox.Warning)
            return

        selected = fileDialog.selectedFiles()[0]
        if not selected:
            # self.root_path is left untouched so that it always stays a Path;
            # the other slots combine it with file names using "/".
            error("Root Path not Exist...")
            self.textBrowser_update("操作:根目录路径输入错误")
            self._show_message("根目录路径输入错误", QMessageBox.Critical)
            return

        self.lineEdit_rootpath.setText(selected)
        self.root_path = Path(selected)
        info("Loading Root Path...")

    # ------------------------------------------------------------------
    # Resample to 1000 Hz
    # ------------------------------------------------------------------
    def slot_btn_resample1000Hz(self):
        """Open the resampling page after loading its two input files."""
        raw_org_path = self.root_path / "raw_org.txt"
        DSbcg_sig_path = self.root_path / "bcg_test" / "DSbcg_sig.txt"
        if not (raw_org_path.exists() and DSbcg_sig_path.exists()):
            error("Can't Find raw_org.txt or DSbcg_sig.txt.")
            self.textBrowser_update("错误无法找到raw_org.txt或DSbcg_sig.txt无法执行<重采样>,请检查文件是否存在")
            self._show_message("无法找到raw_org.txt或DSbcg_sig.txt无法执行<重采样>,请检查文件是否存在",
                               QMessageBox.Critical)
            return
        info("Found raw_org.txt and DSbcg_sig.txt.")

        # Single-plot layout.
        self._setup_figure_layout(1)
        self.ax0 = self._add_axes(0)

        self.lineEdit_resample1000Hz_raw_org_path.setText(str(raw_org_path))
        self.lineEdit_resample1000Hz_DSbcg_sig_path.setText(str(DSbcg_sig_path))
        self.lineEdit_resample1000Hz_save_path.setText(str(self.root_path / "DSbcg_sig_1000hz3.txt"))
        self.textBrowser_update("提示找到raw_org.txt和DSbcg_sig.txt")

        # NOTE(review): read_csv uses its default header handling here, so the
        # first line of each file is consumed as a header row - confirm that
        # both files really start with a header.
        self.data1 = read_csv(raw_org_path, encoding="utf-8").to_numpy()
        self.data2 = read_csv(DSbcg_sig_path, encoding="utf-8", sep="\t")

        self.groupBox_func_select.setVisible(False)
        self.groupBox_resample1000Hz.setVisible(True)
        self.groupBox_info.setVisible(True)
        self.groupBox_plot.setVisible(True)

    def slot_btn_resample1000Hz_view(self):
        """Resample the third column of data2 and plot it against data1.

        The result computed after dropping the first ``cut_second`` seconds is
        kept in ``self.data3`` for saving.
        """
        raw_args = (
            self.lineEdit_resample1000Hz_original_sampling_rate.text(),
            self.lineEdit_resample1000Hz_target_sampling_rate.text(),
            self.lineEdit_resample1000Hz_cut_second.text(),
        )
        if "" in raw_args:
            error("Miss Args for Resample1000Hz.")
            self.textBrowser_update("错误:参数输入存在空值")
            self._show_message("参数输入存在空值", QMessageBox.Critical)
            return
        try:
            original_rate, target_rate, cut_second = (float(value) for value in raw_args)
        except ValueError:
            # Non-numeric input would otherwise raise inside the Qt slot.
            error("Invalid Args for Resample1000Hz.")
            self.textBrowser_update("错误:参数输入格式有误")
            self._show_message("参数输入格式有误", QMessageBox.Critical)
            return

        self.ax0.remove()
        self.ax0 = self._add_axes(0)

        data_before = self.data1
        data_test = resample_1000hz.upsample(self.data2.iloc[:, 2], original_rate, target_rate)
        # Drop the first cut_second seconds (in original-rate samples) first.
        cut_start = int(original_rate * cut_second)
        data_new = resample_1000hz.upsample(self.data2.iloc[cut_start:, 2], original_rate, target_rate)

        self.ax0.plot(data_before, 'r', label="Original Data")
        # The uncut result is shifted by 200 so it does not overlap the others.
        self.ax0.plot(data_test + 200, 'g', label="Filtered Data")
        self.ax0.plot(data_new, 'b', label="Data After Cut")
        self.ax0.legend(loc='upper right')
        self.canvas.draw()

        self.data3 = data_new
        info("Finished Data Plot.")
        self.textBrowser_update("提示:完成绘图")

    def slot_btn_resample1000Hz_save(self):
        """Save the cut and resampled data (data3) to the chosen .txt file."""
        if self.data3 is None:
            error("data new is None.")
            self.textBrowser_update("错误:裁切后的数据不存在")
            self._show_message("裁切后的数据不存在", QMessageBox.Critical)
            return

        save_path = self.lineEdit_resample1000Hz_save_path.text()
        # An empty string never ends with ".txt", so one test covers both cases.
        if not save_path.endswith(".txt"):
            self.textBrowser_update("错误:保存路径输入有误,请检查后重新执行保存")
            self._show_message("保存路径输入有误,请检查后重新执行保存", QMessageBox.Information)
            return

        reply = QMessageBox.question(self, "警告:确认操作", f"你确定要将裁剪结果保存到{save_path}",
                                     QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if reply == QMessageBox.Yes:
            np.savetxt(save_path, self.data3, fmt='%.4f')
            info(f"Saved Data After Cut to {save_path}.")
            self.textBrowser_update(f"提示:保存成功至{save_path}")
            self._show_message(f"保存成功至{save_path}", QMessageBox.Information)

    # ------------------------------------------------------------------
    # R-peak detection
    # ------------------------------------------------------------------
    def slot_btn_detect_Rpeaks(self):
        """Open the R-peak detection page after loading filter_ecg.txt."""
        filter_ecg_path = self.root_path / "filter_ecg.txt"
        if not filter_ecg_path.exists():
            error("Can't Find filter_ecg.txt.")
            self.textBrowser_update("错误无法找到filter_ecg.txt无法执行<R峰提取>,请检查文件是否存在")
            self._show_message("无法找到filter_ecg.txt无法执行<R峰提取>,请检查文件是否存在",
                               QMessageBox.Critical)
            return
        info("Found filter_ecg.txt.")

        # Two stacked plots sharing the x axis.
        self._setup_figure_layout(2)
        self.ax0 = self._add_axes(0, hide_x_labels=True)
        self.ax1 = self._add_axes(1, sharex=self.ax0)

        self.lineEdit_detect_Rpeaks_filter_ecg_path.setText(str(filter_ecg_path))
        self.lineEdit_detect_Rpeaks_save_path.setText(str(self.root_path / "label"))
        self.textBrowser_update("提示找到filter_ecg.txt")

        # NOTE(review): default header handling consumes the first line of the
        # file as a header row - confirm the file format.
        self.data1 = read_csv(filter_ecg_path, encoding="utf-8").to_numpy().reshape(-1)

        self.groupBox_func_select.setVisible(False)
        self.groupBox_detect_Rpeaks.setVisible(True)
        self.groupBox_info.setVisible(True)
        self.groupBox_plot.setVisible(True)
        self.radioButton_detector_method_pt.setChecked(True)

    def slot_btn_detect_Rpeaks_view(self):
        """Run R-peak detection with the entered parameters and plot segment 1."""
        raw_args = (
            self.lineEdit_detect_Rpeaks_sampling_rate.text(),
            self.lineEdit_detect_Rpeaks_bandpass_low.text(),
            self.lineEdit_detect_Rpeaks_bandpass_high.text(),
            self.lineEdit_detect_Rpeaks_peaks_value.text(),
        )
        if "" in raw_args or self.lineEdit_detect_Rpeaks_save_path.text() == "":
            error("Miss Args for detect_Rpeaks.")
            self.textBrowser_update("错误:参数输入存在空值")
            self._show_message("参数输入存在空值", QMessageBox.Critical)
            return
        try:
            sampling_rate, bandpass_low, bandpass_high, peaks_value = (int(value) for value in raw_args)
            if sampling_rate <= 0:
                # The sampling rate is used as a divisor further down.
                raise ValueError("sampling rate must be positive")
        except ValueError:
            error("Invalid Args for detect_Rpeaks.")
            self.textBrowser_update("错误:参数输入格式有误")
            self._show_message("参数输入格式有误", QMessageBox.Critical)
            return

        detector_choices = (
            (self.radioButton_detector_method_pt, 'pt'),
            (self.radioButton_detector_method_ta, 'ta'),
            (self.radioButton_detector_method_Wt, 'Wt'),
            (self.radioButton_detector_method_Hamilton, 'Hamilton'),
            (self.radioButton_detector_method_Engzee, 'Engzee'),
        )
        detector_method = next((name for button, name in detector_choices if button.isChecked()), None)
        if detector_method is None:
            # Without this guard an unchecked state would leave the name unbound.
            error("No Detector Method Selected.")
            self.textBrowser_update("错误:未选择R峰检测方法")
            self._show_message("未选择R峰检测方法", QMessageBox.Critical)
            return

        self._reset_Rpeaks_axes()

        ecg_data = self.data1
        ecg_seq, R_peak_seq, Interval_seq, RRIV_seq = detect_Rpeak2.Rpeak_Detection(
            ecg_data, sampling_rate, bandpass_low, bandpass_high, peaks_value, detector_method)

        # All four sequences must hold the same number of segments. A chained
        # "a != b != c != d" only compares neighbours, so the number of
        # distinct lengths is checked instead.
        if len({len(ecg_seq), len(R_peak_seq), len(Interval_seq), len(RRIV_seq)}) != 1:
            error("len(self.ecg_seq) and len(self.R_peak_seq) and len(self.Interval_seq) and len(self.RRIV_seq) are not equal.")
            self.textBrowser_update("错误ecg_seq和R_peak_seq和Interval_seq和RRIV_seq的长度不相等")
            return
        # Commit the result only once it is known to be consistent.
        self.ecg_seq, self.R_peak_seq, self.Interval_seq, self.RRIV_seq = (
            ecg_seq, R_peak_seq, Interval_seq, RRIV_seq)

        duration_minutes = len(ecg_data) / sampling_rate / 60
        info(f"Data Length:{len(ecg_data)}")
        self.textBrowser_update(f"数据长度:{len(ecg_data)}")
        info(f"Data Duration:{duration_minutes}分钟")
        self.textBrowser_update(f"数据时长:{duration_minutes}分钟")
        info(f"Data Parts:{len(self.ecg_seq)}小时")
        self.textBrowser_update(f"数据总时长:{len(self.ecg_seq)}小时")

        # One table row per segment, labelled with its 1-based number.
        parts_table = self.tableWidget_detect_Rpeaks_signal_parts_list
        parts_table.setRowCount(len(self.ecg_seq))
        for row in range(len(self.ecg_seq)):
            parts_table.setItem(row, 0, QTableWidgetItem(str(row + 1)))
        self.groupBox_detect_Rpeaks_signal_parts_list.setEnabled(True)

        self._plot_Rpeaks_part(0)
        self.temp = 0
        info("Finished R peaks Detect and Data Part 1 Plot.")
        self.textBrowser_update("提示完成R峰提取并绘制信号第1段")

    def slot_btn_detect_Rpeaks_save(self):
        """Write every segment's ECG and R-peak data into the save directory."""
        if self.ecg_seq is None or self.R_peak_seq is None:
            error("data is None.")
            self.textBrowser_update("错误:需要保存的数据不存在")
            self._show_message("需要保存的数据不存在", QMessageBox.Critical)
            return

        save_dir_text = self.lineEdit_detect_Rpeaks_save_path.text()
        if save_dir_text == "":
            self.textBrowser_update("错误:保存路径输入有误,请检查后重新执行保存")
            self._show_message("保存路径输入有误,请检查后重新执行保存", QMessageBox.Information)
            return

        save_dir = Path(save_dir_text)
        if not save_dir.is_dir():
            save_dir.mkdir(parents=True, exist_ok=True)
            info("Save Path is not Exist, Made it as a New Directory.")
            self.textBrowser_update("提示:检测到保存路径所指向的文件夹不存在,已创建相应文件夹")

        # The prompt shows the R-peak save directory, not the path of the
        # resampling page.
        reply = QMessageBox.question(self, "警告:确认操作", f"你确定要将裁剪结果保存到{save_dir_text}",
                                     QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if reply != QMessageBox.Yes:
            return

        for number, (ecg, r_peaks) in enumerate(zip(self.ecg_seq, self.R_peak_seq), start=1):
            DataFrame(ecg.reshape(-1)).to_csv(str(save_dir / f"{number}ecg.txt"), index=False, header=None)
            DataFrame(r_peaks.reshape(-1)).to_csv(str(save_dir / f"{number}Rpeak.txt"), index=False, header=None)
            info(f"Saved Data {number} to Directory {save_dir_text}.")
            self.textBrowser_update(f"提示:保存片段{number}成功至文件夹{save_dir_text}")
        self._show_message(f"保存成功至{save_dir_text}", QMessageBox.Information)

    def slot_tableWidget_detect_Rpeaks_signal_parts_list_on_cell_double_clicked(self, row, column):
        """Plot the segment whose table cell was double-clicked."""
        item = self.tableWidget_detect_Rpeaks_signal_parts_list.item(row, column)
        if item is None or self.ecg_seq is None:
            return
        # Cells hold the 1-based segment number.
        self._show_Rpeaks_part(int(item.text()) - 1)

    def slot_btn_detect_Rpeaks_left(self):
        """Step to the previous segment, warning when already at the first."""
        if not self._has_Rpeaks_result():
            return
        if self.temp <= 0:
            self._show_message("你正在查看第1段信号", QMessageBox.Warning)
            return
        self._show_Rpeaks_part(self.temp - 1)

    def slot_btn_detect_Rpeaks_right(self):
        """Step to the next segment, warning when already at the last."""
        if not self._has_Rpeaks_result():
            return
        # The table holds one row per segment, so the last index follows
        # directly from the number of segments.
        if self.temp >= len(self.ecg_seq) - 1:
            self._show_message("你正在查看最后1段信号", QMessageBox.Warning)
            return
        self._show_Rpeaks_part(self.temp + 1)

    def detect_Rpeaks_update_plot(self, part_index):
        """Clear both R-peak axes and draw segment ``part_index`` (zero-based)."""
        self._reset_Rpeaks_axes()
        self._plot_Rpeaks_part(part_index)

    # ------------------------------------------------------------------
    # J-peak detection (not implemented)
    # ------------------------------------------------------------------
    def slot_btn_detect_Jpeaks(self):
        """Placeholder for J-peak detection; currently does nothing."""
        pass

    # ------------------------------------------------------------------
    # Navigation and logging
    # ------------------------------------------------------------------
    def slot_btn_backToMenu(self):
        """Discard all loaded data and results and return to the function menu."""
        self.data1 = None
        self.data2 = None
        self.data3 = None
        self.ecg_seq = None
        self.R_peak_seq = None
        self.Interval_seq = None
        self.RRIV_seq = None
        self.temp = None
        self.tableWidget_detect_Rpeaks_signal_parts_list.clearContents()

        self.groupBox_func_select.setVisible(True)
        self.groupBox_resample1000Hz.setVisible(False)
        self.groupBox_detect_Rpeaks.setVisible(False)
        self.groupBox_detect_Jpeaks.setVisible(False)
        self.groupBox_info.setVisible(False)
        self.groupBox_plot.setVisible(False)
        self.groupBox_detect_Rpeaks_signal_parts_list.setEnabled(False)
        self.figure.clf()

    def textBrowser_update(self, message):
        """Append ``message`` to the info panel, prefixed with the current time."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.textBrowser_infoOutput.append(f"{timestamp}: {message}")
# Entry point: create the Qt application, show the main window and run the
# event loop until the window is closed.
if __name__ == '__main__':
    application = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    exit_code = application.exec_()
    sys.exit(exit_code)