Heartbeat_Annotation/heartbeat_annotation.py
2025-02-21 20:40:04 +08:00

566 lines
33 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
@author:Yosa
@file:heartbeat_annotation.py
@email:2023025086@m.scnu.edu.cn
@time:2025/2/18
"""
import sys
from logging import NOTSET, getLogger, FileHandler, Formatter, StreamHandler, info, error, debug
from time import time, strftime, localtime
import numpy as np
from PyQt5.QtGui import QFont, QDoubleValidator, QIntValidator
from matplotlib.pyplot import title
from pandas import DataFrame, read_csv
from matplotlib.ticker import FuncFormatter
from numpy import load, nan, zeros, append, linspace, place
from matplotlib import use
from matplotlib import pyplot as plt, gridspec
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg, NavigationToolbar2QT
from datetime import datetime
from pathlib import Path
from PyQt5.QtWidgets import QFileDialog, QMainWindow, QMessageBox, QButtonGroup, QApplication, QTableWidgetItem, QTableWidget, QWidget
import resample_1000hz
import detect_Rpeak
import detect_Jpeak
from ui.MainWindow import Ui_MainWindow
from ui.widget_func import Ui_widget_func
from ui.widget_resample1000Hz import Ui_widget_resample1000Hz
from ui.widget_detect_Rpeaks import Ui_widget_detect_Rpeaks
from ui.widget_detect_Jpeaks import Ui_widget_detect_Jpeaks
# Select the Qt5 backend for matplotlib.
use("Qt5Agg")

# Logging setup: the root logger forwards every record to a per-day log file
# under ./logs and to a stream handler, both using the same message layout.
logger = getLogger()
logger.setLevel(NOTSET)
realtime = strftime('%Y%m%d', localtime(time()))
_log_dir = Path("logs")
if not _log_dir.exists():
    _log_dir.mkdir(exist_ok=True)
fh = FileHandler(_log_dir / (realtime + ".log"), mode='a')
ch = StreamHandler()
for _handler in (fh, ch):
    _handler.setLevel(NOTSET)
    _handler.setFormatter(Formatter("%(asctime)s: %(message)s"))
    logger.addHandler(_handler)
# Silence matplotlib's font manager logger.
getLogger('matplotlib.font_manager').disabled = True
info("------------------------------------")
info("------heartbeat_annotation.py-------")
class MainWindow(QMainWindow, Ui_MainWindow):
    """Main window of the heartbeat annotation tool.

    The window hosts a matplotlib canvas and a side menu. The menu shows
    either the function chooser (``widget_func``) or one of three work
    panels: resampling, R-peak detection, or J-peak detection. Each panel
    has an "enter" slot that checks for its input file(s) under
    ``root_path``, a "view" slot that runs the processing and plots the
    result, and a "save" slot that writes the result to disk.
    """

    # Root directory chosen by the user; input files are looked up beneath it.
    root_path = Path("")
    # Generic data slots; their meaning depends on the active panel:
    #   resample : data1 = raw_org.txt values, data2 = DSbcg_sig.txt table,
    #              data3 = upsampled signal after the leading cut.
    #   R peaks  : data1 = flattened filter_ecg.txt values.
    #   J peaks  : data1 = flattened DSbcg_sig_1000hz3.txt values,
    #              data2/data3/data4 = the three values returned by
    #              detect_Jpeak.Jpeak_Detection (plotted as "BCG_filtered",
    #              "Predict_J_peaks" positions and "Interval").
    data1 = None
    data2 = None
    data3 = None
    data4 = None
    # The four sequences returned by detect_Rpeak.Rpeak_Detection, indexed by
    # signal part.
    ecg_seq = None
    R_peak_seq = None
    Interval_seq = None
    RRIV_seq = None
    # Zero-based index of the signal part currently plotted in the R-peak
    # panel; None while no detection result exists.
    temp = None

    def __init__(self):
        """Build the UI, the plot canvas, the work panels and all connections."""
        super(MainWindow, self).__init__()
        self.setupUi(self)

        # Plot canvas and its navigation toolbar.
        self.figure = plt.figure(figsize=(12, 6), dpi=150)
        self.canvas = FigureCanvasQTAgg(self.figure)
        self.figToolbar = NavigationToolbar2QT(self.canvas)
        self.verticalLayout_canvas.addWidget(self.canvas)
        self.verticalLayout_canvas.addWidget(self.figToolbar)

        # Side-menu widgets: the function chooser and the three work panels.
        self.widget_func = QWidget()
        self.ui_func = Ui_widget_func()
        self.ui_func.setupUi(self.widget_func)
        self.widget_resample1000Hz = QWidget()
        self.ui_resample1000Hz = Ui_widget_resample1000Hz()
        self.ui_resample1000Hz.setupUi(self.widget_resample1000Hz)
        self.widget_detect_Rpeaks = QWidget()
        self.ui_detect_Rpeaks = Ui_widget_detect_Rpeaks()
        self.ui_detect_Rpeaks.setupUi(self.widget_detect_Rpeaks)
        self.widget_detect_Jpeaks = QWidget()
        self.ui_detect_Jpeaks = Ui_widget_detect_Jpeaks()
        self.ui_detect_Jpeaks.setupUi(self.widget_detect_Jpeaks)

        # Initial UI state: show the function chooser; the signal-part list
        # stays disabled until an R-peak detection has produced parts.
        self.verticalLayout_menu.addWidget(self.widget_func)
        self.ui_detect_Rpeaks.groupBox_detect_Rpeaks_signal_parts_list.setEnabled(False)

        # Restrict every numeric parameter field to integer input.
        validator_integer = QIntValidator(-2**31, 2**31 - 1)
        integer_fields = (
            self.ui_resample1000Hz.lineEdit_resample1000Hz_original_sampling_rate,
            self.ui_resample1000Hz.lineEdit_resample1000Hz_target_sampling_rate,
            self.ui_resample1000Hz.lineEdit_resample1000Hz_cut_second,
            self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_sampling_rate,
            self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_peaks_value,
            self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_bandpass_low,
            self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_bandpass_high,
            self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_sampling_rate,
            self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_peaks_value,
            self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_amp_value,
            self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_interval_low,
            self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_interval_high,
            self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_bandpass_low,
            self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_bandpass_high,
        )
        for line_edit in integer_fields:
            line_edit.setValidator(validator_integer)

        # Signal-part table: read-only, single header label, stretched columns.
        parts_table = self.ui_detect_Rpeaks.tableWidget_detect_Rpeaks_signal_parts_list
        parts_table.setHorizontalHeaderLabels(['信号片段'])
        parts_table.setEditTriggers(QTableWidget.NoEditTriggers)
        parts_header = parts_table.horizontalHeader()
        parts_header.setStretchLastSection(True)
        # 1 is the integer value of QHeaderView.Stretch.
        parts_header.setSectionResizeMode(1)

        # Signal/slot connections.
        self.ui_func.pushButton_rootpath_open.clicked.connect(self.slot_btn_rootpath_open)
        self.ui_func.pushButton_resample1000Hz.clicked.connect(self.slot_btn_resample1000Hz)
        self.ui_func.pushButton_detect_Rpeaks.clicked.connect(self.slot_btn_detect_Rpeaks)
        self.ui_func.pushButton_detect_Jpeaks.clicked.connect(self.slot_btn_detect_Jpeaks)
        self.pushButton_backToMenu.clicked.connect(self.slot_btn_backToMenu)
        self.ui_resample1000Hz.pushButton_resample1000Hz_view.clicked.connect(self.slot_btn_resample1000Hz_view)
        self.ui_resample1000Hz.pushButton_resample1000Hz_save.clicked.connect(self.slot_btn_resample1000Hz_save)
        self.ui_detect_Rpeaks.pushButton_detect_Rpeaks_view.clicked.connect(self.slot_btn_detect_Rpeaks_view)
        self.ui_detect_Rpeaks.pushButton_detect_Rpeaks_save.clicked.connect(self.slot_btn_detect_Rpeaks_save)
        self.ui_detect_Rpeaks.pushButton_detect_Rpeaks_left.clicked.connect(self.slot_btn_detect_Rpeaks_left)
        self.ui_detect_Rpeaks.pushButton_detect_Rpeaks_right.clicked.connect(self.slot_btn_detect_Rpeaks_right)
        parts_table.cellDoubleClicked.connect(self.slot_tableWidget_detect_Rpeaks_signal_parts_list_on_cell_double_clicked)
        self.ui_detect_Jpeaks.pushButton_detect_Jpeaks_view.clicked.connect(self.slot_btn_detect_Jpeaks_view)
        self.ui_detect_Jpeaks.pushButton_detect_Jpeaks_save.clicked.connect(self.slot_btn_detect_Jpeaks_save)

        # Shared message box reused for every notification.
        self.msgBox = QMessageBox()
        self.msgBox.setWindowTitle("消息")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _show_message(self, text, icon):
        """Show ``text`` in the shared modal message box with the given icon."""
        self.msgBox.setText(text)
        self.msgBox.setIcon(icon)
        self.msgBox.exec()

    @staticmethod
    def _parse_inputs(line_edits, convert):
        """Convert the text of each line edit with ``convert``.

        Returns the list of converted values, or None when any field is empty
        or cannot be converted (for example a lone "-" typed into an integer
        field), so callers can report the problem instead of raising inside a
        Qt slot.
        """
        values = []
        for line_edit in line_edits:
            text = line_edit.text()
            if text == "":
                return None
            try:
                values.append(convert(text))
            except ValueError:
                return None
        return values

    def _show_panel(self, panel):
        """Replace the function chooser in the side menu with ``panel``."""
        self.widget_func.setParent(None)
        self.verticalLayout_menu.removeWidget(self.widget_func)
        self.verticalLayout_menu.addWidget(panel)

    def _prepare_figure(self, rows):
        """Create a ``rows`` x 1 grid of equal heights and reset figure margins."""
        self.gs = gridspec.GridSpec(rows, 1, height_ratios=[1] * rows)
        self.figure.subplots_adjust(top=1, bottom=0, right=1, left=0, hspace=0, wspace=0)
        # NOTE(review): these pyplot calls act on pyplot's *current* figure and
        # axes, which is assumed to be self.figure; kept exactly as before.
        plt.margins(0, 0)
        plt.tight_layout()
        plt.xticks([])
        plt.yticks([])

    def _new_axes(self, index, sharex=None, hide_x_ticks=False):
        """Add and return a gridded subplot at grid cell ``index``.

        The axes returned by ``add_subplot`` are used directly rather than
        ``plt.gca()``, so the result always belongs to ``self.figure`` even if
        another pyplot figure has become current.

        :param index: index into ``self.gs``.
        :param sharex: optional axes to share the x axis with.
        :param hide_x_ticks: colour the x ticks white (used for upper plots).
        """
        axes = self.figure.add_subplot(self.gs[index], sharex=sharex)
        axes.grid(True)
        if hide_x_ticks:
            axes.tick_params(axis='x', colors='white')
        axes.xaxis.set_major_formatter(FuncFormatter(lambda x, p: f"{x:.0f}"))
        return axes

    def _reset_detect_Rpeaks_axes(self):
        """Replace both R-peak panel axes with fresh, empty ones."""
        self.ax0.remove()
        self.ax0 = self._new_axes(0, hide_x_ticks=True)
        self.ax1.remove()
        self.ax1 = self._new_axes(1, sharex=self.ax0)

    def _draw_detect_Rpeaks_part(self, part_index):
        """Plot signal part ``part_index`` onto the current R-peak axes."""
        r_peaks = self.R_peak_seq[part_index]
        ecg_part = self.ecg_seq[part_index]
        # RRIV values are plotted against the R-peak positions from the third
        # peak onwards.
        self.ax0.plot(r_peaks[2:], self.RRIV_seq[part_index], 'r.', label="RRIV")
        self.ax0.legend(loc='upper right')
        self.ax1.plot(ecg_part, 'r', label="ECG")
        self.ax1.plot(r_peaks, ecg_part[r_peaks], 'b*', label="R_peaks")
        self.ax1.plot(self.Interval_seq[part_index], 'g', label="Interval")
        self.ax1.legend(loc='upper right')
        self.canvas.draw()

    def _has_detect_Rpeaks_result(self):
        """Return True when an R-peak result is available for navigation."""
        return self.temp is not None and self.ecg_seq is not None

    # ------------------------------------------------------------------
    # Root path selection
    # ------------------------------------------------------------------
    def slot_btn_rootpath_open(self):
        """Let the user pick the root directory and remember it.

        ``self.root_path`` is only updated (always as a ``Path``) when a
        non-empty directory was selected; on cancel or on an empty selection
        the previous value is kept.
        """
        fileDialog = QFileDialog()
        if self.sender() == self.ui_func.pushButton_rootpath_open:
            fileDialog.setFileMode(QFileDialog.Directory)
            fileDialog.setOption(QFileDialog.ShowDirsOnly, True)

        if fileDialog.exec_() != QFileDialog.Accepted:
            info("Canceled Loading Root Path.")
            self.textBrowser_update("提示:根目录路径选择取消")
            self._show_message("根目录路径选择取消", QMessageBox.Warning)
            return

        selected = fileDialog.selectedFiles()[0]
        if not selected:
            error("Root Path not Exist...")
            self.textBrowser_update("操作:根目录路径输入错误")
            self._show_message("根目录路径输入错误", QMessageBox.Critical)
            return

        self.ui_func.lineEdit_rootpath.setText(selected)
        self.root_path = Path(selected)
        info("Loading Root Path...")

    # ------------------------------------------------------------------
    # Resample panel
    # ------------------------------------------------------------------
    def slot_btn_resample1000Hz(self):
        """Open the resample panel and load raw_org.txt and DSbcg_sig.txt."""
        raw_org_path = self.root_path / "raw_org.txt"
        DSbcg_sig_path = self.root_path / "bcg_test" / "DSbcg_sig.txt"
        if not (raw_org_path.exists() and DSbcg_sig_path.exists()):
            error("Can't Find raw_org.txt or DSbcg_sig.txt.")
            self.textBrowser_update("错误无法找到raw_org.txt或DSbcg_sig.txt无法执行<重采样>,请检查文件是否存在")
            self._show_message("无法找到raw_org.txt或DSbcg_sig.txt无法执行<重采样>,请检查文件是否存在", QMessageBox.Critical)
            return

        info("Found raw_org.txt and DSbcg_sig.txt.")
        self._show_panel(self.widget_resample1000Hz)

        # Single plot for this panel.
        self._prepare_figure(1)
        self.ax0 = self._new_axes(0)

        ui = self.ui_resample1000Hz
        ui.lineEdit_resample1000Hz_raw_org_path.setText(str(raw_org_path))
        ui.lineEdit_resample1000Hz_DSbcg_sig_path.setText(str(DSbcg_sig_path))
        ui.lineEdit_resample1000Hz_save_path.setText(str(self.root_path / "DSbcg_sig_1000hz3.txt"))
        self.textBrowser_update("提示找到raw_org.txt和DSbcg_sig.txt")
        # NOTE(review): read_csv uses its default header handling here, so the
        # first line of each file is taken as a header row - confirm the files
        # really have one.
        self.data1 = read_csv(raw_org_path, encoding="utf-8").to_numpy()
        self.data2 = read_csv(DSbcg_sig_path, encoding="utf-8", sep="\t")

    def slot_btn_resample1000Hz_view(self):
        """Upsample column 2 of DSbcg_sig.txt and plot it with the raw data.

        Plots the original data, the upsampled full signal (offset by 200)
        and the upsampled signal with the leading ``cut_second`` seconds
        removed; the latter is stored in ``self.data3`` for saving. Empty or
        unparseable parameter fields are reported and nothing is plotted.
        """
        ui = self.ui_resample1000Hz
        params = self._parse_inputs(
            (
                ui.lineEdit_resample1000Hz_original_sampling_rate,
                ui.lineEdit_resample1000Hz_target_sampling_rate,
                ui.lineEdit_resample1000Hz_cut_second,
            ),
            float,
        )
        if params is None:
            error("Miss Args for Resample1000Hz.")
            self.textBrowser_update("错误:参数输入存在空值")
            self._show_message("参数输入存在空值", QMessageBox.Critical)
            return
        original_rate, target_rate, cut_second = params

        self.ax0.remove()
        self.ax0 = self._new_axes(0)

        data_before = self.data1
        signal_column = self.data2.iloc[:, 2]
        data_test = resample_1000hz.upsample(signal_column, original_rate, target_rate)
        # Drop the first cut_second seconds (in original samples) before upsampling.
        cut_samples = int(original_rate * cut_second)
        data_new = resample_1000hz.upsample(self.data2.iloc[cut_samples:, 2], original_rate, target_rate)

        self.ax0.plot(data_before, 'r', label="Original Data")
        self.ax0.plot(data_test + 200, 'g', label="Filtered Data")
        self.ax0.plot(data_new, 'b', label="Data After Cut")
        self.ax0.legend(loc='upper right')
        self.canvas.draw()
        self.data3 = data_new
        info("Finished Data Plot.")
        self.textBrowser_update("提示:完成绘图")

    def slot_btn_resample1000Hz_save(self):
        """Save the cut, upsampled signal (``self.data3``) to a .txt file."""
        if self.data3 is None:
            error("data new is None.")
            self.textBrowser_update("错误:裁切后的数据不存在")
            self._show_message("裁切后的数据不存在", QMessageBox.Critical)
            return

        save_path = self.ui_resample1000Hz.lineEdit_resample1000Hz_save_path.text()
        if save_path == "" or not save_path.endswith(".txt"):
            self.textBrowser_update("错误:保存路径输入有误,请检查后重新执行保存")
            self._show_message("保存路径输入有误,请检查后重新执行保存", QMessageBox.Information)
            return

        reply = QMessageBox.question(self, "警告:确认操作", f"你确定要将裁剪结果保存到{save_path}", QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if reply != QMessageBox.Yes:
            self.textBrowser_update("提示:保存操作取消")
            return

        np.savetxt(save_path, self.data3, fmt='%.4f')
        info(f"Saved Data After Cut to {save_path}.")
        self.textBrowser_update(f"提示:保存成功至{save_path}")
        self._show_message(f"保存成功至{save_path}", QMessageBox.Information)

    # ------------------------------------------------------------------
    # R-peak detection panel
    # ------------------------------------------------------------------
    def slot_btn_detect_Rpeaks(self):
        """Open the R-peak panel and load filter_ecg.txt."""
        filter_ecg_path = self.root_path / "filter_ecg.txt"
        if not filter_ecg_path.exists():
            error("Can't Find filter_ecg.txt.")
            self.textBrowser_update("错误无法找到filter_ecg.txt无法执行<R峰提取>,请检查文件是否存在")
            self._show_message("无法找到filter_ecg.txt无法执行<R峰提取>,请检查文件是否存在", QMessageBox.Critical)
            return

        info("Found filter_ecg.txt.")
        self._show_panel(self.widget_detect_Rpeaks)

        # Two stacked plots sharing the x axis.
        self._prepare_figure(2)
        self.ax0 = self._new_axes(0, hide_x_ticks=True)
        self.ax1 = self._new_axes(1, sharex=self.ax0)

        self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_filter_ecg_path.setText(str(filter_ecg_path))
        self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_save_path.setText(str(self.root_path / "label"))
        self.textBrowser_update("提示找到filter_ecg.txt")
        # NOTE(review): default header handling treats the first line as a
        # header row - confirm filter_ecg.txt has one.
        self.data1 = read_csv(filter_ecg_path, encoding="utf-8").to_numpy().reshape(-1)

    def slot_btn_detect_Rpeaks_view(self):
        """Run R-peak detection and plot the first signal part.

        Fills the signal-part table with one row per detected part and
        enables it. Empty or unparseable parameter fields are reported and
        nothing is computed.
        """
        ui = self.ui_detect_Rpeaks
        params = self._parse_inputs(
            (
                ui.lineEdit_detect_Rpeaks_sampling_rate,
                ui.lineEdit_detect_Rpeaks_peaks_value,
                ui.lineEdit_detect_Rpeaks_bandpass_low,
                ui.lineEdit_detect_Rpeaks_bandpass_high,
            ),
            int,
        )
        if params is None:
            error("Miss Args for detect_Rpeaks.")
            self.textBrowser_update("错误:参数输入存在空值")
            self._show_message("参数输入存在空值", QMessageBox.Critical)
            return
        sampling_rate, peaks_value, bandpass_low, bandpass_high = params

        self._reset_detect_Rpeaks_axes()
        ecg_data = self.data1

        # Use the checked detector; fall back to the first option when no
        # radio button is checked so the name is always defined.
        detector_method = 'pt'
        detector_choices = (
            (ui.radioButton_detector_method_pt, 'pt'),
            (ui.radioButton_detector_method_ta, 'ta'),
            (ui.radioButton_detector_method_Wt, 'Wt'),
            (ui.radioButton_detector_method_Hamilton, 'Hamilton'),
            (ui.radioButton_detector_method_Engzee, 'Engzee'),
        )
        for radio_button, method_name in detector_choices:
            if radio_button.isChecked():
                detector_method = method_name
                break

        self.ecg_seq, self.R_peak_seq, self.Interval_seq, self.RRIV_seq = detect_Rpeak.Rpeak_Detection(ecg_data, sampling_rate, bandpass_low, bandpass_high, peaks_value, detector_method)

        # All four sequences must contain the same number of parts. A chained
        # "a != b != c != d" only compares neighbours and requires *every*
        # pair to differ, so compare the set of lengths instead.
        part_counts = {len(self.ecg_seq), len(self.R_peak_seq), len(self.Interval_seq), len(self.RRIV_seq)}
        if len(part_counts) != 1:
            error("len(self.ecg_seq) and len(self.R_peak_seq) and len(self.Interval_seq) and len(self.RRIV_seq) are not equal.")
            self.textBrowser_update("错误ecg_seq和R_peak_seq和Interval_seq和RRIV_seq的长度不相等")
            return

        info(f"Data Length: {len(ecg_data)}")
        self.textBrowser_update(f"数据长度:{len(ecg_data)}")
        info(f"Data Duration: {len(ecg_data) / sampling_rate / 60} min")
        self.textBrowser_update(f"数据时长:{len(ecg_data) / sampling_rate / 60}分钟")
        info(f"Data Parts: {len(self.ecg_seq)} hours")
        self.textBrowser_update(f"数据总时长:{len(self.ecg_seq)}小时")

        # One table row per signal part, labelled with its one-based number.
        parts_table = ui.tableWidget_detect_Rpeaks_signal_parts_list
        parts_table.setRowCount(len(self.ecg_seq))
        for row in range(len(self.ecg_seq)):
            parts_table.setItem(row, 0, QTableWidgetItem(str(row + 1)))
        ui.groupBox_detect_Rpeaks_signal_parts_list.setEnabled(True)

        self._draw_detect_Rpeaks_part(0)
        self.temp = 0
        info("Finished R peaks Detect and Data Part 1 Plot.")
        self.textBrowser_update("提示完成R峰提取并绘制信号第1段")

    def slot_btn_detect_Rpeaks_save(self):
        """Save every ECG part and its R peaks into the chosen directory.

        Part ``k`` (one-based) is written to ``<k>ecg.txt`` and
        ``<k>Rpeak.txt``. The directory is created if it does not exist.
        """
        if self.ecg_seq is None or self.R_peak_seq is None:
            error("data is None.")
            self.textBrowser_update("错误:需要保存的数据不存在")
            self._show_message("需要保存的数据不存在", QMessageBox.Critical)
            return

        save_dir_text = self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_save_path.text()
        if save_dir_text == "":
            self.textBrowser_update("错误:保存路径输入有误,请检查后重新执行保存")
            self._show_message("保存路径输入有误,请检查后重新执行保存", QMessageBox.Information)
            return

        save_dir = Path(save_dir_text)
        if not save_dir.is_dir():
            save_dir.mkdir(parents=True, exist_ok=True)
            info("Save Path is not Exist, Made it as a New Directory.")
            self.textBrowser_update("提示:检测到保存路径所指向的文件夹不存在,已创建相应文件夹")

        reply = QMessageBox.question(self, "警告:确认操作", f"你确定要将裁剪结果保存到{save_dir_text}", QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if reply != QMessageBox.Yes:
            self.textBrowser_update("提示:保存操作取消")
            return

        for part_number, ecg_part in enumerate(self.ecg_seq, start=1):
            r_peaks = self.R_peak_seq[part_number - 1]
            DataFrame(ecg_part.reshape(-1)).to_csv(str(save_dir / f"{part_number}ecg.txt"), index=False, header=None)
            DataFrame(r_peaks.reshape(-1)).to_csv(str(save_dir / f"{part_number}Rpeak.txt"), index=False, header=None)
            info(f"Saved Data {part_number} to Directory {save_dir_text}.")
            self.textBrowser_update(f"提示:保存片段{part_number}成功至文件夹{save_dir_text}")
        self._show_message(f"保存成功至{save_dir_text}", QMessageBox.Information)

    def slot_tableWidget_detect_Rpeaks_signal_parts_list_on_cell_double_clicked(self, row, column):
        """Plot the signal part whose table row was double-clicked.

        The part number is always read from column 0 of the clicked row, the
        only column that is populated; rows without an item are ignored.
        """
        item = self.ui_detect_Rpeaks.tableWidget_detect_Rpeaks_signal_parts_list.item(row, 0)
        if item is None or self.ecg_seq is None:
            return
        self.temp = int(item.text()) - 1
        self.detect_Rpeaks_update_plot(self.temp)
        info(f"Finished Data Part {self.temp + 1} Plot.")
        self.textBrowser_update(f"提示:完成绘制信号第{self.temp + 1}")

    def slot_btn_detect_Rpeaks_left(self):
        """Show the previous signal part, or warn when already at the first."""
        if not self._has_detect_Rpeaks_result():
            # Nothing has been detected yet, so there is nothing to navigate.
            return
        if self.temp <= 0:
            self._show_message("你正在查看第1段信号", QMessageBox.Warning)
            return
        self.temp -= 1
        self.detect_Rpeaks_update_plot(self.temp)
        info(f"Finished Data Part {self.temp + 1} Plot.")
        self.textBrowser_update(f"提示:完成绘制信号第{self.temp + 1}")

    def slot_btn_detect_Rpeaks_right(self):
        """Show the next signal part, or warn when already at the last."""
        if not self._has_detect_Rpeaks_result():
            # Nothing has been detected yet, so there is nothing to navigate.
            return
        # The last valid index comes from the data itself rather than from the
        # text of the last table item.
        if self.temp >= len(self.ecg_seq) - 1:
            self._show_message("你正在查看最后1段信号", QMessageBox.Warning)
            return
        self.temp += 1
        self.detect_Rpeaks_update_plot(self.temp)
        info(f"Finished Data Part {self.temp + 1} Plot.")
        self.textBrowser_update(f"提示:完成绘制信号第{self.temp + 1}")

    def detect_Rpeaks_update_plot(self, part_index):
        """Clear both R-peak axes and plot signal part ``part_index``."""
        self._reset_detect_Rpeaks_axes()
        self._draw_detect_Rpeaks_part(part_index)

    # ------------------------------------------------------------------
    # J-peak detection panel
    # ------------------------------------------------------------------
    def slot_btn_detect_Jpeaks(self):
        """Open the J-peak panel and load DSbcg_sig_1000hz3.txt."""
        DSbcg_sig_1000hz3_path = self.root_path / "DSbcg_sig_1000hz3.txt"
        if not DSbcg_sig_1000hz3_path.exists():
            error("Can't Find DSbcg_sig_1000hz3.txt.")
            self.textBrowser_update("错误无法找到DSbcg_sig_1000hz3.txt无法执行<J峰提取>,请检查文件是否存在")
            self._show_message("无法找到DSbcg_sig_1000hz3.txt无法执行<J峰提取>,请检查文件是否存在", QMessageBox.Critical)
            return

        info("Found DSbcg_sig_1000hz3.txt.")
        self._show_panel(self.widget_detect_Jpeaks)

        # Single plot for this panel.
        self._prepare_figure(1)
        self.ax0 = self._new_axes(0, hide_x_ticks=True)

        self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_DSbcg_sig_1000hz3_path.setText(str(DSbcg_sig_1000hz3_path))
        self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_save_path.setText(str(Path(self.root_path) / "Jpeak.txt"))
        self.textBrowser_update("提示找到DSbcg_sig_1000hz3.txt")
        self.data1 = np.array(read_csv(DSbcg_sig_1000hz3_path, header=None)).reshape(-1)

    def slot_btn_detect_Jpeaks_view(self):
        """Run J-peak detection with the selected model and plot the result.

        Empty or unparseable parameter fields are reported and nothing is
        computed.
        """
        ui = self.ui_detect_Jpeaks
        params = self._parse_inputs(
            (
                ui.lineEdit_detect_Jpeaks_sampling_rate,
                ui.lineEdit_detect_Jpeaks_peaks_value,
                ui.lineEdit_detect_Jpeaks_bandpass_low,
                ui.lineEdit_detect_Jpeaks_bandpass_high,
                ui.lineEdit_detect_Jpeaks_amp_value,
                ui.lineEdit_detect_Jpeaks_interval_low,
                ui.lineEdit_detect_Jpeaks_interval_high,
            ),
            int,
        )
        if params is None:
            error("Miss Args for detect_Jpeaks.")
            self.textBrowser_update("错误:参数输入存在空值")
            self._show_message("参数输入存在空值", QMessageBox.Critical)
            return
        sampling_rate, peaks_value, bandpass_low, bandpass_high, amp_value, interval_low, interval_high = params

        self.ax0.remove()
        self.ax0 = self._new_axes(0, hide_x_ticks=True)
        bcg_data = self.data1

        # Use the checked model; default to the first one when none is checked.
        detector_method = 'Fivelayer_Unet_1'
        model_choices = (
            (ui.radioButton_Fivelayer_Unet_1, 'Fivelayer_Unet_1'),
            (ui.radioButton_Fivelayer_Unet_2, 'Fivelayer_Unet_2'),
            (ui.radioButton_Fivelayer_Lstm_Unet_1, 'Fivelayer_Lstm_Unet_1'),
            (ui.radioButton_Fivelayer_Lstm_Unet_2, 'Fivelayer_Lstm_Unet_2'),
        )
        for radio_button, method_name in model_choices:
            if radio_button.isChecked():
                detector_method = method_name
                break
        useCPU = bool(ui.checkBox_useCPU.isChecked())

        # NOTE(review): interval_high is passed before interval_low, exactly as
        # before - confirm this matches Jpeak_Detection's parameter order.
        self.data2, self.data3, self.data4 = detect_Jpeak.Jpeak_Detection(bcg_data, detector_method, sampling_rate, bandpass_low, bandpass_high, interval_high, interval_low, peaks_value, amp_value, useCPU)

        info(f"Data Length: {len(bcg_data)}")
        self.textBrowser_update(f"数据长度:{len(bcg_data)}")
        info(f"Data Duration: {len(bcg_data) / sampling_rate / 60} min")
        self.textBrowser_update(f"数据时长:{len(bcg_data) / sampling_rate / 60} 分钟")
        info(f"Quantity of J_peaks: {len(self.data3)}")
        self.textBrowser_update(f"J峰个数{len(self.data3)}")

        self.ax0.plot(self.data2, 'b', label="BCG_filtered")
        self.ax0.plot(self.data3, self.data2[self.data3], 'r.', label="Predict_J_peaks")
        self.ax0.plot(self.data4, 'orange', label="Interval")
        self.ax0.legend(loc='upper right')
        self.canvas.draw()
        info("Finished J peaks Detect and Data Plot.")
        self.textBrowser_update("提示完成J峰提取并绘制信号")

    def slot_btn_detect_Jpeaks_save(self):
        """Save the detected J-peak positions (``self.data3``) to a .txt file."""
        if self.data2 is None or self.data3 is None or self.data4 is None:
            error("data is None.")
            self.textBrowser_update("错误:需要保存的数据不存在")
            self._show_message("需要保存的数据不存在", QMessageBox.Critical)
            return

        save_path = self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_save_path.text()
        if save_path == "" or not save_path.endswith(".txt"):
            self.textBrowser_update("错误:保存路径输入有误,请检查后重新执行保存")
            self._show_message("保存路径输入有误,请检查后重新执行保存", QMessageBox.Information)
            return

        reply = QMessageBox.question(self, "警告:确认操作", f"你确定要将预测结果保存到{save_path}", QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if reply != QMessageBox.Yes:
            self.textBrowser_update("提示:保存操作取消")
            return

        DataFrame(self.data3.reshape(-1)).to_csv(save_path, index=False, header=None)
        info(f"Saved Data to Directory {save_path}.")
        self.textBrowser_update(f"提示:保存数据成功至文件夹{save_path}")
        self._show_message(f"保存成功至{save_path}", QMessageBox.Information)

    # ------------------------------------------------------------------
    # Common
    # ------------------------------------------------------------------
    def slot_btn_backToMenu(self):
        """Discard all loaded data and return to the function chooser."""
        self.data1 = None
        self.data2 = None
        self.data3 = None
        self.data4 = None
        self.ecg_seq = None
        self.R_peak_seq = None
        self.Interval_seq = None
        self.RRIV_seq = None
        self.temp = None
        self.ui_detect_Rpeaks.tableWidget_detect_Rpeaks_signal_parts_list.clearContents()
        self.ui_detect_Rpeaks.groupBox_detect_Rpeaks_signal_parts_list.setEnabled(False)

        # Detach whichever work panel is shown. Iterate in reverse because
        # removing a widget shifts the indices of the items that follow it.
        panel_names = ("widget_resample1000Hz", "widget_detect_Rpeaks", "widget_detect_Jpeaks")
        for i in reversed(range(self.verticalLayout_menu.count())):
            item = self.verticalLayout_menu.itemAt(i)
            if item is None:
                continue
            widget = item.widget()
            if widget and widget.objectName() in panel_names:
                widget.setParent(None)
                self.verticalLayout_menu.removeWidget(widget)
        self.verticalLayout_menu.addWidget(self.widget_func)
        self.figure.clf()

    def textBrowser_update(self, message):
        """Append ``message`` with an HH:MM:SS prefix and scroll to the end."""
        timestamp = str(datetime.now().strftime("%H:%M:%S"))
        self.textBrowser_infoOutput.append(timestamp + ": " + message)
        scroll_bar = self.textBrowser_infoOutput.verticalScrollBar()
        scroll_bar.setValue(scroll_bar.maximum())
# Entry point: create the Qt application, show the main window and hand
# control to the Qt event loop; its return value becomes the exit status.
if __name__ == '__main__':
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    exit_code = app.exec_()
    sys.exit(exit_code)