""" @author:Yosa @file:heartbeat_annotation.py @email:2023025086@m.scnu.edu.cn @time:2025/2/18 """ import sys from logging import NOTSET, getLogger, FileHandler, Formatter, StreamHandler, info, error, debug from time import time, strftime, localtime import numpy as np from PyQt5.QtGui import QFont, QDoubleValidator, QIntValidator from matplotlib.pyplot import title from pandas import DataFrame, read_csv from matplotlib.ticker import FuncFormatter from numpy import load, nan, zeros, append, linspace, place from matplotlib import use from matplotlib import pyplot as plt, gridspec from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg, NavigationToolbar2QT from datetime import datetime from pathlib import Path from PyQt5.QtWidgets import QFileDialog, QMainWindow, QMessageBox, QButtonGroup, QApplication, QTableWidgetItem, QTableWidget, QWidget import resample_1000hz import detect_Rpeak import detect_Jpeak from ui.MainWindow import Ui_MainWindow from ui.widget_func import Ui_widget_func from ui.widget_resample1000Hz import Ui_widget_resample1000Hz from ui.widget_detect_Rpeaks import Ui_widget_detect_Rpeaks from ui.widget_detect_Jpeaks import Ui_widget_detect_Jpeaks use("Qt5Agg") # 声明使用 QT5 # 设置日志 logger = getLogger() logger.setLevel(NOTSET) realtime = strftime('%Y%m%d', localtime(time())) if not Path("logs").exists(): Path("logs").mkdir(exist_ok=True) fh = FileHandler(Path("logs") / (realtime + ".log"), mode='a') fh.setLevel(NOTSET) fh.setFormatter(Formatter("%(asctime)s: %(message)s")) logger.addHandler(fh) ch = StreamHandler() ch.setLevel(NOTSET) ch.setFormatter(Formatter("%(asctime)s: %(message)s")) logger.addHandler(ch) getLogger('matplotlib.font_manager').disabled = True info("------------------------------------") info("------heartbeat_annotation.py-------") class MainWindow(QMainWindow, Ui_MainWindow): root_path = Path("") data1 = None data2 = None data3 = None data4 = None ecg_seq = None R_peak_seq = None Interval_seq = None RRIV_seq = None temp = None # 程序初始化操作 def __init__(self): super(MainWindow, self).__init__() self.setupUi(self) # 设置画框 self.figure = plt.figure(figsize=(12, 6), dpi=150) self.canvas = FigureCanvasQTAgg(self.figure) self.figToolbar = NavigationToolbar2QT(self.canvas) self.verticalLayout_canvas.addWidget(self.canvas) self.verticalLayout_canvas.addWidget(self.figToolbar) # Widget初始化 self.widget_func = QWidget() self.ui_func = Ui_widget_func() self.ui_func.setupUi(self.widget_func) self.widget_resample1000Hz = QWidget() self.ui_resample1000Hz = Ui_widget_resample1000Hz() self.ui_resample1000Hz.setupUi(self.widget_resample1000Hz) self.widget_detect_Rpeaks = QWidget() self.ui_detect_Rpeaks = Ui_widget_detect_Rpeaks() self.ui_detect_Rpeaks.setupUi(self.widget_detect_Rpeaks) self.widget_detect_Jpeaks = QWidget() self.ui_detect_Jpeaks = Ui_widget_detect_Jpeaks() self.ui_detect_Jpeaks.setupUi(self.widget_detect_Jpeaks) # 界面状态初始化 self.verticalLayout_menu.addWidget(self.widget_func) self.ui_detect_Rpeaks.groupBox_detect_Rpeaks_signal_parts_list.setEnabled(False) # 定义验证器,用于规范lineEdit的输入内容 validator_double = QDoubleValidator(-1e100, 1e100, 10) validator_integer = QIntValidator(-2**31, 2**31 - 1) self.ui_resample1000Hz.lineEdit_resample1000Hz_original_sampling_rate.setValidator(validator_integer) self.ui_resample1000Hz.lineEdit_resample1000Hz_target_sampling_rate.setValidator(validator_integer) self.ui_resample1000Hz.lineEdit_resample1000Hz_cut_second.setValidator(validator_integer) self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_sampling_rate.setValidator(validator_integer) self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_peaks_value.setValidator(validator_integer) self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_bandpass_low.setValidator(validator_integer) self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_bandpass_high.setValidator(validator_integer) self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_sampling_rate.setValidator(validator_integer) self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_peaks_value.setValidator(validator_integer) self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_amp_value.setValidator(validator_integer) self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_interval_low.setValidator(validator_integer) self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_interval_high.setValidator(validator_integer) self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_sampling_rate.setValidator(validator_integer) self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_bandpass_low.setValidator(validator_integer) self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_bandpass_high.setValidator(validator_integer) # 设置表格属性 self.ui_detect_Rpeaks.tableWidget_detect_Rpeaks_signal_parts_list.setHorizontalHeaderLabels(['信号片段']) self.ui_detect_Rpeaks.tableWidget_detect_Rpeaks_signal_parts_list.setEditTriggers(QTableWidget.NoEditTriggers) self.ui_detect_Rpeaks.tableWidget_detect_Rpeaks_signal_parts_list.horizontalHeader().setStretchLastSection(True) self.ui_detect_Rpeaks.tableWidget_detect_Rpeaks_signal_parts_list.horizontalHeader().setSectionResizeMode(1) # 槽函数连接初始化 self.ui_func.pushButton_rootpath_open.clicked.connect(self.slot_btn_rootpath_open) self.ui_func.pushButton_resample1000Hz.clicked.connect(self.slot_btn_resample1000Hz) self.ui_func.pushButton_detect_Rpeaks.clicked.connect(self.slot_btn_detect_Rpeaks) self.ui_func.pushButton_detect_Jpeaks.clicked.connect(self.slot_btn_detect_Jpeaks) self.pushButton_backToMenu.clicked.connect(self.slot_btn_backToMenu) self.ui_resample1000Hz.pushButton_resample1000Hz_view.clicked.connect(self.slot_btn_resample1000Hz_view) self.ui_resample1000Hz.pushButton_resample1000Hz_save.clicked.connect(self.slot_btn_resample1000Hz_save) self.ui_detect_Rpeaks.pushButton_detect_Rpeaks_view.clicked.connect(self.slot_btn_detect_Rpeaks_view) self.ui_detect_Rpeaks.pushButton_detect_Rpeaks_save.clicked.connect(self.slot_btn_detect_Rpeaks_save) self.ui_detect_Rpeaks.pushButton_detect_Rpeaks_left.clicked.connect(self.slot_btn_detect_Rpeaks_left) self.ui_detect_Rpeaks.pushButton_detect_Rpeaks_right.clicked.connect(self.slot_btn_detect_Rpeaks_right) self.ui_detect_Rpeaks.tableWidget_detect_Rpeaks_signal_parts_list.cellDoubleClicked.connect(self.slot_tableWidget_detect_Rpeaks_signal_parts_list_on_cell_double_clicked) self.ui_detect_Jpeaks.pushButton_detect_Jpeaks_view.clicked.connect(self.slot_btn_detect_Jpeaks_view) self.ui_detect_Jpeaks.pushButton_detect_Jpeaks_save.clicked.connect(self.slot_btn_detect_Jpeaks_save) # 消息弹窗初始化 self.msgBox = QMessageBox() self.msgBox.setWindowTitle("消息") def slot_btn_rootpath_open(self): fileDialog = QFileDialog() if self.sender() == self.ui_func.pushButton_rootpath_open: fileDialog.setFileMode(QFileDialog.Directory) fileDialog.setOption(QFileDialog.ShowDirsOnly, True) if fileDialog.exec_() == QFileDialog.Accepted: self.root_path = fileDialog.selectedFiles()[0] if not self.root_path: error("Root Path not Exist...") self.textBrowser_update("操作:根目录路径输入错误") self.msgBox.setText("根目录路径输入错误") self.msgBox.setIcon(QMessageBox.Critical) self.msgBox.exec() return self.ui_func.lineEdit_rootpath.setText(self.root_path) self.root_path = Path(self.root_path) info("Loading Root Path...") else: info("Canceled Loading Root Path.") self.textBrowser_update("提示:根目录路径选择取消") self.msgBox.setText("根目录路径选择取消") self.msgBox.setIcon(QMessageBox.Warning) self.msgBox.exec() def slot_btn_resample1000Hz(self): raw_org_path = self.root_path / "raw_org.txt" DSbcg_sig_path = self.root_path / "bcg_test" / "DSbcg_sig.txt" if not raw_org_path.exists() or not DSbcg_sig_path.exists(): error("Can't Find raw_org.txt or DSbcg_sig.txt.") self.textBrowser_update("错误:无法找到raw_org.txt或DSbcg_sig.txt,无法执行<重采样>,请检查文件是否存在") self.msgBox.setText("无法找到raw_org.txt或DSbcg_sig.txt,无法执行<重采样>,请检查文件是否存在") self.msgBox.setIcon(QMessageBox.Critical) self.msgBox.exec() return info("Found raw_org.txt and DSbcg_sig.txt.") self.widget_func.setParent(None) self.verticalLayout_menu.removeWidget(self.widget_func) self.verticalLayout_menu.addWidget(self.widget_resample1000Hz) # 画框子图初始化 self.gs = gridspec.GridSpec(1, 1, height_ratios=[1]) self.figure.subplots_adjust(top=1, bottom=0, right=1, left=0, hspace=0, wspace=0) plt.margins(0, 0) plt.tight_layout() plt.xticks([]) plt.yticks([]) self.ax0 = self.figure.add_subplot(self.gs[0]) self.ax0 = plt.gca() self.ax0.grid(True) self.ax0.xaxis.set_major_formatter(FuncFormatter(lambda x, p: f"{x:.0f}")) self.ui_resample1000Hz.lineEdit_resample1000Hz_raw_org_path.setText(str(raw_org_path)) self.ui_resample1000Hz.lineEdit_resample1000Hz_DSbcg_sig_path.setText(str(DSbcg_sig_path)) self.ui_resample1000Hz.lineEdit_resample1000Hz_save_path.setText(str(self.root_path / "DSbcg_sig_1000hz3.txt")) self.textBrowser_update("提示:找到raw_org.txt和DSbcg_sig.txt") self.data1 = read_csv(raw_org_path, encoding="utf-8").to_numpy() self.data2 = read_csv(DSbcg_sig_path, encoding="utf-8", sep="\t") def slot_btn_resample1000Hz_view(self): if self.ui_resample1000Hz.lineEdit_resample1000Hz_original_sampling_rate.text() != "" and self.ui_resample1000Hz.lineEdit_resample1000Hz_target_sampling_rate.text() != "" and self.ui_resample1000Hz.lineEdit_resample1000Hz_cut_second.text() != "": self.ax0.remove() self.ax0 = self.figure.add_subplot(self.gs[0]) self.ax0 = plt.gca() self.ax0.grid(True) self.ax0.xaxis.set_major_formatter(FuncFormatter(lambda x, p: f"{x:.0f}")) data_before = self.data1 data_test = resample_1000hz.upsample(self.data2.iloc[:, 2], float(self.ui_resample1000Hz.lineEdit_resample1000Hz_original_sampling_rate.text()), float(self.ui_resample1000Hz.lineEdit_resample1000Hz_target_sampling_rate.text())) data_new = resample_1000hz.upsample(self.data2.iloc[int(float(self.ui_resample1000Hz.lineEdit_resample1000Hz_original_sampling_rate.text()) * float(self.ui_resample1000Hz.lineEdit_resample1000Hz_cut_second.text())):, 2], float(self.ui_resample1000Hz.lineEdit_resample1000Hz_original_sampling_rate.text()), float(self.ui_resample1000Hz.lineEdit_resample1000Hz_target_sampling_rate.text())) self.ax0.plot(data_before, 'r', label="Original Data") self.ax0.plot(data_test + 200, 'g', label="Filtered Data") self.ax0.plot(data_new, 'b', label="Data After Cut") self.ax0.legend(loc='upper right') self.canvas.draw() self.data3 = data_new info("Finished Data Plot.") self.textBrowser_update("提示:完成绘图") else: error(f"Miss Args for Resample1000Hz.") self.textBrowser_update("错误:参数输入存在空值") self.msgBox.setText("参数输入存在空值") self.msgBox.setIcon(QMessageBox.Critical) self.msgBox.exec() def slot_btn_resample1000Hz_save(self): if self.data3 is not None: if self.ui_resample1000Hz.lineEdit_resample1000Hz_save_path.text() != "" and self.ui_resample1000Hz.lineEdit_resample1000Hz_save_path.text().endswith(".txt"): reply = QMessageBox.question(self, "警告:确认操作", f"你确定要将裁剪结果保存到{self.ui_resample1000Hz.lineEdit_resample1000Hz_save_path.text()}?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No) if reply == QMessageBox.Yes: np.savetxt(self.ui_resample1000Hz.lineEdit_resample1000Hz_save_path.text(), self.data3, fmt='%.4f') info(f"Saved Data After Cut to {self.ui_resample1000Hz.lineEdit_resample1000Hz_save_path.text()}.") self.textBrowser_update(f"提示:保存成功至{self.ui_resample1000Hz.lineEdit_resample1000Hz_save_path.text()}") self.msgBox.setText(f"保存成功至{self.ui_resample1000Hz.lineEdit_resample1000Hz_save_path.text()}") self.msgBox.setIcon(QMessageBox.Information) self.msgBox.exec() else: self.textBrowser_update(f"提示:保存操作取消") else: self.textBrowser_update("错误:保存路径输入有误,请检查后重新执行保存") self.msgBox.setText("保存路径输入有误,请检查后重新执行保存") self.msgBox.setIcon(QMessageBox.Information) self.msgBox.exec() else: error(f"data new is None.") self.textBrowser_update("错误:裁切后的数据不存在") self.msgBox.setText("裁切后的数据不存在") self.msgBox.setIcon(QMessageBox.Critical) self.msgBox.exec() def slot_btn_detect_Rpeaks(self): filter_ecg_path = self.root_path / "filter_ecg.txt" if not filter_ecg_path.exists(): error("Can't Find filter_ecg.txt.") self.textBrowser_update("错误:无法找到filter_ecg.txt,无法执行,请检查文件是否存在") self.msgBox.setText("无法找到filter_ecg.txt,无法执行,请检查文件是否存在") self.msgBox.setIcon(QMessageBox.Critical) self.msgBox.exec() return info("Found filter_ecg.txt.") self.widget_func.setParent(None) self.verticalLayout_menu.removeWidget(self.widget_func) self.verticalLayout_menu.addWidget(self.widget_detect_Rpeaks) # 画框子图初始化 self.gs = gridspec.GridSpec(2, 1, height_ratios=[1, 1]) self.figure.subplots_adjust(top=1, bottom=0, right=1, left=0, hspace=0, wspace=0) plt.margins(0, 0) plt.tight_layout() plt.xticks([]) plt.yticks([]) self.ax0 = self.figure.add_subplot(self.gs[0]) self.ax0 = plt.gca() self.ax0.grid(True) self.ax0.tick_params(axis='x', colors='white') self.ax0.xaxis.set_major_formatter(FuncFormatter(lambda x, p: f"{x:.0f}")) self.ax1 = self.figure.add_subplot(self.gs[1], sharex=self.ax0) self.ax1 = plt.gca() self.ax1.grid(True) self.ax1.xaxis.set_major_formatter(FuncFormatter(lambda x, p: f"{x:.0f}")) self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_filter_ecg_path.setText(str(filter_ecg_path)) self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_save_path.setText(str(self.root_path / "label")) self.textBrowser_update("提示:找到filter_ecg.txt") self.data1 = read_csv(filter_ecg_path, encoding="utf-8").to_numpy().reshape(-1) def slot_btn_detect_Rpeaks_view(self): if self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_sampling_rate.text() != "" and self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_peaks_value.text() != "" and self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_bandpass_low.text() != "" and self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_bandpass_high.text() != "": self.ax0.remove() self.ax0 = self.figure.add_subplot(self.gs[0]) self.ax0 = plt.gca() self.ax0.grid(True) self.ax0.tick_params(axis='x', colors='white') self.ax0.xaxis.set_major_formatter(FuncFormatter(lambda x, p: f"{x:.0f}")) self.ax1.remove() self.ax1 = self.figure.add_subplot(self.gs[1], sharex=self.ax0) self.ax1 = plt.gca() self.ax1.grid(True) self.ax1.xaxis.set_major_formatter(FuncFormatter(lambda x, p: f"{x:.0f}")) ecg_data = self.data1 if self.ui_detect_Rpeaks.radioButton_detector_method_pt.isChecked(): detector_method = 'pt' elif self.ui_detect_Rpeaks.radioButton_detector_method_ta.isChecked(): detector_method = 'ta' elif self.ui_detect_Rpeaks.radioButton_detector_method_Wt.isChecked(): detector_method = 'Wt' elif self.ui_detect_Rpeaks.radioButton_detector_method_Hamilton.isChecked(): detector_method = 'Hamilton' elif self.ui_detect_Rpeaks.radioButton_detector_method_Engzee.isChecked(): detector_method = 'Engzee' self.ecg_seq, self.R_peak_seq, self.Interval_seq, self.RRIV_seq = detect_Rpeak.Rpeak_Detection(ecg_data, int(self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_sampling_rate.text()), int(self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_bandpass_low.text()), int(self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_bandpass_high.text()), int(self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_peaks_value.text()), detector_method) if len(self.ecg_seq) != len(self.R_peak_seq) != len(self.Interval_seq) != len(self.RRIV_seq): error("len(self.ecg_seq) and len(self.R_peak_seq) and len(self.Interval_seq) and len(self.RRIV_seq) are not equal.") self.textBrowser_update("错误:ecg_seq和R_peak_seq和Interval_seq和RRIV_seq的长度不相等") return info(f"Data Length: {len(ecg_data)}") self.textBrowser_update(f"数据长度:{len(ecg_data)}") info(f"Data Duration: {len(ecg_data) / int(self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_sampling_rate.text()) / 60} min") self.textBrowser_update(f"数据时长:{len(ecg_data) / int(self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_sampling_rate.text()) / 60}分钟") info(f"Data Parts: {len(self.ecg_seq)} hours") self.textBrowser_update(f"数据总时长:{len(self.ecg_seq)}小时") self.ui_detect_Rpeaks.tableWidget_detect_Rpeaks_signal_parts_list.setRowCount(len(self.ecg_seq)) for row in range(len(self.ecg_seq)): item = QTableWidgetItem(str(row + 1)) self.ui_detect_Rpeaks.tableWidget_detect_Rpeaks_signal_parts_list.setItem(row, 0, item) self.ui_detect_Rpeaks.groupBox_detect_Rpeaks_signal_parts_list.setEnabled(True) self.ax0.plot(self.R_peak_seq[0][2: ], self.RRIV_seq[0], 'r.', label="RRIV") self.ax0.legend(loc='upper right') self.ax1.plot(self.ecg_seq[0], 'r', label="ECG") self.ax1.plot(self.R_peak_seq[0], self.ecg_seq[0][self.R_peak_seq[0]], 'b*', label="R_peaks") self.ax1.plot(self.Interval_seq[0], 'g', label="Interval") self.ax1.legend(loc='upper right') self.canvas.draw() self.temp = 0 info("Finished R peaks Detect and Data Part 1 Plot.") self.textBrowser_update("提示:完成R峰提取并绘制信号第1段") else: error(f"Miss Args for detect_Rpeaks.") self.textBrowser_update("错误:参数输入存在空值") self.msgBox.setText("参数输入存在空值") self.msgBox.setIcon(QMessageBox.Critical) self.msgBox.exec() def slot_btn_detect_Rpeaks_save(self): if self.ecg_seq is not None and self.R_peak_seq is not None: if self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_save_path.text() != "": if Path(self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_save_path.text()).is_dir() == False: Path(self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_save_path.text()).mkdir(parents=True, exist_ok=True) info("Save Path is not Exist, Made it as a New Directory.") self.textBrowser_update("提示:检测到保存路径所指向的文件夹不存在,已创建相应文件夹") reply = QMessageBox.question(self, "警告:确认操作", f"你确定要将裁剪结果保存到{self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_save_path.text()}?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No) if reply == QMessageBox.Yes: for idx in range(len(self.ecg_seq)): DataFrame(self.ecg_seq[idx].reshape(-1)).to_csv(str(Path(self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_save_path.text()) / f"{idx + 1}ecg.txt"), index=False, header=None) DataFrame(self.R_peak_seq[idx].reshape(-1)).to_csv(str(Path(self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_save_path.text()) / f"{idx + 1}Rpeak.txt"), index=False, header=None) info(f"Saved Data {idx + 1} to Directory {self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_save_path.text()}.") self.textBrowser_update(f"提示:保存片段{idx + 1}成功至文件夹{self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_save_path.text()}") self.msgBox.setText(f"保存成功至{self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_save_path.text()}") self.msgBox.setIcon(QMessageBox.Information) self.msgBox.exec() else: self.textBrowser_update(f"提示:保存操作取消") else: self.textBrowser_update("错误:保存路径输入有误,请检查后重新执行保存") self.msgBox.setText("保存路径输入有误,请检查后重新执行保存") self.msgBox.setIcon(QMessageBox.Information) self.msgBox.exec() else: error(f"data is None.") self.textBrowser_update("错误:需要保存的数据不存在") self.msgBox.setText("需要保存的数据不存在") self.msgBox.setIcon(QMessageBox.Critical) self.msgBox.exec() def slot_tableWidget_detect_Rpeaks_signal_parts_list_on_cell_double_clicked(self, row, column): self.temp = int(self.ui_detect_Rpeaks.tableWidget_detect_Rpeaks_signal_parts_list.item(row, column).text()) - 1 self.detect_Rpeaks_update_plot(self.temp) info(f"Finished Data Part {self.temp + 1} Plot.") self.textBrowser_update(f"提示:完成绘制信号第{self.temp + 1}段") def slot_btn_detect_Rpeaks_left(self): if self.temp <= 0: self.msgBox.setText("你正在查看第1段信号") self.msgBox.setIcon(QMessageBox.Warning) self.msgBox.exec() else: self.temp -= 1 self.detect_Rpeaks_update_plot(self.temp) info(f"Finished Data Part {self.temp + 1} Plot.") self.textBrowser_update(f"提示:完成绘制信号第{self.temp + 1}段") def slot_btn_detect_Rpeaks_right(self): if self.temp >= int(self.ui_detect_Rpeaks.tableWidget_detect_Rpeaks_signal_parts_list.item(self.ui_detect_Rpeaks.tableWidget_detect_Rpeaks_signal_parts_list.rowCount() - 1, 0).text()) - 1: self.msgBox.setText("你正在查看最后1段信号") self.msgBox.setIcon(QMessageBox.Warning) self.msgBox.exec() else: self.temp += 1 self.detect_Rpeaks_update_plot(self.temp) info(f"Finished Data Part {self.temp + 1} Plot.") self.textBrowser_update(f"提示:完成绘制信号第{self.temp + 1}段") def detect_Rpeaks_update_plot(self, part_index): self.ax0.remove() self.ax0 = self.figure.add_subplot(self.gs[0]) self.ax0 = plt.gca() self.ax0.grid(True) self.ax0.tick_params(axis='x', colors='white') self.ax0.xaxis.set_major_formatter(FuncFormatter(lambda x, p: f"{x:.0f}")) self.ax1.remove() self.ax1 = self.figure.add_subplot(self.gs[1], sharex=self.ax0) self.ax1 = plt.gca() self.ax1.grid(True) self.ax1.xaxis.set_major_formatter(FuncFormatter(lambda x, p: f"{x:.0f}")) self.ax0.plot(self.R_peak_seq[part_index][2:], self.RRIV_seq[part_index], 'r.', label="RRIV") self.ax0.legend(loc='upper right') self.ax1.plot(self.ecg_seq[part_index], 'r', label="ECG") self.ax1.plot(self.R_peak_seq[part_index], self.ecg_seq[part_index][self.R_peak_seq[part_index]], 'b*', label="R_peaks") self.ax1.plot(self.Interval_seq[part_index], 'g', label="Interval") self.ax1.legend(loc='upper right') self.canvas.draw() def slot_btn_detect_Jpeaks(self): DSbcg_sig_1000hz3_path = self.root_path / "DSbcg_sig_1000hz3.txt" if not DSbcg_sig_1000hz3_path.exists(): error("Can't Find DSbcg_sig_1000hz3.txt.") self.textBrowser_update("错误:无法找到DSbcg_sig_1000hz3.txt,无法执行,请检查文件是否存在") self.msgBox.setText("无法找到DSbcg_sig_1000hz3.txt,无法执行,请检查文件是否存在") self.msgBox.setIcon(QMessageBox.Critical) self.msgBox.exec() return info("Found DSbcg_sig_1000hz3.txt.") self.widget_func.setParent(None) self.verticalLayout_menu.removeWidget(self.widget_func) self.verticalLayout_menu.addWidget(self.widget_detect_Jpeaks) # 画框子图初始化 self.gs = gridspec.GridSpec(1, 1, height_ratios=[1]) self.figure.subplots_adjust(top=1, bottom=0, right=1, left=0, hspace=0, wspace=0) plt.margins(0, 0) plt.tight_layout() plt.xticks([]) plt.yticks([]) self.ax0 = self.figure.add_subplot(self.gs[0]) self.ax0 = plt.gca() self.ax0.grid(True) self.ax0.tick_params(axis='x', colors='white') self.ax0.xaxis.set_major_formatter(FuncFormatter(lambda x, p: f"{x:.0f}")) self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_DSbcg_sig_1000hz3_path.setText(str(DSbcg_sig_1000hz3_path)) self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_save_path.setText(str(Path(self.root_path) / "Jpeak.txt")) self.textBrowser_update("提示:找到DSbcg_sig_1000hz3.txt") self.data1 = np.array(read_csv(DSbcg_sig_1000hz3_path, header=None)).reshape(-1) def slot_btn_detect_Jpeaks_view(self): if self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_sampling_rate.text() != "" and self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_peaks_value.text() != "" and self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_bandpass_low.text() != "" and self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_bandpass_high.text() != "" and self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_amp_value.text() != "" and self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_interval_low.text() != "" and self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_interval_high.text() != "": self.ax0.remove() self.ax0 = self.figure.add_subplot(self.gs[0]) self.ax0 = plt.gca() self.ax0.grid(True) self.ax0.tick_params(axis='x', colors='white') self.ax0.xaxis.set_major_formatter(FuncFormatter(lambda x, p: f"{x:.0f}")) bcg_data = self.data1 if self.ui_detect_Jpeaks.radioButton_Fivelayer_Unet_1.isChecked(): detector_method = 'Fivelayer_Unet_1' elif self.ui_detect_Jpeaks.radioButton_Fivelayer_Unet_2.isChecked(): detector_method = 'Fivelayer_Unet_2' elif self.ui_detect_Jpeaks.radioButton_Fivelayer_Lstm_Unet_1.isChecked(): detector_method = 'Fivelayer_Lstm_Unet_1' elif self.ui_detect_Jpeaks.radioButton_Fivelayer_Lstm_Unet_2.isChecked(): detector_method = 'Fivelayer_Lstm_Unet_2' else: detector_method = 'Fivelayer_Unet_1' if self.ui_detect_Jpeaks.checkBox_useCPU.isChecked() == True: useCPU = True else: useCPU = False self.data2, self.data3, self.data4 = detect_Jpeak.Jpeak_Detection(bcg_data, detector_method, int(self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_sampling_rate.text()), int(self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_bandpass_low.text()), int(self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_bandpass_high.text()), int(self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_interval_high.text()), int(self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_interval_low.text()), int(self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_peaks_value.text()), int(self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_amp_value.text()), useCPU) info(f"Data Length: {len(bcg_data)}") self.textBrowser_update(f"数据长度:{len(bcg_data)}") info(f"Data Duration: {len(bcg_data) / int(self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_sampling_rate.text()) / 60} min") self.textBrowser_update(f"数据时长:{len(bcg_data) / int(self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_sampling_rate.text()) / 60} 分钟") info(f"Quantity of J_peaks: {len(self.data3)}") self.textBrowser_update(f"J峰个数:{len(self.data3)}") self.ax0.plot(self.data2, 'b', label="BCG_filtered") self.ax0.plot(self.data3, self.data2[self.data3], 'r.', label="Predict_J_peaks") self.ax0.plot(self.data4, 'orange', label="Interval") self.ax0.legend(loc='upper right') self.canvas.draw() info("Finished J peaks Detect and Data Plot.") self.textBrowser_update("提示:完成J峰提取并绘制信号") else: error(f"Miss Args for detect_Jpeaks.") self.textBrowser_update("错误:参数输入存在空值") self.msgBox.setText("参数输入存在空值") self.msgBox.setIcon(QMessageBox.Critical) self.msgBox.exec() def slot_btn_detect_Jpeaks_save(self): if self.data2 is not None and self.data3 is not None and self.data4 is not None: if self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_save_path.text() != "" and self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_save_path.text().endswith(".txt"): reply = QMessageBox.question(self, "警告:确认操作", f"你确定要将预测结果保存到{self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_save_path.text()}?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No) if reply == QMessageBox.Yes: DataFrame(self.data3.reshape(-1)).to_csv(self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_save_path.text(), index=False, header=None) info(f"Saved Data to Directory {self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_save_path.text()}.") self.textBrowser_update(f"提示:保存数据成功至文件夹{self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_save_path.text()}") self.msgBox.setText(f"保存成功至{self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_save_path.text()}") self.msgBox.setIcon(QMessageBox.Information) self.msgBox.exec() else: self.textBrowser_update(f"提示:保存操作取消") else: self.textBrowser_update("错误:保存路径输入有误,请检查后重新执行保存") self.msgBox.setText("保存路径输入有误,请检查后重新执行保存") self.msgBox.setIcon(QMessageBox.Information) self.msgBox.exec() else: error(f"data is None.") self.textBrowser_update("错误:需要保存的数据不存在") self.msgBox.setText("需要保存的数据不存在") self.msgBox.setIcon(QMessageBox.Critical) self.msgBox.exec() def slot_btn_backToMenu(self): self.data1 = None self.data2 = None self.data3 = None self.data4 = None self.ecg_seq = None self.R_peak_seq = None self.Interval_seq = None self.RRIV_seq = None self.temp = None self.ui_detect_Rpeaks.tableWidget_detect_Rpeaks_signal_parts_list.clearContents() self.ui_detect_Rpeaks.groupBox_detect_Rpeaks_signal_parts_list.setEnabled(False) for i in range(self.verticalLayout_menu.count()): item = self.verticalLayout_menu.itemAt(i) if item.widget(): widget = item.widget() if widget.objectName() == "widget_resample1000Hz" or widget.objectName() == "widget_detect_Rpeaks" or widget.objectName() == "widget_detect_Jpeaks": widget.setParent(None) self.verticalLayout_menu.removeWidget(widget) self.verticalLayout_menu.addWidget(self.widget_func) self.figure.clf() def textBrowser_update(self, message): self.textBrowser_infoOutput.append(str(datetime.now().strftime("%H:%M:%S")) + ": " + message) self.textBrowser_infoOutput.verticalScrollBar().setValue(self.textBrowser_infoOutput.verticalScrollBar().maximum()) # 主函数 if __name__ == '__main__': app = QApplication(sys.argv) mainWindow = MainWindow() mainWindow.show() sys.exit(app.exec_())