566 lines
33 KiB
Python
566 lines
33 KiB
Python
"""
@author:Yosa
@file:heartbeat_annotation.py
@email:2023025086@m.scnu.edu.cn
@time:2025/2/18
"""
|
||
|
||
import sys
|
||
from logging import NOTSET, getLogger, FileHandler, Formatter, StreamHandler, info, error, debug
|
||
from time import time, strftime, localtime
|
||
|
||
import numpy as np
|
||
from PyQt5.QtGui import QFont, QDoubleValidator, QIntValidator
|
||
from matplotlib.pyplot import title
|
||
from pandas import DataFrame, read_csv
|
||
from matplotlib.ticker import FuncFormatter
|
||
from numpy import load, nan, zeros, append, linspace, place
|
||
from matplotlib import use
|
||
from matplotlib import pyplot as plt, gridspec
|
||
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg, NavigationToolbar2QT
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from PyQt5.QtWidgets import QFileDialog, QMainWindow, QMessageBox, QButtonGroup, QApplication, QTableWidgetItem, QTableWidget, QWidget
|
||
|
||
import resample_1000hz
|
||
import detect_Rpeak
|
||
import detect_Jpeak
|
||
from ui.MainWindow import Ui_MainWindow
|
||
from ui.widget_func import Ui_widget_func
|
||
from ui.widget_resample1000Hz import Ui_widget_resample1000Hz
|
||
from ui.widget_detect_Rpeaks import Ui_widget_detect_Rpeaks
|
||
from ui.widget_detect_Jpeaks import Ui_widget_detect_Jpeaks
|
||
|
||
# Select the Qt5 backend for matplotlib.
use("Qt5Agg")

# Logging: every record goes both to a per-day file under ./logs and to the
# console, using the same "<timestamp>: <message>" layout.
logger = getLogger()
logger.setLevel(NOTSET)

realtime = strftime('%Y%m%d', localtime())
log_dir = Path("logs")
if not log_dir.exists():
    log_dir.mkdir(exist_ok=True)

fh = FileHandler(log_dir / (realtime + ".log"), mode='a')
ch = StreamHandler()
for handler in (fh, ch):
    handler.setLevel(NOTSET)
    handler.setFormatter(Formatter("%(asctime)s: %(message)s"))
    logger.addHandler(handler)

# Silence matplotlib's font manager logger.
getLogger('matplotlib.font_manager').disabled = True

for banner in ("------------------------------------",
               "------heartbeat_annotation.py-------"):
    info(banner)
|
||
|
||
class MainWindow(QMainWindow, Ui_MainWindow):
    """Main window of the heartbeat annotation tool.

    Embeds a matplotlib canvas and a side menu that switches between three
    pages: resampling, R-peak detection and J-peak detection. Every public
    ``slot_*`` method is connected to a widget signal in ``__init__``.
    """

    # Root directory picked by the user; input files are looked up below it.
    root_path = Path("")
    # Scratch slots whose content depends on the active page:
    #   resample page: data1 = raw_org.txt values, data2 = DSbcg_sig.txt frame,
    #                  data3 = resampled data after the cut.
    #   R-peak page:   data1 = filter_ecg.txt values (flattened).
    #   J-peak page:   data1 = DSbcg_sig_1000hz3.txt values (flattened),
    #                  data2/data3/data4 = the three values returned by
    #                  detect_Jpeak.Jpeak_Detection (plotted as filtered BCG,
    #                  peak positions and "Interval").
    data1 = None
    data2 = None
    data3 = None
    data4 = None
    # The four sequences returned by detect_Rpeak.Rpeak_Detection, indexed by
    # signal part.
    ecg_seq = None
    R_peak_seq = None
    Interval_seq = None
    RRIV_seq = None
    # Zero-based index of the R-peak signal part currently plotted.
    temp = None

    def __init__(self):
        """Build the UI, embed the matplotlib canvas and connect all slots."""
        super(MainWindow, self).__init__()
        self.setupUi(self)

        # Matplotlib figure embedded in the Qt layout, with its toolbar.
        self.figure = plt.figure(figsize=(12, 6), dpi=150)
        self.canvas = FigureCanvasQTAgg(self.figure)
        self.figToolbar = NavigationToolbar2QT(self.canvas)
        self.verticalLayout_canvas.addWidget(self.canvas)
        self.verticalLayout_canvas.addWidget(self.figToolbar)

        # One QWidget per menu page, each populated by its generated Ui class.
        self.widget_func = QWidget()
        self.ui_func = Ui_widget_func()
        self.ui_func.setupUi(self.widget_func)
        self.widget_resample1000Hz = QWidget()
        self.ui_resample1000Hz = Ui_widget_resample1000Hz()
        self.ui_resample1000Hz.setupUi(self.widget_resample1000Hz)
        self.widget_detect_Rpeaks = QWidget()
        self.ui_detect_Rpeaks = Ui_widget_detect_Rpeaks()
        self.ui_detect_Rpeaks.setupUi(self.widget_detect_Rpeaks)
        self.widget_detect_Jpeaks = QWidget()
        self.ui_detect_Jpeaks = Ui_widget_detect_Jpeaks()
        self.ui_detect_Jpeaks.setupUi(self.widget_detect_Jpeaks)

        # Initial state: show the function menu; the signal-part list stays
        # disabled until R peaks have been detected.
        self.verticalLayout_menu.addWidget(self.widget_func)
        self.ui_detect_Rpeaks.groupBox_detect_Rpeaks_signal_parts_list.setEnabled(False)

        # Restrict every numeric line edit to 32-bit integers. The validator
        # is parented to the window so its lifetime is tied to it.
        validator_integer = QIntValidator(-2**31, 2**31 - 1, self)
        resample_ui = self.ui_resample1000Hz
        rpeaks_ui = self.ui_detect_Rpeaks
        jpeaks_ui = self.ui_detect_Jpeaks
        for line_edit in (
            resample_ui.lineEdit_resample1000Hz_original_sampling_rate,
            resample_ui.lineEdit_resample1000Hz_target_sampling_rate,
            resample_ui.lineEdit_resample1000Hz_cut_second,
            rpeaks_ui.lineEdit_detect_Rpeaks_sampling_rate,
            rpeaks_ui.lineEdit_detect_Rpeaks_peaks_value,
            rpeaks_ui.lineEdit_detect_Rpeaks_bandpass_low,
            rpeaks_ui.lineEdit_detect_Rpeaks_bandpass_high,
            jpeaks_ui.lineEdit_detect_Jpeaks_sampling_rate,
            jpeaks_ui.lineEdit_detect_Jpeaks_peaks_value,
            jpeaks_ui.lineEdit_detect_Jpeaks_amp_value,
            jpeaks_ui.lineEdit_detect_Jpeaks_interval_low,
            jpeaks_ui.lineEdit_detect_Jpeaks_interval_high,
            jpeaks_ui.lineEdit_detect_Jpeaks_bandpass_low,
            jpeaks_ui.lineEdit_detect_Jpeaks_bandpass_high,
        ):
            line_edit.setValidator(validator_integer)

        # Table of signal parts: read-only, single stretched column.
        parts_table = rpeaks_ui.tableWidget_detect_Rpeaks_signal_parts_list
        parts_table.setHorizontalHeaderLabels(['信号片段'])
        parts_table.setEditTriggers(QTableWidget.NoEditTriggers)
        parts_table.horizontalHeader().setStretchLastSection(True)
        # 1 is the integer value of QHeaderView.Stretch.
        parts_table.horizontalHeader().setSectionResizeMode(1)

        # Signal/slot connections.
        self.ui_func.pushButton_rootpath_open.clicked.connect(self.slot_btn_rootpath_open)
        self.ui_func.pushButton_resample1000Hz.clicked.connect(self.slot_btn_resample1000Hz)
        self.ui_func.pushButton_detect_Rpeaks.clicked.connect(self.slot_btn_detect_Rpeaks)
        self.ui_func.pushButton_detect_Jpeaks.clicked.connect(self.slot_btn_detect_Jpeaks)
        self.pushButton_backToMenu.clicked.connect(self.slot_btn_backToMenu)
        resample_ui.pushButton_resample1000Hz_view.clicked.connect(self.slot_btn_resample1000Hz_view)
        resample_ui.pushButton_resample1000Hz_save.clicked.connect(self.slot_btn_resample1000Hz_save)
        rpeaks_ui.pushButton_detect_Rpeaks_view.clicked.connect(self.slot_btn_detect_Rpeaks_view)
        rpeaks_ui.pushButton_detect_Rpeaks_save.clicked.connect(self.slot_btn_detect_Rpeaks_save)
        rpeaks_ui.pushButton_detect_Rpeaks_left.clicked.connect(self.slot_btn_detect_Rpeaks_left)
        rpeaks_ui.pushButton_detect_Rpeaks_right.clicked.connect(self.slot_btn_detect_Rpeaks_right)
        parts_table.cellDoubleClicked.connect(self.slot_tableWidget_detect_Rpeaks_signal_parts_list_on_cell_double_clicked)
        jpeaks_ui.pushButton_detect_Jpeaks_view.clicked.connect(self.slot_btn_detect_Jpeaks_view)
        jpeaks_ui.pushButton_detect_Jpeaks_save.clicked.connect(self.slot_btn_detect_Jpeaks_save)

        # Shared message box reused for every popup.
        self.msgBox = QMessageBox()
        self.msgBox.setWindowTitle("消息")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _show_message(self, text, icon):
        """Show the shared message box with *text* and *icon* (modal)."""
        self.msgBox.setText(text)
        self.msgBox.setIcon(icon)
        self.msgBox.exec()

    def _show_page(self, page):
        """Replace the function menu by *page* in the side menu layout."""
        self.widget_func.setParent(None)
        self.verticalLayout_menu.removeWidget(self.widget_func)
        self.verticalLayout_menu.addWidget(page)

    def _prepare_figure(self, rows):
        """Create a ``rows`` x 1 grid spec and apply the figure-wide layout.

        NOTE(review): the ``plt.*`` calls act on pyplot's *current* figure and
        axes (creating an axes if none exists). They are kept exactly as in
        the original because the resulting layout depends on this sequence;
        confirm before replacing them with explicit ``self.figure`` calls.
        """
        self.gs = gridspec.GridSpec(rows, 1, height_ratios=[1] * rows)
        self.figure.subplots_adjust(top=1, bottom=0, right=1, left=0, hspace=0, wspace=0)
        plt.margins(0, 0)
        plt.tight_layout()
        plt.xticks([])
        plt.yticks([])

    def _make_axes(self, slot, sharex=None, hide_x_labels=False):
        """Add a gridded axes in grid cell *slot* with integer x tick labels."""
        axes = self.figure.add_subplot(self.gs[slot], sharex=sharex)
        axes.grid(True)
        if hide_x_labels:
            # White tick colour, as in the original code.
            axes.tick_params(axis='x', colors='white')
        axes.xaxis.set_major_formatter(FuncFormatter(lambda x, p: f"{x:.0f}"))
        return axes

    def _build_axes(self, *, two_rows, hide_top_x_labels, replace=False):
        """(Re)create ``self.ax0`` and, for two-row pages, ``self.ax1``.

        With ``replace=True`` the existing axes are removed first, which
        clears whatever was plotted before.
        """
        if replace:
            self.ax0.remove()
        self.ax0 = self._make_axes(0, hide_x_labels=hide_top_x_labels)
        if two_rows:
            if replace:
                self.ax1.remove()
            self.ax1 = self._make_axes(1, sharex=self.ax0)

    @staticmethod
    def _parse_fields(line_edits, convert):
        """Convert the text of each line edit with *convert*.

        Returns the list of converted values, or ``None`` when any field is
        empty or not parseable (e.g. a lone "-" left in the field).
        """
        try:
            return [convert(line_edit.text()) for line_edit in line_edits]
        except ValueError:
            return None

    @staticmethod
    def _checked_option(options, default):
        """Return the label of the first checked button in *options*.

        *options* is an iterable of ``(button, label)`` pairs; *default* is
        returned when no button is checked.
        """
        for button, label in options:
            if button.isChecked():
                return label
        return default

    def _has_Rpeaks_result(self):
        """Return True when R-peak results are available for navigation."""
        if self.temp is None or self.ecg_seq is None:
            error("No R peak detection result to display.")
            return False
        return True

    def _plot_Rpeaks_part(self, part_index):
        """Draw signal part *part_index* on the existing ``ax0``/``ax1``."""
        r_peaks = self.R_peak_seq[part_index]
        ecg = self.ecg_seq[part_index]
        # RRIV is plotted against the R peaks from the third one onwards.
        self.ax0.plot(r_peaks[2:], self.RRIV_seq[part_index], 'r.', label="RRIV")
        self.ax0.legend(loc='upper right')
        self.ax1.plot(ecg, 'r', label="ECG")
        self.ax1.plot(r_peaks, ecg[r_peaks], 'b*', label="R_peaks")
        self.ax1.plot(self.Interval_seq[part_index], 'g', label="Interval")
        self.ax1.legend(loc='upper right')
        self.canvas.draw()

    def _show_Rpeaks_part(self, part_index):
        """Make *part_index* the current part, redraw it and report it."""
        self.temp = part_index
        self.detect_Rpeaks_update_plot(self.temp)
        info(f"Finished Data Part {self.temp + 1} Plot.")
        self.textBrowser_update(f"提示:完成绘制信号第{self.temp + 1}段")

    # ------------------------------------------------------------------
    # Root path
    # ------------------------------------------------------------------

    def slot_btn_rootpath_open(self):
        """Ask the user for the root directory and store it in ``root_path``.

        ``root_path`` is only updated after a non-empty selection, so it
        always stays a ``Path``.
        """
        file_dialog = QFileDialog()
        if self.sender() == self.ui_func.pushButton_rootpath_open:
            file_dialog.setFileMode(QFileDialog.Directory)
            file_dialog.setOption(QFileDialog.ShowDirsOnly, True)

        if file_dialog.exec_() != QFileDialog.Accepted:
            info("Canceled Loading Root Path.")
            self.textBrowser_update("提示:根目录路径选择取消")
            self._show_message("根目录路径选择取消", QMessageBox.Warning)
            return

        selected = file_dialog.selectedFiles()[0]
        if not selected:
            error("Root Path not Exist...")
            self.textBrowser_update("操作:根目录路径输入错误")
            self._show_message("根目录路径输入错误", QMessageBox.Critical)
            return

        self.ui_func.lineEdit_rootpath.setText(selected)
        self.root_path = Path(selected)
        info("Loading Root Path...")

    # ------------------------------------------------------------------
    # Resample page
    # ------------------------------------------------------------------

    def slot_btn_resample1000Hz(self):
        """Open the resample page and load raw_org.txt / DSbcg_sig.txt."""
        raw_org_path = self.root_path / "raw_org.txt"
        DSbcg_sig_path = self.root_path / "bcg_test" / "DSbcg_sig.txt"
        if not (raw_org_path.exists() and DSbcg_sig_path.exists()):
            error("Can't Find raw_org.txt or DSbcg_sig.txt.")
            self.textBrowser_update("错误:无法找到raw_org.txt或DSbcg_sig.txt,无法执行<重采样>,请检查文件是否存在")
            self._show_message("无法找到raw_org.txt或DSbcg_sig.txt,无法执行<重采样>,请检查文件是否存在", QMessageBox.Critical)
            return
        info("Found raw_org.txt and DSbcg_sig.txt.")

        self._show_page(self.widget_resample1000Hz)

        # Single-plot layout.
        self._prepare_figure(rows=1)
        self._build_axes(two_rows=False, hide_top_x_labels=False)

        ui = self.ui_resample1000Hz
        ui.lineEdit_resample1000Hz_raw_org_path.setText(str(raw_org_path))
        ui.lineEdit_resample1000Hz_DSbcg_sig_path.setText(str(DSbcg_sig_path))
        ui.lineEdit_resample1000Hz_save_path.setText(str(self.root_path / "DSbcg_sig_1000hz3.txt"))
        self.textBrowser_update("提示:找到raw_org.txt和DSbcg_sig.txt")
        # NOTE(review): read_csv is called without header=None, so the first
        # line of each file is consumed as a header - confirm this is intended.
        self.data1 = read_csv(raw_org_path, encoding="utf-8").to_numpy()
        self.data2 = read_csv(DSbcg_sig_path, encoding="utf-8", sep="\t")

    def slot_btn_resample1000Hz_view(self):
        """Resample column 2 of DSbcg_sig.txt and plot it with the raw data.

        Plots the raw data, the resampled full signal (shifted by +200) and
        the resampled signal with the first ``cut_second`` seconds dropped;
        the latter is kept in ``data3`` for saving.
        """
        ui = self.ui_resample1000Hz
        params = self._parse_fields(
            (
                ui.lineEdit_resample1000Hz_original_sampling_rate,
                ui.lineEdit_resample1000Hz_target_sampling_rate,
                ui.lineEdit_resample1000Hz_cut_second,
            ),
            float,
        )
        if params is None:
            error("Miss Args for Resample1000Hz.")
            self.textBrowser_update("错误:参数输入存在空值")
            self._show_message("参数输入存在空值", QMessageBox.Critical)
            return
        original_rate, target_rate, cut_second = params

        self._build_axes(two_rows=False, hide_top_x_labels=False, replace=True)

        data_before = self.data1
        data_test = resample_1000hz.upsample(self.data2.iloc[:, 2], original_rate, target_rate)
        first_kept_row = int(original_rate * cut_second)
        data_new = resample_1000hz.upsample(self.data2.iloc[first_kept_row:, 2], original_rate, target_rate)

        self.ax0.plot(data_before, 'r', label="Original Data")
        self.ax0.plot(data_test + 200, 'g', label="Filtered Data")
        self.ax0.plot(data_new, 'b', label="Data After Cut")
        self.ax0.legend(loc='upper right')
        self.canvas.draw()
        self.data3 = data_new
        info("Finished Data Plot.")
        self.textBrowser_update("提示:完成绘图")

    def slot_btn_resample1000Hz_save(self):
        """Save the cut, resampled data (``data3``) to the chosen .txt file."""
        if self.data3 is None:
            error("data new is None.")
            self.textBrowser_update("错误:裁切后的数据不存在")
            self._show_message("裁切后的数据不存在", QMessageBox.Critical)
            return

        save_path = self.ui_resample1000Hz.lineEdit_resample1000Hz_save_path.text()
        if save_path == "" or not save_path.endswith(".txt"):
            self.textBrowser_update("错误:保存路径输入有误,请检查后重新执行保存")
            self._show_message("保存路径输入有误,请检查后重新执行保存", QMessageBox.Information)
            return

        reply = QMessageBox.question(self, "警告:确认操作", f"你确定要将裁剪结果保存到{save_path}?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if reply != QMessageBox.Yes:
            self.textBrowser_update("提示:保存操作取消")
            return

        np.savetxt(save_path, self.data3, fmt='%.4f')
        info(f"Saved Data After Cut to {save_path}.")
        self.textBrowser_update(f"提示:保存成功至{save_path}")
        self._show_message(f"保存成功至{save_path}", QMessageBox.Information)

    # ------------------------------------------------------------------
    # R-peak page
    # ------------------------------------------------------------------

    def slot_btn_detect_Rpeaks(self):
        """Open the R-peak page and load filter_ecg.txt into ``data1``."""
        filter_ecg_path = self.root_path / "filter_ecg.txt"
        if not filter_ecg_path.exists():
            error("Can't Find filter_ecg.txt.")
            self.textBrowser_update("错误:无法找到filter_ecg.txt,无法执行<R峰提取>,请检查文件是否存在")
            self._show_message("无法找到filter_ecg.txt,无法执行<R峰提取>,请检查文件是否存在", QMessageBox.Critical)
            return
        info("Found filter_ecg.txt.")

        self._show_page(self.widget_detect_Rpeaks)

        # Two stacked plots sharing the x axis.
        self._prepare_figure(rows=2)
        self._build_axes(two_rows=True, hide_top_x_labels=True)

        ui = self.ui_detect_Rpeaks
        ui.lineEdit_detect_Rpeaks_filter_ecg_path.setText(str(filter_ecg_path))
        ui.lineEdit_detect_Rpeaks_save_path.setText(str(self.root_path / "label"))
        self.textBrowser_update("提示:找到filter_ecg.txt")
        # NOTE(review): no header=None here, unlike the J-peak loader - the
        # first line of the file is consumed as a header; confirm intended.
        self.data1 = read_csv(filter_ecg_path, encoding="utf-8").to_numpy().reshape(-1)

    def slot_btn_detect_Rpeaks_view(self):
        """Run R-peak detection on ``data1`` and plot the first signal part."""
        ui = self.ui_detect_Rpeaks
        params = self._parse_fields(
            (
                ui.lineEdit_detect_Rpeaks_sampling_rate,
                ui.lineEdit_detect_Rpeaks_bandpass_low,
                ui.lineEdit_detect_Rpeaks_bandpass_high,
                ui.lineEdit_detect_Rpeaks_peaks_value,
            ),
            int,
        )
        if params is None:
            error("Miss Args for detect_Rpeaks.")
            self.textBrowser_update("错误:参数输入存在空值")
            self._show_message("参数输入存在空值", QMessageBox.Critical)
            return
        sampling_rate, bandpass_low, bandpass_high, peaks_value = params

        self._build_axes(two_rows=True, hide_top_x_labels=True, replace=True)

        ecg_data = self.data1
        # Fall back to the first method when no radio button is checked, so
        # detector_method is always bound (mirrors the J-peak page).
        detector_method = self._checked_option(
            (
                (ui.radioButton_detector_method_pt, 'pt'),
                (ui.radioButton_detector_method_ta, 'ta'),
                (ui.radioButton_detector_method_Wt, 'Wt'),
                (ui.radioButton_detector_method_Hamilton, 'Hamilton'),
                (ui.radioButton_detector_method_Engzee, 'Engzee'),
            ),
            default='pt',
        )

        self.ecg_seq, self.R_peak_seq, self.Interval_seq, self.RRIV_seq = detect_Rpeak.Rpeak_Detection(ecg_data, sampling_rate, bandpass_low, bandpass_high, peaks_value, detector_method)

        # All four sequences must have the same number of parts. A chained
        # "a != b != c != d" would only catch the case where every adjacent
        # pair differs, so compare the set of lengths instead.
        part_counts = {len(self.ecg_seq), len(self.R_peak_seq), len(self.Interval_seq), len(self.RRIV_seq)}
        if len(part_counts) != 1:
            error("len(self.ecg_seq) and len(self.R_peak_seq) and len(self.Interval_seq) and len(self.RRIV_seq) are not equal.")
            self.textBrowser_update("错误:ecg_seq和R_peak_seq和Interval_seq和RRIV_seq的长度不相等")
            return

        info(f"Data Length: {len(ecg_data)}")
        self.textBrowser_update(f"数据长度:{len(ecg_data)}")
        info(f"Data Duration: {len(ecg_data) / sampling_rate / 60} min")
        self.textBrowser_update(f"数据时长:{len(ecg_data) / sampling_rate / 60}分钟")
        info(f"Data Parts: {len(self.ecg_seq)} hours")
        self.textBrowser_update(f"数据总时长:{len(self.ecg_seq)}小时")

        # One table row per signal part, labelled 1..N.
        parts_table = ui.tableWidget_detect_Rpeaks_signal_parts_list
        parts_table.setRowCount(len(self.ecg_seq))
        for row in range(len(self.ecg_seq)):
            parts_table.setItem(row, 0, QTableWidgetItem(str(row + 1)))
        ui.groupBox_detect_Rpeaks_signal_parts_list.setEnabled(True)

        self._plot_Rpeaks_part(0)
        self.temp = 0
        info("Finished R peaks Detect and Data Part 1 Plot.")
        self.textBrowser_update("提示:完成R峰提取并绘制信号第1段")

    def slot_btn_detect_Rpeaks_save(self):
        """Save every ECG part and its R peaks into the chosen directory.

        Part ``k`` (1-based) is written to ``<k>ecg.txt`` and
        ``<k>Rpeak.txt``. The directory is created before the confirmation
        dialog if it does not exist yet.
        """
        if self.ecg_seq is None or self.R_peak_seq is None:
            error("data is None.")
            self.textBrowser_update("错误:需要保存的数据不存在")
            self._show_message("需要保存的数据不存在", QMessageBox.Critical)
            return

        save_dir_text = self.ui_detect_Rpeaks.lineEdit_detect_Rpeaks_save_path.text()
        if save_dir_text == "":
            self.textBrowser_update("错误:保存路径输入有误,请检查后重新执行保存")
            self._show_message("保存路径输入有误,请检查后重新执行保存", QMessageBox.Information)
            return

        save_dir = Path(save_dir_text)
        if not save_dir.is_dir():
            save_dir.mkdir(parents=True, exist_ok=True)
            info("Save Path is not Exist, Made it as a New Directory.")
            self.textBrowser_update("提示:检测到保存路径所指向的文件夹不存在,已创建相应文件夹")

        reply = QMessageBox.question(self, "警告:确认操作", f"你确定要将裁剪结果保存到{save_dir_text}?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if reply != QMessageBox.Yes:
            self.textBrowser_update("提示:保存操作取消")
            return

        for idx in range(len(self.ecg_seq)):
            DataFrame(self.ecg_seq[idx].reshape(-1)).to_csv(str(save_dir / f"{idx + 1}ecg.txt"), index=False, header=None)
            DataFrame(self.R_peak_seq[idx].reshape(-1)).to_csv(str(save_dir / f"{idx + 1}Rpeak.txt"), index=False, header=None)
            info(f"Saved Data {idx + 1} to Directory {save_dir_text}.")
            self.textBrowser_update(f"提示:保存片段{idx + 1}成功至文件夹{save_dir_text}")
        self._show_message(f"保存成功至{save_dir_text}", QMessageBox.Information)

    def slot_tableWidget_detect_Rpeaks_signal_parts_list_on_cell_double_clicked(self, row, column):
        """Plot the signal part whose 1-based number is in the clicked cell."""
        item = self.ui_detect_Rpeaks.tableWidget_detect_Rpeaks_signal_parts_list.item(row, column)
        if item is None:
            # Empty cell (e.g. contents were cleared): nothing to show.
            return
        self._show_Rpeaks_part(int(item.text()) - 1)

    def slot_btn_detect_Rpeaks_left(self):
        """Show the previous signal part, or warn when already at the first."""
        if not self._has_Rpeaks_result():
            return
        if self.temp <= 0:
            self._show_message("你正在查看第1段信号", QMessageBox.Warning)
            return
        self._show_Rpeaks_part(self.temp - 1)

    def slot_btn_detect_Rpeaks_right(self):
        """Show the next signal part, or warn when already at the last."""
        if not self._has_Rpeaks_result():
            return
        # The table holds one row per part, so the last index is len - 1.
        if self.temp >= len(self.ecg_seq) - 1:
            self._show_message("你正在查看最后1段信号", QMessageBox.Warning)
            return
        self._show_Rpeaks_part(self.temp + 1)

    def detect_Rpeaks_update_plot(self, part_index):
        """Clear both axes and redraw signal part *part_index*."""
        self._build_axes(two_rows=True, hide_top_x_labels=True, replace=True)
        self._plot_Rpeaks_part(part_index)

    # ------------------------------------------------------------------
    # J-peak page
    # ------------------------------------------------------------------

    def slot_btn_detect_Jpeaks(self):
        """Open the J-peak page and load DSbcg_sig_1000hz3.txt into ``data1``."""
        DSbcg_sig_1000hz3_path = self.root_path / "DSbcg_sig_1000hz3.txt"
        if not DSbcg_sig_1000hz3_path.exists():
            error("Can't Find DSbcg_sig_1000hz3.txt.")
            self.textBrowser_update("错误:无法找到DSbcg_sig_1000hz3.txt,无法执行<J峰提取>,请检查文件是否存在")
            self._show_message("无法找到DSbcg_sig_1000hz3.txt,无法执行<J峰提取>,请检查文件是否存在", QMessageBox.Critical)
            return
        info("Found DSbcg_sig_1000hz3.txt.")

        self._show_page(self.widget_detect_Jpeaks)

        # Single-plot layout.
        self._prepare_figure(rows=1)
        self._build_axes(two_rows=False, hide_top_x_labels=True)

        ui = self.ui_detect_Jpeaks
        ui.lineEdit_detect_Jpeaks_DSbcg_sig_1000hz3_path.setText(str(DSbcg_sig_1000hz3_path))
        ui.lineEdit_detect_Jpeaks_save_path.setText(str(Path(self.root_path) / "Jpeak.txt"))
        self.textBrowser_update("提示:找到DSbcg_sig_1000hz3.txt")
        self.data1 = np.array(read_csv(DSbcg_sig_1000hz3_path, header=None)).reshape(-1)

    def slot_btn_detect_Jpeaks_view(self):
        """Run J-peak detection on ``data1`` and plot the result."""
        ui = self.ui_detect_Jpeaks
        params = self._parse_fields(
            (
                ui.lineEdit_detect_Jpeaks_sampling_rate,
                ui.lineEdit_detect_Jpeaks_bandpass_low,
                ui.lineEdit_detect_Jpeaks_bandpass_high,
                ui.lineEdit_detect_Jpeaks_interval_high,
                ui.lineEdit_detect_Jpeaks_interval_low,
                ui.lineEdit_detect_Jpeaks_peaks_value,
                ui.lineEdit_detect_Jpeaks_amp_value,
            ),
            int,
        )
        if params is None:
            error("Miss Args for detect_Jpeaks.")
            self.textBrowser_update("错误:参数输入存在空值")
            self._show_message("参数输入存在空值", QMessageBox.Critical)
            return
        (sampling_rate, bandpass_low, bandpass_high,
         interval_high, interval_low, peaks_value, amp_value) = params

        self._build_axes(two_rows=False, hide_top_x_labels=True, replace=True)

        bcg_data = self.data1
        detector_method = self._checked_option(
            (
                (ui.radioButton_Fivelayer_Unet_1, 'Fivelayer_Unet_1'),
                (ui.radioButton_Fivelayer_Unet_2, 'Fivelayer_Unet_2'),
                (ui.radioButton_Fivelayer_Lstm_Unet_1, 'Fivelayer_Lstm_Unet_1'),
                (ui.radioButton_Fivelayer_Lstm_Unet_2, 'Fivelayer_Lstm_Unet_2'),
            ),
            default='Fivelayer_Unet_1',
        )
        useCPU = bool(ui.checkBox_useCPU.isChecked())

        # NOTE(review): interval_high is passed before interval_low, exactly
        # as in the original call - confirm against Jpeak_Detection's
        # parameter order.
        self.data2, self.data3, self.data4 = detect_Jpeak.Jpeak_Detection(bcg_data, detector_method, sampling_rate, bandpass_low, bandpass_high, interval_high, interval_low, peaks_value, amp_value, useCPU)

        info(f"Data Length: {len(bcg_data)}")
        self.textBrowser_update(f"数据长度:{len(bcg_data)}")
        info(f"Data Duration: {len(bcg_data) / sampling_rate / 60} min")
        self.textBrowser_update(f"数据时长:{len(bcg_data) / sampling_rate / 60} 分钟")
        info(f"Quantity of J_peaks: {len(self.data3)}")
        self.textBrowser_update(f"J峰个数:{len(self.data3)}")

        self.ax0.plot(self.data2, 'b', label="BCG_filtered")
        self.ax0.plot(self.data3, self.data2[self.data3], 'r.', label="Predict_J_peaks")
        self.ax0.plot(self.data4, 'orange', label="Interval")
        self.ax0.legend(loc='upper right')
        self.canvas.draw()
        info("Finished J peaks Detect and Data Plot.")
        self.textBrowser_update("提示:完成J峰提取并绘制信号")

    def slot_btn_detect_Jpeaks_save(self):
        """Save the detected J peaks (``data3``) to the chosen .txt file."""
        if self.data2 is None or self.data3 is None or self.data4 is None:
            error("data is None.")
            self.textBrowser_update("错误:需要保存的数据不存在")
            self._show_message("需要保存的数据不存在", QMessageBox.Critical)
            return

        save_path = self.ui_detect_Jpeaks.lineEdit_detect_Jpeaks_save_path.text()
        if save_path == "" or not save_path.endswith(".txt"):
            self.textBrowser_update("错误:保存路径输入有误,请检查后重新执行保存")
            self._show_message("保存路径输入有误,请检查后重新执行保存", QMessageBox.Information)
            return

        reply = QMessageBox.question(self, "警告:确认操作", f"你确定要将预测结果保存到{save_path}?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if reply != QMessageBox.Yes:
            self.textBrowser_update("提示:保存操作取消")
            return

        DataFrame(self.data3.reshape(-1)).to_csv(save_path, index=False, header=None)
        info(f"Saved Data to Directory {save_path}.")
        self.textBrowser_update(f"提示:保存数据成功至文件夹{save_path}")
        self._show_message(f"保存成功至{save_path}", QMessageBox.Information)

    # ------------------------------------------------------------------
    # Shared
    # ------------------------------------------------------------------

    def slot_btn_backToMenu(self):
        """Drop all loaded data and return from any page to the function menu."""
        self.data1 = None
        self.data2 = None
        self.data3 = None
        self.data4 = None
        self.ecg_seq = None
        self.R_peak_seq = None
        self.Interval_seq = None
        self.RRIV_seq = None
        self.temp = None
        self.ui_detect_Rpeaks.tableWidget_detect_Rpeaks_signal_parts_list.clearContents()
        self.ui_detect_Rpeaks.groupBox_detect_Rpeaks_signal_parts_list.setEnabled(False)

        page_names = ("widget_resample1000Hz", "widget_detect_Rpeaks", "widget_detect_Jpeaks")
        # Walk the layout backwards: removing an item shifts the indices of
        # the items after it, so a forward walk could hit a missing item.
        for index in reversed(range(self.verticalLayout_menu.count())):
            item = self.verticalLayout_menu.itemAt(index)
            widget = item.widget() if item is not None else None
            if widget is not None and widget.objectName() in page_names:
                widget.setParent(None)
                self.verticalLayout_menu.removeWidget(widget)
        self.verticalLayout_menu.addWidget(self.widget_func)

        self.figure.clf()
        # Redraw so the cleared figure replaces the previous plot on screen.
        self.canvas.draw()

    def textBrowser_update(self, message):
        """Append ``HH:MM:SS: message`` to the info panel and scroll to it."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.textBrowser_infoOutput.append(f"{timestamp}: {message}")
        scroll_bar = self.textBrowser_infoOutput.verticalScrollBar()
        scroll_bar.setValue(scroll_bar.maximum())
|
||
|
||
# Script entry point: create the Qt application, show the main window and
# hand the event loop's exit status back to the interpreter.
if __name__ == '__main__':
    application = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    exit_status = application.exec_()
    sys.exit(exit_status)