SA_Label/Main_Quality_Relabel_GUI.py
2025-01-04 17:33:28 +08:00

1108 lines
58 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
@author:Yosa, Marques
@file:Main_Quality_Relabel_GUI.py
@email:2023025086@m.scnu.edu.cn, 2021022362@m.scnu.edu.cn
@time:2025/1/4
"""
import os
import sys
import logging
import time
import numpy as np
import pandas as pd
import matplotlib
import pyedflib
import traceback
from datetime import datetime
from tqdm import tqdm
from pathlib import Path
from PyQt5 import QtCore, QtWidgets
from PyQt5.QtWidgets import QFileDialog, QMainWindow, QMessageBox, QButtonGroup
from matplotlib import pyplot as plt, gridspec
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg, NavigationToolbar2QT
from MainWindow import Ui_MainWindow
from utils.Preprocessing import BCG_Operation
matplotlib.use("Qt5Agg") # 声明使用QT5
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
# ['EEG F3-A2', 'EEG F4-A1', 'EEG C3-A2', 'EEG C4-A1', 'EEG O1-A2',
# 'EEG O2-A1', 'EOG Right', 'EOG Left', 'EMG Chin', 'ECG I', 'RR',
# 'ECG II', 'Effort Tho', 'Flow Patient', 'Flow Patient', 'Effort Abd',
# 'SpO2', 'Pleth', 'Snore', 'Body', 'Pulse', 'Leg LEG1', 'Leg LEG2',
# 'EEG A1-A2', 'Imp']
# 设置日志
logger = logging.getLogger()
logger.setLevel(logging.NOTSET)
realtime = time.strftime('%Y%m%d', time.localtime(time.time()))
fh = logging.FileHandler(Path("history") / (realtime + ".log"), mode='a')
fh.setLevel(logging.NOTSET)
fh.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
logger.addHandler(fh)
ch = logging.StreamHandler()
ch.setLevel(logging.NOTSET)
ch.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
logger.addHandler(ch)
logging.getLogger('matplotlib.font_manager').disabled = True
logging.info("------------------------------------")
class MainWindow(QMainWindow, Ui_MainWindow):
# 可选择的通道
base_channel = ['EEG F3-A2', 'EEG F4-A1', 'EEG C3-A2', 'EEG C4-A1', 'EEG O1-A2', 'EEG O2-A1', 'EOG Right',
'EOG Left', 'EMG Chin', 'ECG I', 'RR', 'ECG II', 'Effort Tho', 'Flow Patient', 'Flow Patient', 'HR',
'Effort Abd', 'SpO2', 'Pleth', 'Snore', 'Body', 'Pulse', 'Leg LEG1', 'Leg LEG2', 'EEG A1-A2', 'Imp']
# 显示事件
base_event = ["Hypopnea", "Central apnea", "Obstructive apnea", "Mixed apnea", "Desaturation"]
# 设定事件和其对应颜色
# event_code color event
# 0 黑色 背景
# 1 粉色 低通气
# 2 蓝色 中枢性
# 3 红色 阻塞型
# 4 灰色 混合型
# 5 绿色 血氧饱和度下降
# 6 橙色 大体动
# 7 橙色 小体动
# 8 橙色 深呼吸
# 9 橙色 脉冲体动
# 10 橙色 无效片段
color_cycle = ["black", "pink", "blue", "red", "silver", "green", "orange", "orange", "orange", "orange", "orange"]
# 程序初始化操作
def __init__(self):
super(MainWindow, self).__init__()
self.setupUi(self)
self.figure = plt.figure(figsize=(12, 6), dpi=150)
self.canvas = FigureCanvasQTAgg(self.figure)
self.figToolbar = CustomNavigationToolbar(self.canvas, self, self.slot_btn_resetOriginalView)
self.verticalLayout_canvas.addWidget(self.canvas)
self.verticalLayout_canvas.addWidget(self.figToolbar)
self.msgBox = QMessageBox()
self.msgBox.setWindowTitle("消息")
self.pushButton_left.clicked.connect(self.slot_btn_left)
self.pushButton_right.clicked.connect(self.slot_btn_right)
self.pushButton_confirmLabel.clicked.connect(self.slot_btn_run)
self.pushButton_confirmLabel.setText("请选择数据路径")
self.pushButton_previous10s.setEnabled(False)
self.pushButton_previous30s.setEnabled(False)
self.pushButton_previous60s.setEnabled(False)
self.pushButton_next10s.setEnabled(False)
self.pushButton_next30s.setEnabled(False)
self.pushButton_next60s.setEnabled(False)
self.time_move_count = int(0)
self.pushButton_previous10s.clicked.connect(self.slot_btn_previous10s)
self.pushButton_previous30s.clicked.connect(self.slot_btn_previous30s)
self.pushButton_previous60s.clicked.connect(self.slot_btn_previous60s)
self.pushButton_next10s.clicked.connect(self.slot_btn_next10s)
self.pushButton_next30s.clicked.connect(self.slot_btn_next30s)
self.pushButton_next60s.clicked.connect(self.slot_btn_next60s)
self.action_selectPath.triggered.connect(self.slot_btn_selectPath)
self.pushButton_left.setEnabled(False)
self.pushButton_right.setEnabled(False)
self.pushButton_confirmLabel.setEnabled(False)
self.pushButton_quick_remark_input_waitingForTalk.setEnabled(False)
self.pushButton_quick_remark_input_waitingForTalk.clicked.connect(self.slot_btn_quick_remark_input_waitingForTalk)
self.checkBox_examineBySecond.setEnabled(False)
self.checkBox_examineBySecond.clicked.connect(self.enable_checkBox_examineBySecond)
self.lineEdit_start_bcg_index.setEnabled(False)
self.lineEdit_frequency.setEnabled(False)
self.lineEdit_bcg_frequency.setEnabled(False)
self.lineEdit_front_add_second.setEnabled(False)
self.lineEdit_back_add_second.setEnabled(False)
self.radioButton_1_class.setEnabled(False)
self.radioButton_2_class.setEnabled(False)
self.radioButton_3_class.setEnabled(False)
self.buttonGroup_1 = QButtonGroup()
self.buttonGroup_1.addButton(self.radioButton_1_class)
self.buttonGroup_1.addButton(self.radioButton_2_class)
self.buttonGroup_1.addButton(self.radioButton_3_class)
self.lineEdit_remark.setEnabled(False)
self.lineEdit_correctStart.setEnabled(False)
self.lineEdit_correctEnd.setEnabled(False)
self.comboBox_sampID.setEnabled(False)
self.checkBox_OSA.setEnabled(False)
self.checkBox_CSA.setEnabled(False)
self.checkBox_MSA.setEnabled(False)
self.checkBox_HPY.setEnabled(False)
self.checkBox_examineLabeled.setEnabled(False)
self.checkBox_examineLabeled.clicked.connect(self.enable_checkBox_examineLabeled)
self.radioButton_OSA.setEnabled(False)
self.radioButton_CSA.setEnabled(False)
self.radioButton_MSA.setEnabled(False)
self.radioButton_HPY.setEnabled(False)
self.buttonGroup_2 = QButtonGroup()
self.buttonGroup_2.addButton(self.radioButton_OSA)
self.buttonGroup_2.addButton(self.radioButton_CSA)
self.buttonGroup_2.addButton(self.radioButton_MSA)
self.buttonGroup_2.addButton(self.radioButton_HPY)
self.thread_textbrowserUpdate = Thread_textbrowserUpdate()
self.thread_textbrowserUpdate.trigger.connect(self.textBrowser_update)
self.textBrowser_update("提示:请点击左上角“打开”菜单来开始任务")
# 选择路径按钮的槽函数
def slot_btn_selectPath(self):
fileDialog = QFileDialog()
fileDialog.setFileMode(QFileDialog.Directory)
fileDialog.setOption(QFileDialog.ShowDirsOnly, True)
if fileDialog.exec_() == QFileDialog.Accepted:
self.dir_path = fileDialog.selectedFiles()[0]
if self.dir_path:
logging.info("Loading Data Path...")
self.PSG_Data_Path = Path(os.path.join(self.dir_path, "PSG"))
self.PSG_Label_Path = Path(os.path.join(self.dir_path, "PSG_label"))
self.BCG_Data_Path = Path(os.path.join(self.dir_path, "BCG"))
self.BCG_Label_Path = Path(os.path.join(self.dir_path, "BCG_label"))
self.Artifact_Label_Path = Path(os.path.join(self.dir_path, "Artifact_label"))
self.Artifact_Offset_Path = Path(os.path.join(self.dir_path, "Artifact_label", "20220421Artifact_offset_value.xlsx"))
if self.PSG_Data_Path.exists() and self.PSG_Label_Path.exists() and self.BCG_Data_Path.exists() and self.BCG_Label_Path.exists() and self.Artifact_Label_Path.exists() and self.Artifact_Label_Path.exists():
sampIDs = os.listdir(self.BCG_Data_Path)
sampID_for_comboBox = []
for sampID in sampIDs:
sampID = sampID.replace("samp.npy", "")
sampID_for_comboBox.append(sampID)
bcg_path = self.BCG_Data_Path / f"{sampID}samp.npy"
ecg_path = self.PSG_Data_Path / f"A{str(sampID).rjust(7, '0')}.edf"
bcg_label_path = self.BCG_Label_Path / f"export{sampID}_all.csv"
ecg_label_path = self.PSG_Label_Path / f"export{sampID}.csv"
if not bcg_path.exists():
logging.error(f"Can't find {bcg_path}!")
self.msgBox.setText(f"找不到数据{bcg_path}")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
return
if not ecg_path.exists():
logging.error(f"Can't find {ecg_path}!")
self.msgBox.setText(f"找不到数据{ecg_path}")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
return
if not bcg_label_path.exists():
logging.error(f"Can't find {bcg_label_path}!")
self.msgBox.setText(f"找不到数据{bcg_label_path}")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
return
if not ecg_label_path.exists():
logging.error(f"Can't find {ecg_label_path}!")
self.msgBox.setText(f"找不到数据{ecg_label_path}")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
return
self.comboBox_sampID.setEnabled(True)
self.lineEdit_start_bcg_index.setEnabled(True)
self.comboBox_sampID.addItems(sampID_for_comboBox)
self.lineEdit_start_bcg_index.setEnabled(True)
self.lineEdit_frequency.setEnabled(True)
self.lineEdit_bcg_frequency.setEnabled(True)
self.lineEdit_front_add_second.setEnabled(True)
self.lineEdit_back_add_second.setEnabled(True)
self.checkBox_OSA.setEnabled(True)
self.checkBox_CSA.setEnabled(True)
self.checkBox_MSA.setEnabled(True)
self.checkBox_HPY.setEnabled(True)
self.action_selectPath.setEnabled(False)
self.pushButton_confirmLabel.setEnabled(True)
self.pushButton_confirmLabel.setText("开始打标")
MainWindow.setWindowTitle(self, QtCore.QCoreApplication.translate("MainWindow", "Main_Quality_Relabel_GUI - Data Path: " + self.dir_path))
logging.info("Successfully Loaded Data Path.")
self.textBrowser_update("操作:数据路径选择成功")
else:
logging.info("Failed to Load Data Path.")
self.textBrowser_update("操作:数据路径选择错误,缺乏必要数据文件夹")
self.msgBox.setText("数据路径选择错误,缺乏必要数据文件夹")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
else:
logging.info("Data Path not Exist...")
self.textBrowser_update("操作:载入存档错误,存档不存在")
self.msgBox.setText("载入存档错误,存档不存在")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
else:
logging.info("Canceled Loading Data Path.")
self.textBrowser_update("提示:数据路径选择取消")
self.msgBox.setText("数据路径选择取消")
self.msgBox.setIcon(QMessageBox.Warning)
self.msgBox.exec()
# 开始打标按钮的槽函数
def slot_btn_run(self):
self.init_variable()
self.check_channel()
self.read_data()
self.read_event()
self.read_artifact_label()
if self.plotEventIndex >= len(self.bcg_event_label_index_list) or self.plotEventIndex < 0:
self.msgBox.setText("输入起始心晓事件序列过大或过小导致错误")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
else:
self.pushButton_left.setEnabled(False)
self.pushButton_right.setEnabled(True)
self.pushButton_confirmLabel.setText("确定打标参数(S)")
self.pushButton_confirmLabel.setShortcut(QtCore.QCoreApplication.translate("MainWindow", "S"))
self.pushButton_confirmLabel.clicked.disconnect(self.slot_btn_run)
self.pushButton_confirmLabel.clicked.connect(self.slot_btn_confirmLabel)
self.pushButton_quick_remark_input_waitingForTalk.setEnabled(True)
self.comboBox_sampID.setEnabled(False)
self.lineEdit_start_bcg_index.setEnabled(False)
self.lineEdit_frequency.setEnabled(False)
self.lineEdit_bcg_frequency.setEnabled(False)
self.lineEdit_front_add_second.setEnabled(False)
self.lineEdit_back_add_second.setEnabled(False)
self.checkBox_OSA.setEnabled(False)
self.checkBox_CSA.setEnabled(False)
self.checkBox_MSA.setEnabled(False)
self.checkBox_HPY.setEnabled(False)
self.radioButton_1_class.setEnabled(True)
self.radioButton_2_class.setEnabled(True)
self.radioButton_3_class.setEnabled(True)
self.checkBox_examineLabeled.setEnabled(True)
self.checkBox_examineBySecond.setEnabled(True)
self.radioButton_OSA.setEnabled(True)
self.radioButton_CSA.setEnabled(True)
self.radioButton_MSA.setEnabled(True)
self.radioButton_HPY.setEnabled(True)
self.lineEdit_remark.setEnabled(True)
self.lineEdit_correctStart.setEnabled(True)
self.lineEdit_correctEnd.setEnabled(True)
self.pd = pd.read_csv(Path(os.path.join(self.BCG_Label_Path, f"export{self.comboBox_sampID.currentText()}_all.csv")), encoding="gbk")
csv_headers = self.pd.columns.tolist()
if not (("score" in csv_headers) and ("remark" in csv_headers) and ("correct_Start" in csv_headers) and ("correct_End") in csv_headers and ("isLabeled" in csv_headers)):
self.pd["score"] = "-1"
self.pd["remark"] = ""
self.pd["correct_Start"] = "-1"
self.pd["correct_End"] = "-1"
self.pd["correct_EventsType"] = ""
self.pd["isLabeled"] = "-1"
self.pd["score"] = self.pd["score"].astype(int)
self.pd["remark"] = self.pd["remark"].astype(str)
self.pd["correct_Start"] = self.pd["correct_Start"].astype(int)
self.pd["correct_End"] = self.pd["correct_End"].astype(int)
self.pd["correct_EventsType"] = self.pd["correct_EventsType"].astype(str)
self.pd["isLabeled"] = self.pd["isLabeled"].astype(int)
self.change_radioButton_events(self.bcg_event_label_index_list[self.plotEventIndex])
if not Path(os.path.join(self.BCG_Label_Path, f"export{self.comboBox_sampID.currentText()}_addNew.csv")).exists():
self.pd_examineBySecond = pd.DataFrame(columns=self.pd.columns)
self.pd_examineBySecond["score"] = self.pd_examineBySecond["score"].astype(int)
self.pd_examineBySecond["remark"] = self.pd_examineBySecond["remark"].astype(str)
self.pd_examineBySecond["correct_Start"] = self.pd_examineBySecond["correct_Start"].astype(int)
self.pd_examineBySecond["correct_End"] = self.pd_examineBySecond["correct_End"].astype(int)
self.pd_examineBySecond["correct_EventsType"] = self.pd_examineBySecond["correct_EventsType"].astype(str)
self.pd_examineBySecond["isLabeled"] = self.pd_examineBySecond["isLabeled"].astype(int)
self.pd_examineBySecond.to_csv(Path(os.path.join(self.BCG_Label_Path, f"export{self.comboBox_sampID.currentText()}_addNew.csv")), index=False, encoding="gbk")
else:
self.pd_examineBySecond = pd.read_csv(Path(os.path.join(self.BCG_Label_Path, f"export{self.comboBox_sampID.currentText()}_addNew.csv")), encoding="gbk")
if (self.pd["isLabeled"] == 1).all():
self.checkBox_examineLabeled.setChecked(False)
self.msgBox.setText("该份数据打标已全部完成")
self.msgBox.setIcon(QMessageBox.Information)
self.msgBox.exec()
self.pd.to_csv(Path(os.path.join(self.BCG_Label_Path, f"export{self.comboBox_sampID.currentText()}_all.csv")), mode='w', index=None, encoding="gbk")
self.show_one_event(self.plotEventIndex, self.plotEventIndex, front_add_second=self.front_add_second, back_add_second=self.back_add_second)
if not str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]) == "nan":
self.lineEdit_remark.setText(str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]))
else:
self.lineEdit_remark.setText("")
if str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "1":
self.radioButton_1_class.setChecked(True)
elif str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "2":
self.radioButton_2_class.setChecked(True)
elif str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "3":
self.radioButton_3_class.setChecked(True)
else:
self.radioButton_2_class.setChecked(True)
self.textBrowser_update("操作:开始打标")
# 上一个事件按钮的槽函数
def slot_btn_left(self):
self.figure.clear()
self.plotEventIndex = self.plotEventIndex - 1
if self.checkBox_examineLabeled.isChecked() == True:
while self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "isLabeled"] == 1 and self.plotEventIndex > self.start_bcg_index:
self.plotEventIndex = self.plotEventIndex - 1
self.show_one_event(self.plotEventIndex, self.plotEventIndex, front_add_second=self.front_add_second,
back_add_second=self.back_add_second)
if not str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]) == "nan":
self.lineEdit_remark.setText(str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]))
else:
self.lineEdit_remark.setText("")
self.change_radioButton_events(self.bcg_event_label_index_list[self.plotEventIndex])
if str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "1":
self.radioButton_1_class.setChecked(True)
elif str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "2":
self.radioButton_2_class.setChecked(True)
elif str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "3":
self.radioButton_3_class.setChecked(True)
else:
self.radioButton_2_class.setChecked(True)
self.textBrowser_update("操作↑上一个事件index" + str(self.plotEventIndex + 1))
if self.plotEventIndex <= self.start_bcg_index:
self.pushButton_left.setEnabled(False)
self.pushButton_right.setEnabled(True)
self.msgBox.setText("你正在查看第一个事件")
self.msgBox.setIcon(QMessageBox.Information)
self.msgBox.exec()
else:
self.pushButton_left.setEnabled(True)
self.pushButton_right.setEnabled(True)
# 下一个事件按钮的槽函数
def slot_btn_right(self):
self.figure.clear()
self.plotEventIndex = self.plotEventIndex + 1
if self.checkBox_examineLabeled.isChecked() == True:
while self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "isLabeled"] == 1 and self.plotEventIndex < len(self.bcg_event_label_filtered_df) - 1:
self.plotEventIndex = self.plotEventIndex + 1
self.show_one_event(self.plotEventIndex, self.plotEventIndex, front_add_second=self.front_add_second,
back_add_second=self.back_add_second)
if not str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]) == "nan":
self.lineEdit_remark.setText(str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]))
else:
self.lineEdit_remark.setText("")
self.change_radioButton_events(self.bcg_event_label_index_list[self.plotEventIndex])
if str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "1":
self.radioButton_1_class.setChecked(True)
elif str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "2":
self.radioButton_2_class.setChecked(True)
elif str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "3":
self.radioButton_3_class.setChecked(True)
else:
self.radioButton_2_class.setChecked(True)
self.textBrowser_update("操作↓下一个事件index" + str(self.plotEventIndex + 1))
if self.plotEventIndex >= len(self.bcg_event_label_filtered_df) - 1:
self.pushButton_left.setEnabled(True)
self.pushButton_right.setEnabled(False)
self.msgBox.setText("你正在查看最后一个事件")
self.msgBox.setIcon(QMessageBox.Information)
self.msgBox.exec()
else:
self.pushButton_left.setEnabled(True)
self.pushButton_right.setEnabled(True)
# 确定打标参数按钮的槽函数
def slot_btn_confirmLabel(self):
if self.checkBox_examineBySecond.isChecked() == True:
if int(self.lineEdit_correctStart.text()) < int(self.lineEdit_correctEnd.text()):
if self.radioButton_1_class.isChecked() == True:
score = int(1)
elif self.radioButton_2_class.isChecked() == True:
score = int(2)
elif self.radioButton_3_class.isChecked() == True:
score = int(3)
remark = self.lineEdit_remark.text()
correct_Start = int(self.lineEdit_correctStart.text())
correct_End = int(self.lineEdit_correctEnd.text())
if self.radioButton_OSA.isChecked() == True:
correct_EventsType = "Obstructive apnea"
elif self.radioButton_CSA.isChecked() == True:
correct_EventsType = "Central apnea"
elif self.radioButton_MSA.isChecked() == True:
correct_EventsType = "Mixed apnea"
elif self.radioButton_HPY.isChecked() == True:
correct_EventsType = "Hypopnea"
isLabeled = int(1)
self.pd_examineBySecond = self.pd_add_new_row(self.pd_examineBySecond, score, remark, correct_Start, correct_End, correct_EventsType, isLabeled)
self.pd_examineBySecond.to_csv(Path(os.path.join(self.BCG_Label_Path, f"export{self.comboBox_sampID.currentText()}_addNew.csv")), mode='w', index=None, encoding="gbk")
self.textBrowser_update("操作保存新事件打标结果到csv。" + "score:" + str(score) + "correct_Start:" + str(correct_Start) + "correct_End:" + str(correct_End) + "correct_EventsType:" + str(correct_EventsType))
else:
self.msgBox.setText("起始时间和终止时间输入错误")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
elif self.checkBox_examineBySecond.isChecked() == False:
if int(self.lineEdit_correctStart.text()) < int(self.lineEdit_correctEnd.text()):
if self.radioButton_1_class.isChecked() == True:
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"] = int(1)
elif self.radioButton_2_class.isChecked() == True:
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"] = int(2)
elif self.radioButton_3_class.isChecked() == True:
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"] = int(3)
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"] = self.lineEdit_remark.text()
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"] = int(self.lineEdit_correctStart.text())
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"] = int(self.lineEdit_correctEnd.text())
if self.radioButton_OSA.isChecked() == True:
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] = "Obstructive apnea"
elif self.radioButton_CSA.isChecked() == True:
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] = "Central apnea"
elif self.radioButton_MSA.isChecked() == True:
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] = "Mixed apnea"
elif self.radioButton_HPY.isChecked() == True:
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] = "Hypopnea"
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "isLabeled"] = int(1)
self.pd.to_csv(Path(os.path.join(self.BCG_Label_Path, f"export{self.comboBox_sampID.currentText()}_all.csv")), mode='w', index=None, encoding="gbk")
self.textBrowser_update("操作保存index" + str(self.plotEventIndex + 1) + "打标结果到csv。" + "score:" + str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) + "correct_Start:" + str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"]) + "correct_End:" + str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"]) + "correct_EventsType:" + str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"]))
if (self.pd.loc[self.bcg_event_label_index_list]["isLabeled"] == 1).all():
self.msgBox.setText("该份数据打标已全部完成")
self.msgBox.setIcon(QMessageBox.Information)
self.msgBox.exec()
else:
self.msgBox.setText("起始时间和终止时间输入错误")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
# 快速备注输入中待讨论按钮的槽函数
def slot_btn_quick_remark_input_waitingForTalk(self):
self.lineEdit_remark.setText("待讨论")
# -10s的槽函数
def slot_btn_previous10s(self):
self.figure.clear()
self.lineEdit_remark.setText("")
self.textBrowser_update("操作向前10秒")
self.radioButton_OSA.setChecked(True)
self.time_move_count = self.time_move_count - 10
self.show_new_event(self.plotEventIndex, self.plotEventIndex, front_add_second=self.front_add_second,
back_add_second=self.back_add_second, time_move_count = self.time_move_count)
# -30s的槽函数
def slot_btn_previous30s(self):
self.figure.clear()
self.lineEdit_remark.setText("")
self.textBrowser_update("操作向前30秒")
self.radioButton_OSA.setChecked(True)
self.time_move_count = self.time_move_count - 30
self.show_new_event(self.plotEventIndex, self.plotEventIndex, front_add_second=self.front_add_second,
back_add_second=self.back_add_second, time_move_count = self.time_move_count)
# -60s的槽函数
def slot_btn_previous60s(self):
self.figure.clear()
self.lineEdit_remark.setText("")
self.textBrowser_update("操作向前60秒")
self.radioButton_OSA.setChecked(True)
self.time_move_count = self.time_move_count - 60
self.show_new_event(self.plotEventIndex, self.plotEventIndex, front_add_second=self.front_add_second,
back_add_second=self.back_add_second, time_move_count = self.time_move_count)
# +10s的槽函数
def slot_btn_next10s(self):
self.figure.clear()
self.lineEdit_remark.setText("")
self.textBrowser_update("操作向后10秒")
self.radioButton_OSA.setChecked(True)
self.time_move_count = self.time_move_count + 10
self.show_new_event(self.plotEventIndex, self.plotEventIndex, front_add_second=self.front_add_second,
back_add_second=self.back_add_second, time_move_count = self.time_move_count)
# +30s的槽函数
def slot_btn_next30s(self):
self.figure.clear()
self.lineEdit_remark.setText("")
self.textBrowser_update("操作向后30秒")
self.radioButton_OSA.setChecked(True)
self.time_move_count = self.time_move_count + 30
self.show_new_event(self.plotEventIndex, self.plotEventIndex, front_add_second=self.front_add_second,
back_add_second=self.back_add_second, time_move_count = self.time_move_count)
# +30s的槽函数
def slot_btn_next60s(self):
self.figure.clear()
self.lineEdit_remark.setText("")
self.textBrowser_update("操作向后60秒")
self.radioButton_OSA.setChecked(True)
self.time_move_count = self.time_move_count + 60
self.show_new_event(self.plotEventIndex, self.plotEventIndex, front_add_second=self.front_add_second,
back_add_second=self.back_add_second, time_move_count = self.time_move_count)
# +60s的槽函数
def enable_checkBox_examineLabeled(self):
if self.checkBox_examineLabeled.isChecked():
if (self.pd.loc[self.bcg_event_label_index_list]["isLabeled"] == 1).all():
self.checkBox_examineLabeled.setChecked(False)
self.msgBox.setText("该份数据打标已全部完成")
self.msgBox.setIcon(QMessageBox.Information)
self.msgBox.exec()
# 点击了启用逐帧检查模式复选框的槽函数
def enable_checkBox_examineBySecond(self):
if self.checkBox_examineBySecond.isChecked() == True:
self.pushButton_left.setEnabled(False)
self.pushButton_right.setEnabled(False)
self.checkBox_examineLabeled.setEnabled(False)
self.radioButton_OSA.setChecked(True)
self.lineEdit_remark.setText("")
self.pushButton_previous10s.setEnabled(True)
self.pushButton_next10s.setEnabled(True)
self.pushButton_previous30s.setEnabled(True)
self.pushButton_next30s.setEnabled(True)
self.pushButton_previous60s.setEnabled(True)
self.pushButton_next60s.setEnabled(True)
self.radioButton_2_class.setChecked(True)
self.radioButton_3_class.setEnabled(False)
elif self.checkBox_examineBySecond.isChecked() == False:
self.checkBox_examineLabeled.setEnabled(True)
self.lineEdit_remark.setText("")
self.pushButton_previous10s.setEnabled(False)
self.pushButton_next10s.setEnabled(False)
self.pushButton_previous30s.setEnabled(False)
self.pushButton_next30s.setEnabled(False)
self.pushButton_previous60s.setEnabled(False)
self.pushButton_next60s.setEnabled(False)
self.radioButton_3_class.setEnabled(True)
if self.plotEventIndex <= self.start_bcg_index:
self.pushButton_left.setEnabled(False)
self.pushButton_right.setEnabled(True)
self.msgBox.setText("你正在查看第一个事件")
self.msgBox.setIcon(QMessageBox.Information)
self.msgBox.exec()
elif self.plotEventIndex >= len(self.bcg_event_label_filtered_df) - 1:
self.pushButton_left.setEnabled(True)
self.pushButton_right.setEnabled(False)
self.msgBox.setText("你正在查看最后一个事件")
self.msgBox.setIcon(QMessageBox.Information)
self.msgBox.exec()
else:
self.pushButton_left.setEnabled(True)
self.pushButton_right.setEnabled(True)
# matplot中更改x轴范围后执行的函数
def on_xlim_change(self, event_ax):
# 获取当前x轴范围
left, right = event_ax.get_xlim()
self.lineEdit_correctStart.setText(str(round(left)))
self.lineEdit_correctEnd.setText(str(round(right)))
# 点击了matplot中的HOME键后执行的槽函数
def slot_btn_resetOriginalView(self):
self.lineEdit_correctStart.setText(str(self.bcg_SP))
self.lineEdit_correctEnd.setText(str(self.bcg_EP))
# 用于更新信息输出中的textBrowser的函数
def textBrowser_update(self, message):
self.textBrowser_infoOutput.append(str(datetime.now().strftime("%H:%M:%S")) + ": " + message)
# 修改了事件类型单选框后执行的函数
def change_radioButton_events(self, index):
if self.pd.at[index, "Event type"] == "Obstructive apnea":
self.radioButton_OSA.setChecked(True)
elif self.pd.at[index, "Event type"] == "Central apnea":
self.radioButton_CSA.setChecked(True)
elif self.pd.at[index, "Event type"] == "Mixed apnea":
self.radioButton_MSA.setChecked(True)
elif self.pd.at[index, "Event type"] == "Hypopnea":
self.radioButton_HPY.setChecked(True)
# 点击开始打标按钮后的初始化变量函数
def init_variable(self):
self.sampNo = int(self.comboBox_sampID.currentText())
self.channel_list = ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient']
self.focus_event_list = self.set_focus_event_list()
self.frequency = int(self.lineEdit_frequency.text())
self.bcg_frequency = int(self.lineEdit_bcg_frequency.text())
self.front_add_second = int(self.lineEdit_front_add_second.text())
self.back_add_second = int(self.lineEdit_back_add_second.text())
self.extend_second = int(self.front_add_second + self.back_add_second)
self.start_bcg_index = int(self.lineEdit_start_bcg_index.text())
self.plotEventIndex = self.start_bcg_index
self.ecg_start_time = None
# 用来显示颜色时按点匹配事件
self.ecg_event_label = None
self.bcg_event_label = None
self.spo2_event_label = None
self.artifact_event_label = None
# 仅包含关注暂停事件的列表
self.ecg_event_label_filtered_df = None
self.bcg_event_label_filtered_df = None
# 所有事件列表
self.ecg_event_label_df = None
self.bcg_event_label_df = None
# 各通道信号
self.signal_select = {}
def check_channel(self):
for i in self.channel_list:
if i not in self.base_channel:
logging.debug(f"{i} 不存在于常见通道名中")
print(f"常见通道名:{self.channel_list}")
def set_focus_event_list(self):
focus_event_list = []
if self.checkBox_HPY.isChecked() == True:
focus_event_list.append("Hypopnea")
if self.checkBox_CSA.isChecked() == True:
focus_event_list.append("Central apnea")
if self.checkBox_OSA.isChecked() == True:
focus_event_list.append("Obstructive apnea")
if self.checkBox_MSA.isChecked() == True:
focus_event_list.append("Mixed apnea")
return focus_event_list
def read_data(self):
bcg_path = self.BCG_Data_Path / f"{self.sampNo}samp.npy"
ecg_path = self.PSG_Data_Path / f"A{str(self.sampNo).rjust(7, '0')}.edf"
if not bcg_path.exists():
logging.error(f"Can't find {bcg_path} !")
raise FileNotFoundError(f"Can't find {bcg_path} !")
if not ecg_path.exists():
logging.error(f"Can't find {ecg_path} !")
raise FileNotFoundError(f"Can't find {ecg_path} !")
with pyedflib.EdfReader(str(ecg_path.resolve())) as file:
signal_num = file.signals_in_file
logging.debug(f"{self.sampNo} EDF file signal number is {signal_num}")
signal_label = file.getSignalLabels()
logging.debug(f"{self.sampNo} EDF file signal label : {signal_label}")
self.ecg_start_time = file.getStartdatetime()
# 根据PSG记录长度生成事件表
self.ecg_event_label = np.zeros(
int(file.getFileDuration()) * self.frequency + self.extend_second * self.frequency)
self.spo2_event_label = np.zeros(
int(file.getFileDuration()) * self.frequency + self.extend_second * self.frequency)
# 打印PSG信息
file.file_info_long()
# sub_index 用于区分两个flow patient
sub_index = 1
logging.info("Loading PSG signal...")
self.textBrowser_update("提示正在加载PSG信号")
for i, index in enumerate(signal_label):
# 仅加载选中的通道
if index in self.channel_list:
# 重命名flow patient通道
if index == 'Flow Patient':
if sub_index == 1:
index = "Flow T"
else:
index = "Flow P"
sub_index += 1
signal = file.readSignal(i)
sample_frequency = file.getSampleFrequency(i)
# 读取采样率 进行重采样
if sample_frequency < self.frequency:
signal = signal.repeat(self.frequency / sample_frequency)
elif sample_frequency > self.frequency:
signal = signal[::int(sample_frequency / self.frequency)]
self.signal_select[index] = signal
logging.info(f"Finished load PSG: {index}")
self.textBrowser_update("提示完成加载PSG信号")
# 加载心晓信号
logging.info("Loading XinXiao signal...")
self.textBrowser_update("提示:正在加载心晓信号")
signal = np.load(bcg_path)
preprocessing = BCG_Operation(sample_rate=self.bcg_frequency)
# 20Hz低通去噪
signal1 = preprocessing.Butterworth(signal, 'lowpass', low_cut=20, order=3)
# 0.7Hz 低通提取呼吸
signal2 = preprocessing.Butterworth(signal, 'lowpass', low_cut=0.7, order=3)
# 进行降采样
signal1 = signal1[::int(self.bcg_frequency / self.frequency)]
signal2 = signal2[::int(self.bcg_frequency / self.frequency)]
# 根据心晓事件长度生成事件记录
self.bcg_event_label = np.zeros(len(signal) + self.extend_second * self.frequency)
self.signal_select['orgdata'] = signal1
self.signal_select['0.7lowpass_resp'] = signal2
for signal_key in self.signal_select.keys():
self.signal_select[signal_key] = np.append(self.signal_select[signal_key],
self.signal_select[signal_key].mean().astype(int).repeat(
self.extend_second * self.frequency))
logging.info("Finished load XinXiao signal")
self.textBrowser_update("提示:完成加载心晓信号")
def read_event(self):
bcg_label_path = self.BCG_Label_Path / f"export{self.sampNo}_all.csv"
ecg_label_path = self.PSG_Label_Path / f"export{self.sampNo}.csv"
if not bcg_label_path.exists():
logging.error(f"Can't find {bcg_label_path} !")
raise FileNotFoundError(f"Can't find {bcg_label_path} !")
if not ecg_label_path.exists():
logging.error(f"Can't find {ecg_label_path} !")
raise FileNotFoundError(f"Can't find {ecg_label_path} !")
df = pd.read_csv(ecg_label_path, encoding="gbk")
self.ecg_event_label_df = df
# 过滤不关注的事件
df2 = df[df["Event type"].isin(self.focus_event_list)]
# 根据epoch进行排列方便索引
df2 = df2.sort_values(by='Epoch')
self.ecg_event_label_filtered_df = df2
logging.info("Traversaling PSG events...")
self.textBrowser_update("提示正在遍历PSG事件")
for one_data in tqdm(df.index, ncols=80):
one_data = df.loc[one_data]
# 通过开始时间推算事件起始点与结束点
event_start_time = datetime.strptime(one_data["Date"] + " " + one_data["Time"], '%Y/%m/%d %H:%M:%S')
SP = (event_start_time - self.ecg_start_time).seconds
# 对括号进行切分避免Duration 20 (20) 这种带括号的问题
EP = int(SP + float(one_data["Duration"].split("(")[0]))
SP *= self.frequency
EP *= self.frequency
# 对事件重新编码并存到事件记录表中
if one_data["Event type"] == "Hypopnea":
self.ecg_event_label[SP:EP] = 1
elif one_data["Event type"] == "Central apnea":
self.ecg_event_label[SP:EP] = 2
elif one_data["Event type"] == "Obstructive apnea":
self.ecg_event_label[SP:EP] = 3
elif one_data["Event type"] == "Mixed apnea":
self.ecg_event_label[SP:EP] = 4
elif one_data["Event type"] == "Desaturation":
self.spo2_event_label[SP:EP] = 5
logging.info("Finished Traversal PSG events")
self.textBrowser_update("提示完成遍历PSG事件")
# 读取心晓事件
df = pd.read_csv(bcg_label_path, encoding="gbk")
df["new_start"] = df["new_start"].astype("int")
df["new_end"] = df["new_end"].astype("int")
self.bcg_event_label_df = df
# 过滤不关注事件
df2 = df[df["Event type"].isin(self.focus_event_list)]
df2 = df2.sort_values(by='Epoch')
self.bcg_event_label_filtered_df = df2
self.bcg_event_label_index_list = df2.index.tolist()
logging.info("Traversaling XinXiao events...")
self.textBrowser_update("提示:正在遍历心晓事件")
for one_data in tqdm(df.index):
one_data = df.loc[one_data]
SP = one_data["new_start"] * self.frequency
EP = one_data["new_end"] * self.frequency
if one_data["Event type"] == "Hypopnea":
self.bcg_event_label[SP:EP] = 1
elif one_data["Event type"] == "Central apnea":
self.bcg_event_label[SP:EP] = 2
elif one_data["Event type"] == "Obstructive apnea":
self.bcg_event_label[SP:EP] = 3
elif one_data["Event type"] == "Mixed apnea":
self.bcg_event_label[SP:EP] = 4
logging.info("Finished Traversal XinXiao events")
self.textBrowser_update("提示:完成遍历心晓事件")
def read_artifact_label(self):
all_offset_length = pd.read_excel(self.Artifact_Offset_Path)
offset_length = all_offset_length[all_offset_length['数据编号'] == self.sampNo]['from_code'].values[0]
artifact_label_path = self.Artifact_Label_Path / f"Artifact_{self.sampNo}.txt"
artifact_label = pd.read_csv(artifact_label_path, header=None).to_numpy().reshape(-1, 4)
self.artifact_event_label = np.zeros(len(self.bcg_event_label) + self.extend_second * self.frequency)
for i, artifact_type, SP, EP in artifact_label:
SP = (int(SP) + offset_length) // (1000 // self.frequency)
EP = (int(EP) + offset_length) // (1000 // self.frequency)
artifact_type = int(artifact_type) + 5
SP = 0 if SP < 0 else SP
if EP < 0:
continue
if SP > len(self.bcg_event_label):
continue
self.artifact_event_label[SP:EP] = artifact_type
# assert len(self.ecg_event_label_filtered_df) == len(self.bcg_event_label_filtered_df), \
# f"PSG与心晓事件数量不一致, PSG事件数量{len(self.ecg_event_label_filtered_df)},
# 心晓事件数量{len(self.bcg_event_label_filtered_df)}"
def show_one_event(self, bcg_index: int, ecg_index: int, front_add_second: int, back_add_second: int):
"""
:param bcg_index: 心晓事件在csv中行号
:param ecg_index: PSG事件在csv中序号
:param front_add_second: 向前延伸时间
:param back_add_second: 向后延伸时间
:return:
"""
# 获取事件实际在csv文件中的序号
bcg_real_index = self.bcg_event_label_filtered_df.index[bcg_index],
ecg_real_index = self.ecg_event_label_filtered_df.index[ecg_index],
one_bcg_data = self.bcg_event_label_df.loc[bcg_real_index]
one_ecg_data = self.ecg_event_label_df.loc[ecg_real_index]
# 获取ECG事件开始与结束时间
event_start_time = datetime.strptime(one_ecg_data["Date"] + " " + one_ecg_data["Time"], '%Y/%m/%d %H:%M:%S')
ecg_SP = (event_start_time - self.ecg_start_time).seconds
ecg_duration = int(float(str(one_ecg_data["Duration"]).split("(")[0]) + 0.5)
ecg_EP = ecg_SP + ecg_duration
# 获取BCG事件开始与结束时间
bcg_SP = one_bcg_data["new_start"]
bcg_EP = one_bcg_data["new_end"]
self.bcg_SP = bcg_SP
self.bcg_EP = bcg_EP
bcg_duration = bcg_EP - bcg_SP
logging.info(f"sampNo:{self.sampNo} "
f"bcg[index:{bcg_index} epoch:{one_bcg_data['Epoch']} event:{one_bcg_data['Event type']}] "
f"ecg:[index:{ecg_index} epoch:{one_ecg_data['Epoch']} event:{one_ecg_data['Event type']}]")
if one_bcg_data['Event type'] != one_ecg_data['Event type']:
logging.error(f"sampNo:{self.sampNo} PSG事件与心晓时间不一致请排查"
f"bcg[index:{bcg_index} epoch:{one_bcg_data['Epoch']} event:{one_bcg_data['Event type']}] "
f"ecg:[index:{ecg_index} epoch:{one_ecg_data['Epoch']} event:{one_ecg_data['Event type']}]")
raise ValueError()
self.lineEdit_correctStart.setText(str(self.bcg_SP))
self.lineEdit_correctEnd.setText(str(self.bcg_EP))
# 进行向两边延展
ecg_SP = ecg_SP - front_add_second
ecg_EP = ecg_EP + back_add_second
bcg_SP = bcg_SP - front_add_second
bcg_EP = bcg_EP + back_add_second
# 各个子图之间的比例
gs = gridspec.GridSpec(7, 1, height_ratios=[1, 1, 1, 1, 1, 3, 2])
self.figure.subplots_adjust(top=1, bottom=0, right=1, left=0, hspace=0, wspace=0)
plt.margins(0, 0)
plt.tight_layout()
plt.xticks([])
plt.yticks([])
# 绘制 Flow1
self.ax0 = self.figure.add_subplot(gs[0])
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Flow T")
# 绘制 Flow2
self.ax1 = self.figure.add_subplot(gs[1], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Flow P")
self.ax2 = self.figure.add_subplot(gs[2], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Effort Tho")
self.ax3 = self.figure.add_subplot(gs[3], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Effort Abd")
self.ax4 = self.figure.add_subplot(gs[4], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="SpO2", event_code=[5])
self.ax5 = self.figure.add_subplot(gs[5], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="orgdata",
event_code=[6, 7, 8, 9, 10, 1, 2, 3, 4],
event_show_under=False)
self.ax6 = self.figure.add_subplot(gs[6], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="0.7lowpass_resp",
event_code=[6, 7, 8, 9, 10, 1, 2, 3, 4],
event_show_under=False,
title=f"sampNo:{self.sampNo} Index:{bcg_index + 1}/{len(self.bcg_event_label_filtered_df)}")
self.label_PSG_event.setText(f"PSG sampNo:{self.sampNo} Index:{ecg_index + 1}/{len(self.ecg_event_label_filtered_df)} "
f"Epoch:{one_ecg_data['Epoch']} Duration:{ecg_duration}s")
self.label_BCG_event.setText(f"心晓 sampNo:{self.sampNo} Index:{bcg_index + 1}/{len(self.bcg_event_label_filtered_df)} "
f"Epoch:{one_bcg_data['Epoch']} Duration:{bcg_duration}s")
# figManager = plt.get_current_fig_manager()
# figManager.window.showMaximized()
self.canvas.draw()
self.ax0.callbacks.connect('xlim_changed', lambda ax: self.on_xlim_change(ax))
def plt_channel(self, plt_, SP, EP, channel, event_code=[1, 2, 3, 4], event_show_under=False,
ax_top=True, ax_bottom=True, ax_left=True, ax_right=True, title=None):
"""
:param plt_:
:param SP: 显示开始秒数
:param EP: 显示结束秒数
:param channel: 通道名称
:param event_code: 要上色的事件编号
:param event_show_under: 事件颜色显示在信号下面
:param ax_top: 显示上框线
:param ax_bottom: 显示下框线
:param ax_left: 显示左框线
:param ax_right: 显示右框线
:return:
"""
linestyle = "-"
SP = 0 if SP < 0 else SP
plt_.plot(np.linspace(SP, EP, (EP - SP) * self.frequency),
self.signal_select[channel][SP * self.frequency:EP * self.frequency], label=channel,
color=self.color_cycle[0])
for j in event_code:
if channel == "SpO2":
mask = self.spo2_event_label[SP * self.frequency:EP * self.frequency] == j
elif channel == "orgdata" or channel == "0.7lowpass_resp":
if j <= 5:
mask = self.bcg_event_label[SP * self.frequency:EP * self.frequency] == j
else:
mask = self.artifact_event_label[SP * self.frequency:EP * self.frequency] == j
linestyle = "-"
else:
mask = self.ecg_event_label[SP * self.frequency:EP * self.frequency] == j
if event_show_under:
min_point = self.signal_select[channel][SP * self.frequency:EP * self.frequency].min()
len_segment = EP * self.frequency - SP * self.frequency
y = (min_point.repeat(len_segment) * mask).astype('float')
np.place(y, y == 0, np.nan)
else:
y = (self.signal_select[channel][SP * self.frequency:EP * self.frequency] * mask).astype('float')
np.place(y, y == 0, np.nan)
plt_.plot(np.linspace(SP, EP, (EP - SP) * self.frequency), y, color=self.color_cycle[j],
linestyle=linestyle)
plt_.legend(fontsize=8, loc=1)
if title is not None:
plt_.title(title)
ax = plt.gca()
ax.grid(True)
ax.spines["top"].set_visible(ax_top)
ax.spines["right"].set_visible(ax_right)
ax.spines["bottom"].set_visible(ax_bottom)
ax.spines["left"].set_visible(ax_left)
# xticks = [[]] if xticks else [range(SP, EP, 5), [str(i) for i in range(0, (EP - SP), 5)]]
# print(xticks)
# plt_.xticks(*xticks) # 去掉x轴
def show_new_event(self, bcg_index: int, ecg_index: int, front_add_second: int, back_add_second: int, time_move_count: int):
"""
:param bcg_index: 心晓事件在csv中行号
:param ecg_index: PSG事件在csv中序号
:param front_add_second: 向前延伸时间
:param back_add_second: 向后延伸时间
:param time_move_count: 时间轴移动的时间
:return:
"""
# 获取事件实际在csv文件中的序号
bcg_real_index = self.bcg_event_label_filtered_df.index[bcg_index],
ecg_real_index = self.ecg_event_label_filtered_df.index[ecg_index],
one_bcg_data = self.bcg_event_label_df.loc[bcg_real_index]
one_ecg_data = self.ecg_event_label_df.loc[ecg_real_index]
# 获取ECG事件开始与结束时间
event_start_time = datetime.strptime(one_ecg_data["Date"] + " " + one_ecg_data["Time"], '%Y/%m/%d %H:%M:%S')
ecg_SP = (event_start_time - self.ecg_start_time).seconds
ecg_duration = int(float(str(one_ecg_data["Duration"]).split("(")[0]) + 0.5)
ecg_EP = ecg_SP + ecg_duration
# 获取BCG事件开始与结束时间
bcg_SP = one_bcg_data["new_start"]
bcg_EP = one_bcg_data["new_end"]
bcg_duration = bcg_EP - bcg_SP
logging.info(f"sampNo:{self.sampNo} "
f"bcg[index:{bcg_index} epoch:{one_bcg_data['Epoch']} event:{one_bcg_data['Event type']}] "
f"ecg:[index:{ecg_index} epoch:{one_ecg_data['Epoch']} event:{one_ecg_data['Event type']}]")
if one_bcg_data['Event type'] != one_ecg_data['Event type']:
logging.error(f"sampNo:{self.sampNo} PSG事件与心晓时间不一致请排查"
f"bcg[index:{bcg_index} epoch:{one_bcg_data['Epoch']} event:{one_bcg_data['Event type']}] "
f"ecg:[index:{ecg_index} epoch:{one_ecg_data['Epoch']} event:{one_ecg_data['Event type']}]")
raise ValueError()
# 进行向两边延展
ecg_SP = ecg_SP - front_add_second + time_move_count
ecg_EP = ecg_EP + back_add_second + time_move_count
bcg_SP = bcg_SP - front_add_second + time_move_count
bcg_EP = bcg_EP + back_add_second + time_move_count
self.bcg_SP = bcg_SP
self.bcg_EP = bcg_EP
self.lineEdit_correctStart.setText(str(self.bcg_SP))
self.lineEdit_correctEnd.setText(str(self.bcg_EP))
# 各个子图之间的比例
gs = gridspec.GridSpec(7, 1, height_ratios=[1, 1, 1, 1, 1, 3, 2])
self.figure.subplots_adjust(top=1, bottom=0, right=1, left=0, hspace=0, wspace=0)
plt.margins(0, 0)
plt.tight_layout()
plt.xticks([])
plt.yticks([])
# 绘制 Flow1
self.ax0 = self.figure.add_subplot(gs[0])
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Flow T")
# 绘制 Flow2
self.ax1 = self.figure.add_subplot(gs[1], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Flow P")
self.ax2 = self.figure.add_subplot(gs[2], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Effort Tho")
self.ax3 = self.figure.add_subplot(gs[3], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Effort Abd")
self.ax4 = self.figure.add_subplot(gs[4], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="SpO2", event_code=[5])
self.ax5 = self.figure.add_subplot(gs[5], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="orgdata",
event_code=[6, 7, 8, 9, 10, 1, 2, 3, 4],
event_show_under=False)
self.ax6 = self.figure.add_subplot(gs[6], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="0.7lowpass_resp",
event_code=[6, 7, 8, 9, 10, 1, 2, 3, 4],
event_show_under=False,
title=f"sampNo:{self.sampNo} Index:{bcg_index + 1}/{len(self.bcg_event_label_filtered_df)}")
self.label_PSG_event.setText(
f"PSG sampNo:{self.sampNo} Index:{ecg_index + 1}/{len(self.ecg_event_label_filtered_df)} "
f"Epoch:{one_ecg_data['Epoch']} Duration:{ecg_duration}s")
self.label_BCG_event.setText(
f"心晓 sampNo:{self.sampNo} Index:{bcg_index + 1}/{len(self.bcg_event_label_filtered_df)} "
f"Epoch:{one_bcg_data['Epoch']} Duration:{bcg_duration}s")
# figManager = plt.get_current_fig_manager()
# figManager.window.showMaximized()
self.canvas.draw()
self.ax0.callbacks.connect('xlim_changed', lambda ax: self.on_xlim_change(ax))
def pd_add_new_row(self, df, score: int, remark: str, correct_Start: int, correct_End: int, correct_EventsType: str, isLabeled: int):
new_row = pd.Series(index=df.columns, data=np.nan)
new_row["score"] = score
new_row["remark"] = remark
new_row["correct_Start"] = correct_Start
new_row["correct_End"] = correct_End
new_row["correct_EventsType"] = correct_EventsType
new_row["isLabeled"] = isLabeled
return pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
# 此线程类用于更新textBrowser的内容
class Thread_textbrowserUpdate(QtCore.QThread):
trigger = QtCore.pyqtSignal(str)
def __init__(self):
super(Thread_textbrowserUpdate, self).__init__()
def run_(self, message):
self.trigger.emit(message)
# 此类用于继承matplot的NavigationToolbar类然后写回调函数
class CustomNavigationToolbar(NavigationToolbar2QT):
def __init__(self, canvas, parent, callback):
super().__init__(canvas, parent)
self.callback = callback
def home(self, *args):
super().home(*args) # 执行原始功能
if self.callback:
self.callback() # 执行自定义回调
# 主函数
if __name__ == '__main__':
app = QtWidgets.QApplication(sys.argv)
mainWindow = MainWindow()
mainWindow.show()
sys.exit(app.exec_())