Main_Quality_Relabel_GUI.py

修改一些pathlib和os.path复用的代码
包的引用改成from导入
减少打包后的程序体积
Preprocessing.py
较少不必要包的引入
This commit is contained in:
marques 2025-01-11 17:05:17 +08:00
parent 7b9ec0a4ac
commit 7555ca4a59
5 changed files with 344 additions and 417 deletions

1
.gitignore vendored
View File

@ -252,6 +252,7 @@ ipython_config.py
# Remove previous ipynb_checkpoints
# git rm -r .ipynb_checkpoints/
SaLabel/
data/*
logs/*
.idea/*

View File

@ -11,6 +11,7 @@
from PyQt5 import QtCore, QtGui, QtWidgets
class Ui_MainWindow(object):
def setupUi(self, MainWindow):
MainWindow.setObjectName("MainWindow")

View File

@ -5,26 +5,26 @@
@time:2025/1/4
"""
import os
import sys
import logging
import time
import numpy as np
import pandas as pd
import matplotlib
import pyedflib
import traceback
from logging import NOTSET, getLogger, FileHandler, Formatter, StreamHandler, info, error, debug
from time import time, strftime, localtime
from numpy import load, nan, zeros, append, linspace, place
from pandas import DataFrame, read_csv, read_excel, Series, concat
from matplotlib import use
from matplotlib import pyplot as plt, gridspec
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg, NavigationToolbar2QT
from pyedflib import EdfReader
from datetime import datetime
from tqdm import tqdm
from pathlib import Path
from PyQt5 import QtCore, QtWidgets
from PyQt5.QtWidgets import QFileDialog, QMainWindow, QMessageBox, QButtonGroup
from matplotlib import pyplot as plt, gridspec
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg, NavigationToolbar2QT
from MainWindow import Ui_MainWindow
from utils.Preprocessing import BCG_Operation
from PyQt5.QtCore import QCoreApplication, QThread, pyqtSignal
from PyQt5.QtWidgets import QFileDialog, QMainWindow, QMessageBox, QButtonGroup, QApplication
matplotlib.use("Qt5Agg") # 声明使用QT5
from MainWindow import Ui_MainWindow
from utils.Preprocessing import Butterworth
use("Qt5Agg") # 声明使用QT5
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
@ -36,25 +36,25 @@ plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
# 'EEG A1-A2', 'Imp']
# 设置日志
logger = logging.getLogger()
logger.setLevel(logging.NOTSET)
realtime = time.strftime('%Y%m%d', time.localtime(time.time()))
if not os.path.exists(Path("logs")):
os.makedirs(Path("logs"))
fh = logging.FileHandler(Path("logs") / (realtime + ".log"), mode='a')
fh.setLevel(logging.NOTSET)
fh.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
logger = getLogger()
logger.setLevel(NOTSET)
realtime = strftime('%Y%m%d', localtime(time()))
if not Path("logs").exists():
Path("logs").mkdir(exist_ok=True)
fh = FileHandler(Path("logs") / (realtime + ".log"), mode='a')
fh.setLevel(NOTSET)
fh.setFormatter(Formatter("%(asctime)s: %(message)s"))
logger.addHandler(fh)
ch = logging.StreamHandler()
ch.setLevel(logging.NOTSET)
ch.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
ch = StreamHandler()
ch.setLevel(NOTSET)
ch.setFormatter(Formatter("%(asctime)s: %(message)s"))
logger.addHandler(ch)
logging.getLogger('matplotlib.font_manager').disabled = True
logging.info("------------------------------------")
getLogger('matplotlib.font_manager').disabled = True
info("------------------------------------")
class MainWindow(QMainWindow, Ui_MainWindow):
# 可选择的通道
base_channel = ['EEG F3-A2', 'EEG F4-A1', 'EEG C3-A2', 'EEG C4-A1', 'EEG O1-A2', 'EEG O2-A1', 'EOG Right',
'EOG Left', 'EMG Chin', 'ECG I', 'RR', 'ECG II', 'Effort Tho', 'Flow Patient', 'Flow Patient', 'HR',
@ -178,72 +178,76 @@ class MainWindow(QMainWindow, Ui_MainWindow):
fileDialog.setOption(QFileDialog.ShowDirsOnly, True)
if fileDialog.exec_() == QFileDialog.Accepted:
self.dir_path = fileDialog.selectedFiles()[0]
if self.dir_path:
logging.info("Loading Data Path...")
self.PSG_Data_Path = Path(os.path.join(self.dir_path, "PSG"))
self.BCG_Data_Path = Path(os.path.join(self.dir_path, "BCG"))
self.BCG_Label_Path = Path(os.path.join(self.dir_path, "BCG_label"))
self.Artifact_Label_Path = Path(os.path.join(self.dir_path, "Artifact_label"))
self.Artifact_Offset_Path = Path(os.path.join(self.dir_path, "Artifact_label", "20220421Artifact_offset_value.xlsx"))
if self.PSG_Data_Path.exists() and self.BCG_Data_Path.exists() and self.BCG_Label_Path.exists() and self.Artifact_Label_Path.exists() and self.Artifact_Label_Path.exists():
sampIDs = os.listdir(self.BCG_Data_Path)
sampID_for_comboBox = []
for sampID in sampIDs:
sampID = sampID.replace("samp.npy", "")
sampID_for_comboBox.append(sampID)
bcg_path = self.BCG_Data_Path / f"{sampID}samp.npy"
ecg_path = self.PSG_Data_Path / f"A{str(sampID).rjust(7, '0')}.edf"
bcg_label_path = self.BCG_Label_Path / f"export{sampID}_all.csv"
if not bcg_path.exists():
logging.error(f"Can't find {bcg_path}!")
self.msgBox.setText(f"找不到数据{bcg_path}")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
return
if not ecg_path.exists():
logging.error(f"Can't find {ecg_path}!")
self.msgBox.setText(f"找不到数据{ecg_path}")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
return
if not bcg_label_path.exists():
logging.error(f"Can't find {bcg_label_path}!")
self.msgBox.setText(f"找不到数据{bcg_label_path}")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
return
self.comboBox_sampID.setEnabled(True)
self.lineEdit_start_bcg_index.setEnabled(True)
self.comboBox_sampID.addItems(sampID_for_comboBox)
self.lineEdit_start_bcg_index.setEnabled(True)
self.lineEdit_frequency.setEnabled(True)
self.lineEdit_bcg_frequency.setEnabled(True)
self.lineEdit_front_add_second.setEnabled(True)
self.lineEdit_back_add_second.setEnabled(True)
self.checkBox_OSA.setEnabled(True)
self.checkBox_CSA.setEnabled(True)
self.checkBox_MSA.setEnabled(True)
self.checkBox_HPY.setEnabled(True)
self.action_selectPath.setEnabled(False)
self.pushButton_confirmLabel.setEnabled(True)
self.pushButton_confirmLabel.setText("开始打标")
MainWindow.setWindowTitle(self, QtCore.QCoreApplication.translate("MainWindow", "Main_Quality_Relabel_GUI - Data Path: " + self.dir_path))
logging.info("Successfully Loaded Data Path.")
self.textBrowser_update("操作:数据路径选择成功")
else:
logging.info("Failed to Load Data Path.")
self.textBrowser_update("操作:数据路径选择错误,缺乏必要数据文件夹")
self.msgBox.setText("数据路径选择错误,缺乏必要数据文件夹")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
else:
logging.info("Data Path not Exist...")
if not self.dir_path:
info("Data Path not Exist...")
self.textBrowser_update("操作:载入存档错误,存档不存在")
self.msgBox.setText("载入存档错误,存档不存在")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
return
self.dir_path = Path(self.dir_path)
info("Loading Data Path...")
self.PSG_Data_Path = self.dir_path / "PSG"
self.BCG_Data_Path = self.dir_path / "BCG"
self.BCG_Label_Path = self.dir_path / "BCG_label"
self.Artifact_Label_Path = self.dir_path / "Artifact_label"
self.Artifact_Offset_Path = self.dir_path / "Artifact_label" / "20220421Artifact_offset_value.xlsx"
if self.PSG_Data_Path.exists() and self.BCG_Data_Path.exists() and self.BCG_Label_Path.exists() and self.Artifact_Label_Path.exists() and self.Artifact_Label_Path.exists():
sampIDs = self.BCG_Data_Path.glob()
sampID_for_comboBox = []
for sampID in sampIDs:
sampID = sampID.replace("samp.npy", "")
sampID_for_comboBox.append(sampID)
bcg_path = self.BCG_Data_Path / f"{sampID}samp.npy"
ecg_path = self.PSG_Data_Path / f"A{str(sampID).rjust(7, '0')}.edf"
bcg_label_path = self.BCG_Label_Path / f"export{sampID}_all.csv"
if not bcg_path.exists():
error(f"Can't find {bcg_path}!")
self.msgBox.setText(f"找不到数据{bcg_path}")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
return
if not ecg_path.exists():
error(f"Can't find {ecg_path}!")
self.msgBox.setText(f"找不到数据{ecg_path}")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
return
if not bcg_label_path.exists():
error(f"Can't find {bcg_label_path}!")
self.msgBox.setText(f"找不到数据{bcg_label_path}")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
return
self.comboBox_sampID.setEnabled(True)
self.lineEdit_start_bcg_index.setEnabled(True)
self.comboBox_sampID.addItems(sampID_for_comboBox)
self.lineEdit_start_bcg_index.setEnabled(True)
self.lineEdit_frequency.setEnabled(True)
self.lineEdit_bcg_frequency.setEnabled(True)
self.lineEdit_front_add_second.setEnabled(True)
self.lineEdit_back_add_second.setEnabled(True)
self.checkBox_OSA.setEnabled(True)
self.checkBox_CSA.setEnabled(True)
self.checkBox_MSA.setEnabled(True)
self.checkBox_HPY.setEnabled(True)
self.action_selectPath.setEnabled(False)
self.pushButton_confirmLabel.setEnabled(True)
self.pushButton_confirmLabel.setText("开始打标")
MainWindow.setWindowTitle(self, QCoreApplication.translate("MainWindow",
"Main_Quality_Relabel_GUI - Data Path: " + self.dir_path))
info("Successfully Loaded Data Path.")
self.textBrowser_update("操作:数据路径选择成功")
else:
info("Failed to Load Data Path.")
self.textBrowser_update("操作:数据路径选择错误,缺乏必要数据文件夹")
self.msgBox.setText("数据路径选择错误,缺乏必要数据文件夹")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
else:
logging.info("Canceled Loading Data Path.")
info("Canceled Loading Data Path.")
self.textBrowser_update("提示:数据路径选择取消")
self.msgBox.setText("数据路径选择取消")
self.msgBox.setIcon(QMessageBox.Warning)
@ -264,7 +268,7 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.pushButton_left.setEnabled(False)
self.pushButton_right.setEnabled(True)
self.pushButton_confirmLabel.setText("确定打标参数(S)")
self.pushButton_confirmLabel.setShortcut(QtCore.QCoreApplication.translate("MainWindow", "S"))
self.pushButton_confirmLabel.setShortcut(QCoreApplication.translate("MainWindow", "S"))
self.pushButton_confirmLabel.clicked.disconnect(self.slot_btn_run)
self.pushButton_confirmLabel.clicked.connect(self.slot_btn_confirmLabel)
self.pushButton_quick_remark_input_waitingForTalk.setEnabled(True)
@ -297,49 +301,61 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.lineEdit_remark.setEnabled(True)
self.lineEdit_correctStart.setEnabled(True)
self.lineEdit_correctEnd.setEnabled(True)
self.pd = pd.read_csv(Path(os.path.join(self.BCG_Label_Path, f"export{self.comboBox_sampID.currentText()}_all.csv")), encoding="gbk")
csv_headers = self.pd.columns.tolist()
if not (("score" in csv_headers) and ("remark" in csv_headers) and ("correct_Start" in csv_headers) and ("correct_End") in csv_headers and ("isLabeled" in csv_headers)):
self.pd["score"] = "-1"
self.pd["remark"] = ""
self.pd["correct_Start"] = "-1"
self.pd["correct_End"] = "-1"
self.pd["correct_EventsType"] = ""
self.pd["isLabeled"] = "-1"
self.pd["score"] = self.pd["score"].astype(int)
self.pd["remark"] = self.pd["remark"].astype(str)
self.pd["correct_Start"] = self.pd["correct_Start"].astype(int)
self.pd["correct_End"] = self.pd["correct_End"].astype(int)
self.pd["correct_EventsType"] = self.pd["correct_EventsType"].astype(str)
self.pd["isLabeled"] = self.pd["isLabeled"].astype(int)
self.df_saLabel = read_csv(
self.BCG_Label_Path / f"export{self.comboBox_sampID.currentText()}_all.csv",
encoding="gbk")
csv_headers = self.df_saLabel.columns.tolist()
if not (("score" in csv_headers) and ("remark" in csv_headers) and ("correct_Start" in csv_headers) and (
"correct_End") in csv_headers and ("isLabeled" in csv_headers)):
self.df_saLabel["score"] = "-1"
self.df_saLabel["remark"] = ""
self.df_saLabel["correct_Start"] = "-1"
self.df_saLabel["correct_End"] = "-1"
self.df_saLabel["correct_EventsType"] = ""
self.df_saLabel["isLabeled"] = "-1"
self.df_saLabel["score"] = self.df_saLabel["score"].astype(int)
self.df_saLabel["remark"] = self.df_saLabel["remark"].astype(str)
self.df_saLabel["correct_Start"] = self.df_saLabel["correct_Start"].astype(int)
self.df_saLabel["correct_End"] = self.df_saLabel["correct_End"].astype(int)
self.df_saLabel["correct_EventsType"] = self.df_saLabel["correct_EventsType"].astype(str)
self.df_saLabel["isLabeled"] = self.df_saLabel["isLabeled"].astype(int)
self.change_radioButton_events(self.bcg_event_label_index_list[self.plotEventIndex])
if not Path(os.path.join(self.BCG_Label_Path, f"export{self.comboBox_sampID.currentText()}_addNew.csv")).exists():
self.pd_examineBySecond = pd.DataFrame(columns=self.pd.columns)
if not Path(self.BCG_Label_Path /
f"export{self.comboBox_sampID.currentText()}_addNew.csv").exists():
self.pd_examineBySecond = DataFrame(columns=self.df_saLabel.columns)
self.pd_examineBySecond["score"] = self.pd_examineBySecond["score"].astype(int)
self.pd_examineBySecond["remark"] = self.pd_examineBySecond["remark"].astype(str)
self.pd_examineBySecond["correct_Start"] = self.pd_examineBySecond["correct_Start"].astype(int)
self.pd_examineBySecond["correct_End"] = self.pd_examineBySecond["correct_End"].astype(int)
self.pd_examineBySecond["correct_EventsType"] = self.pd_examineBySecond["correct_EventsType"].astype(str)
self.pd_examineBySecond["correct_EventsType"] = self.pd_examineBySecond["correct_EventsType"].astype(
str)
self.pd_examineBySecond["isLabeled"] = self.pd_examineBySecond["isLabeled"].astype(int)
self.pd_examineBySecond.to_csv(Path(os.path.join(self.BCG_Label_Path, f"export{self.comboBox_sampID.currentText()}_addNew.csv")), index=False, encoding="gbk")
self.pd_examineBySecond.to_csv(
self.BCG_Label_Path / f"export{self.comboBox_sampID.currentText()}_addNew.csv",
index=False, encoding="gbk")
else:
self.pd_examineBySecond = pd.read_csv(Path(os.path.join(self.BCG_Label_Path, f"export{self.comboBox_sampID.currentText()}_addNew.csv")), encoding="gbk")
if (self.pd["isLabeled"] == 1).all():
self.pd_examineBySecond = read_csv(
self.BCG_Label_Path / f"export{self.comboBox_sampID.currentText()}_addNew.csv",
encoding="gbk")
if (self.df_saLabel["isLabeled"] == 1).all():
self.checkBox_examineLabeled.setChecked(False)
self.msgBox.setText("该份数据打标已全部完成")
self.msgBox.setIcon(QMessageBox.Information)
self.msgBox.exec()
self.pd.to_csv(Path(os.path.join(self.BCG_Label_Path, f"export{self.comboBox_sampID.currentText()}_all.csv")), mode='w', index=None, encoding="gbk")
self.show_one_event(self.plotEventIndex, front_add_second=self.front_add_second, back_add_second=self.back_add_second)
if not str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]) == "nan":
self.lineEdit_remark.setText(str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]))
self.df_saLabel.to_csv(self.BCG_Label_Path / f"export{self.comboBox_sampID.currentText()}_all.csv",
mode='w', index=None, encoding="gbk")
self.show_one_event(self.plotEventIndex, front_add_second=self.front_add_second,
back_add_second=self.back_add_second)
if not str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]) == "nan":
self.lineEdit_remark.setText(
str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]))
else:
self.lineEdit_remark.setText("")
if str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "1":
if str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "1":
self.radioButton_1_class.setChecked(True)
elif str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "2":
elif str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "2":
self.radioButton_2_class.setChecked(True)
elif str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "3":
elif str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "3":
self.radioButton_3_class.setChecked(True)
else:
self.radioButton_2_class.setChecked(True)
@ -350,20 +366,22 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.figure.clear()
self.plotEventIndex = self.plotEventIndex - 1
if self.checkBox_examineLabeled.isChecked() == True:
while self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "isLabeled"] == 1 and self.plotEventIndex > self.start_bcg_index:
while self.df_saLabel.at[self.bcg_event_label_index_list[
self.plotEventIndex], "isLabeled"] == 1 and self.plotEventIndex > self.start_bcg_index:
self.plotEventIndex = self.plotEventIndex - 1
self.show_one_event(self.plotEventIndex, front_add_second=self.front_add_second,
back_add_second=self.back_add_second)
if not str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]) == "nan":
self.lineEdit_remark.setText(str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]))
if not str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]) == "nan":
self.lineEdit_remark.setText(
str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]))
else:
self.lineEdit_remark.setText("")
self.change_radioButton_events(self.bcg_event_label_index_list[self.plotEventIndex])
if str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "1":
if str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "1":
self.radioButton_1_class.setChecked(True)
elif str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "2":
elif str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "2":
self.radioButton_2_class.setChecked(True)
elif str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "3":
elif str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "3":
self.radioButton_3_class.setChecked(True)
else:
self.radioButton_2_class.setChecked(True)
@ -383,20 +401,23 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.figure.clear()
self.plotEventIndex = self.plotEventIndex + 1
if self.checkBox_examineLabeled.isChecked() == True:
while self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "isLabeled"] == 1 and self.plotEventIndex < len(self.bcg_event_label_filtered_df) - 1:
while self.df_saLabel.at[
self.bcg_event_label_index_list[self.plotEventIndex], "isLabeled"] == 1 and self.plotEventIndex < len(
self.bcg_event_label_filtered_df) - 1:
self.plotEventIndex = self.plotEventIndex + 1
self.show_one_event(self.plotEventIndex, front_add_second=self.front_add_second,
back_add_second=self.back_add_second)
if not str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]) == "nan":
self.lineEdit_remark.setText(str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]))
if not str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]) == "nan":
self.lineEdit_remark.setText(
str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"]))
else:
self.lineEdit_remark.setText("")
self.change_radioButton_events(self.bcg_event_label_index_list[self.plotEventIndex])
if str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "1":
if str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "1":
self.radioButton_1_class.setChecked(True)
elif str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "2":
elif str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "2":
self.radioButton_2_class.setChecked(True)
elif str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "3":
elif str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) == "3":
self.radioButton_3_class.setChecked(True)
else:
self.radioButton_2_class.setChecked(True)
@ -437,36 +458,59 @@ class MainWindow(QMainWindow, Ui_MainWindow):
elif self.radioButton_HPY.isChecked() == True:
correct_EventsType = "Hypopnea"
isLabeled = int(1)
self.pd_examineBySecond = self.pd_add_new_row(self.pd_examineBySecond, score, remark, correct_Start, correct_End, correct_EventsType, isLabeled)
self.pd_examineBySecond.to_csv(Path(os.path.join(self.BCG_Label_Path, f"export{self.comboBox_sampID.currentText()}_addNew.csv")), mode='w', index=None, encoding="gbk")
self.textBrowser_update("操作保存新事件打标结果到csv。" + "score:" + str(score) + "correct_Start:" + str(correct_Start) + "correct_End:" + str(correct_End) + "correct_EventsType:" + str(correct_EventsType))
self.pd_examineBySecond = self.pd_add_new_row(self.pd_examineBySecond, score, remark, correct_Start,
correct_End, correct_EventsType, isLabeled)
self.pd_examineBySecond.to_csv(
self.BCG_Label_Path / f"export{self.comboBox_sampID.currentText()}_addNew.csv",
mode='w', index=None, encoding="gbk")
self.textBrowser_update(
"操作保存新事件打标结果到csv。" + "score:" + str(score) + "correct_Start:" + str(
correct_Start) + "correct_End:" + str(correct_End) + "correct_EventsType:" + str(
correct_EventsType))
else:
self.msgBox.setText("起始时间和终止时间输入错误")
self.msgBox.setIcon(QMessageBox.Critical)
self.msgBox.exec()
elif self.checkBox_examineBySecond.isChecked() == False:
elif not self.checkBox_examineBySecond.isChecked():
if int(self.lineEdit_correctStart.text()) < int(self.lineEdit_correctEnd.text()):
if self.radioButton_1_class.isChecked() == True:
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"] = int(1)
elif self.radioButton_2_class.isChecked() == True:
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"] = int(2)
elif self.radioButton_3_class.isChecked() == True:
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"] = int(3)
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "remark"] = self.lineEdit_remark.text()
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"] = int(self.lineEdit_correctStart.text())
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"] = int(self.lineEdit_correctEnd.text())
if self.radioButton_OSA.isChecked() == True:
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] = "Obstructive apnea"
elif self.radioButton_CSA.isChecked() == True:
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] = "Central apnea"
elif self.radioButton_MSA.isChecked() == True:
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] = "Mixed apnea"
elif self.radioButton_HPY.isChecked() == True:
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] = "Hypopnea"
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "isLabeled"] = int(1)
self.pd.to_csv(Path(os.path.join(self.BCG_Label_Path, f"export{self.comboBox_sampID.currentText()}_all.csv")), mode='w', index=None, encoding="gbk")
self.textBrowser_update("操作保存index" + str(self.plotEventIndex + 1) + "打标结果到csv。" + "score:" + str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"]) + "correct_Start:" + str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"]) + "correct_End:" + str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"]) + "correct_EventsType:" + str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"]))
if (self.pd.loc[self.bcg_event_label_index_list]["isLabeled"] == 1).all():
if self.radioButton_1_class.isChecked():
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"] = int(1)
elif self.radioButton_2_class.isChecked():
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"] = int(2)
elif self.radioButton_3_class.isChecked():
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "score"] = int(3)
self.df_saLabel.at[
self.bcg_event_label_index_list[self.plotEventIndex], "remark"] = self.lineEdit_remark.text()
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"] = int(
self.lineEdit_correctStart.text())
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"] = int(
self.lineEdit_correctEnd.text())
if self.radioButton_OSA.isChecked():
self.df_saLabel.at[self.bcg_event_label_index_list[
self.plotEventIndex], "correct_EventsType"] = "Obstructive apnea"
elif self.radioButton_CSA.isChecked():
self.df_saLabel.at[
self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] = "Central apnea"
elif self.radioButton_MSA.isChecked():
self.df_saLabel.at[
self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] = "Mixed apnea"
elif self.radioButton_HPY.isChecked():
self.df_saLabel.at[
self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] = "Hypopnea"
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "isLabeled"] = int(1)
self.df_saLabel.to_csv(self.BCG_Label_Path / f"export{self.comboBox_sampID.currentText()}_all.csv",
mode='w', index=None, encoding="gbk")
self.textBrowser_update(
"操作保存index" + str(self.plotEventIndex + 1) + "打标结果到csv。" + "score:" + str(
self.df_saLabel.at[
self.bcg_event_label_index_list[
self.plotEventIndex], "score"]) + "correct_Start:" + str(
self.df_saLabel.at[self.bcg_event_label_index_list[
self.plotEventIndex], "correct_Start"]) + "correct_End:" + str(self.df_saLabel.at[
self.bcg_event_label_index_list[
self.plotEventIndex], "correct_End"]) + "correct_EventsType:" + str(
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"]))
if (self.df_saLabel.loc[self.bcg_event_label_index_list]["isLabeled"] == 1).all():
self.msgBox.setText("该份数据打标已全部完成")
self.msgBox.setIcon(QMessageBox.Information)
self.msgBox.exec()
@ -570,7 +614,7 @@ class MainWindow(QMainWindow, Ui_MainWindow):
# +60s的槽函数
def enable_checkBox_examineLabeled(self):
if self.checkBox_examineLabeled.isChecked():
if (self.pd.loc[self.bcg_event_label_index_list]["isLabeled"] == 1).all():
if (self.df_saLabel.loc[self.bcg_event_label_index_list]["isLabeled"] == 1).all():
self.checkBox_examineLabeled.setChecked(False)
self.msgBox.setText("该份数据打标已全部完成")
self.msgBox.setIcon(QMessageBox.Information)
@ -636,23 +680,23 @@ class MainWindow(QMainWindow, Ui_MainWindow):
# 修改了事件类型单选框后执行的函数
def change_radioButton_events(self, index):
if not (self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "isLabeled"] == -1):
if self.pd.at[index, "correct_EventsType"] == "Obstructive apnea":
if not (self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "isLabeled"] == -1):
if self.df_saLabel.at[index, "correct_EventsType"] == "Obstructive apnea":
self.radioButton_OSA.setChecked(True)
elif self.pd.at[index, "correct_EventsType"] == "Central apnea":
elif self.df_saLabel.at[index, "correct_EventsType"] == "Central apnea":
self.radioButton_CSA.setChecked(True)
elif self.pd.at[index, "correct_EventsType"] == "Mixed apnea":
elif self.df_saLabel.at[index, "correct_EventsType"] == "Mixed apnea":
self.radioButton_MSA.setChecked(True)
elif self.pd.at[index, "correct_EventsType"] == "Hypopnea":
elif self.df_saLabel.at[index, "correct_EventsType"] == "Hypopnea":
self.radioButton_HPY.setChecked(True)
else:
if self.pd.at[index, "Event type"] == "Obstructive apnea":
if self.df_saLabel.at[index, "Event type"] == "Obstructive apnea":
self.radioButton_OSA.setChecked(True)
elif self.pd.at[index, "Event type"] == "Central apnea":
elif self.df_saLabel.at[index, "Event type"] == "Central apnea":
self.radioButton_CSA.setChecked(True)
elif self.pd.at[index, "Event type"] == "Mixed apnea":
elif self.df_saLabel.at[index, "Event type"] == "Mixed apnea":
self.radioButton_MSA.setChecked(True)
elif self.pd.at[index, "Event type"] == "Hypopnea":
elif self.df_saLabel.at[index, "Event type"] == "Hypopnea":
self.radioButton_HPY.setChecked(True)
# 点击开始打标按钮后的初始化变量函数
@ -684,7 +728,7 @@ class MainWindow(QMainWindow, Ui_MainWindow):
def check_channel(self):
for i in self.channel_list:
if i not in self.base_channel:
logging.debug(f"{i} 不存在于常见通道名中")
debug(f"{i} 不存在于常见通道名中")
print(f"常见通道名:{self.channel_list}")
def set_focus_event_list(self):
@ -704,19 +748,19 @@ class MainWindow(QMainWindow, Ui_MainWindow):
ecg_path = self.PSG_Data_Path / f"A{str(self.sampNo).rjust(7, '0')}.edf"
if not bcg_path.exists():
logging.error(f"Can't find {bcg_path} !")
error(f"Can't find {bcg_path} !")
raise FileNotFoundError(f"Can't find {bcg_path} !")
if not ecg_path.exists():
logging.error(f"Can't find {ecg_path} !")
error(f"Can't find {ecg_path} !")
raise FileNotFoundError(f"Can't find {ecg_path} !")
with pyedflib.EdfReader(str(ecg_path.resolve())) as file:
with EdfReader(str(ecg_path.resolve())) as file:
signal_num = file.signals_in_file
logging.debug(f"{self.sampNo} EDF file signal number is {signal_num}")
debug(f"{self.sampNo} EDF file signal number is {signal_num}")
signal_label = file.getSignalLabels()
logging.debug(f"{self.sampNo} EDF file signal label : {signal_label}")
debug(f"{self.sampNo} EDF file signal label : {signal_label}")
# 打印PSG信息
file.file_info_long()
@ -724,7 +768,7 @@ class MainWindow(QMainWindow, Ui_MainWindow):
# sub_index 用于区分两个flow patient
sub_index = 1
logging.info("Loading PSG signal...")
info("Loading PSG signal...")
self.textBrowser_update("提示正在加载PSG信号")
for i, index in enumerate(signal_label):
# 仅加载选中的通道
@ -744,44 +788,43 @@ class MainWindow(QMainWindow, Ui_MainWindow):
elif sample_frequency > self.frequency:
signal = signal[::int(sample_frequency / self.frequency)]
self.signal_select[index] = signal
logging.info(f"Finished load PSG: {index}")
info(f"Finished load PSG: {index}")
self.textBrowser_update("提示完成加载PSG信号")
# 加载心晓信号
logging.info("Loading XinXiao signal...")
info("Loading XinXiao signal...")
self.textBrowser_update("提示:正在加载心晓信号")
signal = np.load(bcg_path)
preprocessing = BCG_Operation(sample_rate=self.bcg_frequency)
signal = load(bcg_path)
# 20Hz低通去噪
signal1 = preprocessing.Butterworth(signal, 'lowpass', low_cut=20, order=3)
signal1 = Butterworth(signal, self.bcg_frequency, 'lowpass', low_cut=20, order=3)
# 0.7Hz 低通提取呼吸
signal2 = preprocessing.Butterworth(signal, 'lowpass', low_cut=0.7, order=3)
signal2 = Butterworth(signal, self.bcg_frequency, 'lowpass', low_cut=0.7, order=3)
# 进行降采样
signal1 = signal1[::int(self.bcg_frequency / self.frequency)]
signal2 = signal2[::int(self.bcg_frequency / self.frequency)]
# 根据心晓事件长度生成事件记录
self.bcg_event_label = np.zeros(len(signal) + self.extend_second * self.frequency)
self.bcg_event_label = zeros(len(signal) + self.extend_second * self.frequency)
self.signal_select['orgdata'] = signal1
self.signal_select['0.7lowpass_resp'] = signal2
for signal_key in self.signal_select.keys():
self.signal_select[signal_key] = np.append(self.signal_select[signal_key],
self.signal_select[signal_key] = append(self.signal_select[signal_key],
self.signal_select[signal_key].mean().astype(int).repeat(
self.extend_second * self.frequency))
logging.info("Finished load XinXiao signal")
info("Finished load XinXiao signal")
self.textBrowser_update("提示:完成加载心晓信号")
def read_event(self):
bcg_label_path = self.BCG_Label_Path / f"export{self.sampNo}_all.csv"
if not bcg_label_path.exists():
logging.error(f"Can't find {bcg_label_path} !")
error(f"Can't find {bcg_label_path} !")
raise FileNotFoundError(f"Can't find {bcg_label_path} !")
# 读取心晓事件
df = pd.read_csv(bcg_label_path, encoding="gbk")
df = read_csv(bcg_label_path, encoding="gbk")
df["Start"] = df["Start"].astype("int")
df["End"] = df["End"].astype("int")
self.bcg_event_label_df = df
@ -791,7 +834,7 @@ class MainWindow(QMainWindow, Ui_MainWindow):
df2 = df2.sort_values(by='Epoch')
self.bcg_event_label_filtered_df = df2
self.bcg_event_label_index_list = df2.index.tolist()
logging.info("Traversaling XinXiao events...")
info("Traversaling XinXiao events...")
self.textBrowser_update("提示:正在遍历心晓事件")
for one_data in tqdm(df.index):
one_data = df.loc[one_data]
@ -806,16 +849,16 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.bcg_event_label[SP:EP] = 3
elif one_data["Event type"] == "Mixed apnea":
self.bcg_event_label[SP:EP] = 4
logging.info("Finished Traversal XinXiao events")
info("Finished Traversal XinXiao events")
self.textBrowser_update("提示:完成遍历心晓事件")
def read_artifact_label(self):
all_offset_length = pd.read_excel(self.Artifact_Offset_Path)
all_offset_length = read_excel(self.Artifact_Offset_Path)
offset_length = all_offset_length[all_offset_length['数据编号'] == self.sampNo]['from_code'].values[0]
artifact_label_path = self.Artifact_Label_Path / f"Artifact_{self.sampNo}.txt"
artifact_label = pd.read_csv(artifact_label_path, header=None).to_numpy().reshape(-1, 4)
artifact_label = read_csv(artifact_label_path, header=None).to_numpy().reshape(-1, 4)
self.artifact_event_label = np.zeros(len(self.bcg_event_label) + self.extend_second * self.frequency)
self.artifact_event_label = zeros(len(self.bcg_event_label) + self.extend_second * self.frequency)
for i, artifact_type, SP, EP in artifact_label:
SP = (int(SP) + offset_length) // (1000 // self.frequency)
@ -849,12 +892,10 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.bcg_EP = bcg_EP
bcg_duration = bcg_EP - bcg_SP
logging.info(f"sampNo:{self.sampNo} "
info(f"sampNo:{self.sampNo} "
f"bcg[index:{bcg_index} epoch:{one_bcg_data['Epoch']} event:{one_bcg_data['Event type']}] "
f"ecg:[index:{bcg_index} epoch:{one_bcg_data['Epoch']} event:{one_bcg_data['Event type']}]")
# 进行向两边延展
bcg_SP = bcg_SP - front_add_second
bcg_EP = bcg_EP + back_add_second
@ -867,12 +908,16 @@ class MainWindow(QMainWindow, Ui_MainWindow):
plt.xticks([])
plt.yticks([])
if not (self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "isLabeled"] == -1):
bcg_duration_new = self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"] - self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"]
self.lineEdit_correctStart.setText(str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"]))
self.lineEdit_correctEnd.setText(str(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"]))
self.label_BCG_event.setText(f"sampNo:{self.sampNo} Index:{bcg_index + 1}/{len(self.bcg_event_label_filtered_df)} "
f"Epoch:{one_bcg_data['Epoch']} Duration:{bcg_duration}s New Duration:{bcg_duration_new}s")
if not (self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "isLabeled"] == -1):
bcg_duration_new = self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"] - \
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"]
self.lineEdit_correctStart.setText(
str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"]))
self.lineEdit_correctEnd.setText(
str(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"]))
self.label_BCG_event.setText(
f"sampNo:{self.sampNo} Index:{bcg_index + 1}/{len(self.bcg_event_label_filtered_df)} "
f"Epoch:{one_bcg_data['Epoch']} Duration:{bcg_duration}s New Duration:{bcg_duration_new}s")
else:
bcg_duration_new = -1
self.lineEdit_correctStart.setText(str(self.bcg_SP))
@ -883,20 +928,25 @@ class MainWindow(QMainWindow, Ui_MainWindow):
# 绘制 Flow1
self.ax0 = self.figure.add_subplot(gs[0])
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="Flow T", bcg_duration_new=bcg_duration_new, show_mode="one")
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="Flow T", bcg_duration_new=bcg_duration_new,
show_mode="one")
# 绘制 Flow2
self.ax1 = self.figure.add_subplot(gs[1], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="Flow P", bcg_duration_new=bcg_duration_new, show_mode="one")
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="Flow P", bcg_duration_new=bcg_duration_new,
show_mode="one")
self.ax2 = self.figure.add_subplot(gs[2], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="Effort Tho", bcg_duration_new=bcg_duration_new, show_mode="one")
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="Effort Tho", bcg_duration_new=bcg_duration_new,
show_mode="one")
self.ax3 = self.figure.add_subplot(gs[3], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="Effort Abd", bcg_duration_new=bcg_duration_new, show_mode="one")
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="Effort Abd", bcg_duration_new=bcg_duration_new,
show_mode="one")
self.ax4 = self.figure.add_subplot(gs[4], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="SpO2", event_code=[5], bcg_duration_new=bcg_duration_new, show_mode="one")
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="SpO2", event_code=[5],
bcg_duration_new=bcg_duration_new, show_mode="one")
self.ax5 = self.figure.add_subplot(gs[5], sharex=self.ax0)
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="orgdata",
@ -907,7 +957,8 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="0.7lowpass_resp",
event_code=[6, 7, 8, 9, 10, 1, 2, 3, 4],
event_show_under=False,
title=f"sampNo:{self.sampNo} Index:{bcg_index + 1}/{len(self.bcg_event_label_filtered_df)}", bcg_duration_new=bcg_duration_new, show_mode="one")
title=f"sampNo:{self.sampNo} Index:{bcg_index + 1}/{len(self.bcg_event_label_filtered_df)}",
bcg_duration_new=bcg_duration_new, show_mode="one")
# figManager = plt.get_current_fig_manager()
# figManager.window.showMaximized()
@ -915,7 +966,8 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.ax0.callbacks.connect('xlim_changed', lambda ax: self.on_xlim_change(ax))
def plt_channel(self, plt_, SP, EP, channel, event_code=[1, 2, 3, 4], event_show_under=False,
ax_top=True, ax_bottom=True, ax_left=True, ax_right=True, title=None, bcg_duration_new=-1, show_mode=None):
ax_top=True, ax_bottom=True, ax_left=True, ax_right=True, title=None, bcg_duration_new=-1,
show_mode=None):
"""
:param plt_:
:param SP: 显示开始秒数
@ -931,7 +983,7 @@ class MainWindow(QMainWindow, Ui_MainWindow):
"""
linestyle = "-"
SP = 0 if SP < 0 else SP
plt_.plot(np.linspace(SP, EP, (EP - SP) * self.frequency),
plt_.plot(linspace(SP, EP, (EP - SP) * self.frequency),
self.signal_select[channel][SP * self.frequency:EP * self.frequency], label=channel,
color=self.color_cycle[0])
@ -951,12 +1003,12 @@ class MainWindow(QMainWindow, Ui_MainWindow):
min_point = self.signal_select[channel][SP * self.frequency:EP * self.frequency].min()
len_segment = EP * self.frequency - SP * self.frequency
y = (min_point.repeat(len_segment) * mask).astype('float')
np.place(y, y == 0, np.nan)
place(y, y == 0, nan)
else:
y = (self.signal_select[channel][SP * self.frequency:EP * self.frequency] * mask).astype('float')
np.place(y, y == 0, np.nan)
plt_.plot(np.linspace(SP, EP, (EP - SP) * self.frequency), y, color=self.color_cycle[j],
place(y, y == 0, nan)
plt_.plot(linspace(SP, EP, (EP - SP) * self.frequency), y, color=self.color_cycle[j],
linestyle=linestyle)
plt_.legend(fontsize=8, loc=1)
if title is not None:
@ -971,42 +1023,58 @@ class MainWindow(QMainWindow, Ui_MainWindow):
# 绘制半透明矩形框
if show_mode == "one":
if not(bcg_duration_new == -1):
if self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] == "Obstructive apnea":
ax.axvspan(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"],
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"], color='red',
alpha=0.3)
elif self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] == "Central apnea":
ax.axvspan(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"],
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"], color='blue',
alpha=0.3)
elif self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] == "Mixed apnea":
ax.axvspan(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"],
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"], color='gray',
alpha=0.3)
elif self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] == "Hypopnea":
ax.axvspan(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"],
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"], color='pink',
alpha=0.3)
if not (bcg_duration_new == -1):
if self.df_saLabel.at[
self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] == "Obstructive apnea":
ax.axvspan(
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"],
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"],
color='red',
alpha=0.3)
elif self.df_saLabel.at[
self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] == "Central apnea":
ax.axvspan(
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"],
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"],
color='blue',
alpha=0.3)
elif self.df_saLabel.at[
self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] == "Mixed apnea":
ax.axvspan(
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"],
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"],
color='gray',
alpha=0.3)
elif self.df_saLabel.at[
self.bcg_event_label_index_list[self.plotEventIndex], "correct_EventsType"] == "Hypopnea":
ax.axvspan(
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_Start"],
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "correct_End"],
color='pink',
alpha=0.3)
else:
if self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "Event type"] == "Obstructive apnea":
ax.axvspan(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "Start"],
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "End"],
if self.df_saLabel.at[
self.bcg_event_label_index_list[self.plotEventIndex], "Event type"] == "Obstructive apnea":
ax.axvspan(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "Start"],
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "End"],
color='red',
alpha=0.3)
elif self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "Event type"] == "Central apnea":
ax.axvspan(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "Start"],
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "End"],
elif self.df_saLabel.at[
self.bcg_event_label_index_list[self.plotEventIndex], "Event type"] == "Central apnea":
ax.axvspan(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "Start"],
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "End"],
color='blue',
alpha=0.3)
elif self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "Event type"] == "Mixed apnea":
ax.axvspan(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "Start"],
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "End"],
elif self.df_saLabel.at[
self.bcg_event_label_index_list[self.plotEventIndex], "Event type"] == "Mixed apnea":
ax.axvspan(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "Start"],
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "End"],
color='gray',
alpha=0.3)
elif self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "Event type"] == "Hypopnea":
ax.axvspan(self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "Start"],
self.pd.at[self.bcg_event_label_index_list[self.plotEventIndex], "End"],
elif self.df_saLabel.at[
self.bcg_event_label_index_list[self.plotEventIndex], "Event type"] == "Hypopnea":
ax.axvspan(self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "Start"],
self.df_saLabel.at[self.bcg_event_label_index_list[self.plotEventIndex], "End"],
color='pink',
alpha=0.3)
# xticks = [[]] if xticks else [range(SP, EP, 5), [str(i) for i in range(0, (EP - SP), 5)]]
@ -1030,7 +1098,7 @@ class MainWindow(QMainWindow, Ui_MainWindow):
bcg_EP = one_bcg_data["End"]
bcg_duration = bcg_EP - bcg_SP
logging.info(f"sampNo:{self.sampNo} "
info(f"sampNo:{self.sampNo} "
f"bcg[index:{bcg_index} epoch:{one_bcg_data['Epoch']} event:{one_bcg_data['Event type']}] "
f"ecg:[index:{bcg_index} epoch:{one_bcg_data['Epoch']} event:{one_bcg_data['Event type']}]")
@ -1077,7 +1145,8 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="0.7lowpass_resp",
event_code=[6, 7, 8, 9, 10, 1, 2, 3, 4],
event_show_under=False,
title=f"sampNo:{self.sampNo} Index:{bcg_index + 1}/{len(self.bcg_event_label_filtered_df)}", show_mode="new")
title=f"sampNo:{self.sampNo} Index:{bcg_index + 1}/{len(self.bcg_event_label_filtered_df)}",
show_mode="new")
self.label_BCG_event.setText(
f"sampNo:{self.sampNo} Index:{bcg_index + 1}/{len(self.bcg_event_label_filtered_df)} "
@ -1088,20 +1157,21 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.canvas.draw()
self.ax0.callbacks.connect('xlim_changed', lambda ax: self.on_xlim_change(ax))
def pd_add_new_row(self, df, score: int, remark: str, correct_Start: int, correct_End: int, correct_EventsType: str, isLabeled: int):
new_row = pd.Series(index=df.columns, data=np.nan)
def pd_add_new_row(self, df, score: int, remark: str, correct_Start: int, correct_End: int, correct_EventsType: str,
isLabeled: int):
new_row = Series(index=df.columns, data=nan)
new_row["score"] = score
new_row["remark"] = remark
new_row["correct_Start"] = correct_Start
new_row["correct_End"] = correct_End
new_row["correct_EventsType"] = correct_EventsType
new_row["isLabeled"] = isLabeled
return pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
return concat([df, DataFrame([new_row])], ignore_index=True)
# 此线程类用于更新textBrowser的内容
class Thread_textbrowserUpdate(QtCore.QThread):
trigger = QtCore.pyqtSignal(str)
class Thread_textbrowserUpdate(QThread):
trigger = pyqtSignal(str)
def __init__(self):
super(Thread_textbrowserUpdate, self).__init__()
@ -1109,6 +1179,7 @@ class Thread_textbrowserUpdate(QtCore.QThread):
def run_(self, message):
self.trigger.emit(message)
# 此类用于继承matplot的NavigationToolbar类然后写回调函数
class CustomNavigationToolbar(NavigationToolbar2QT):
def __init__(self, canvas, parent, callback):
@ -1120,9 +1191,10 @@ class CustomNavigationToolbar(NavigationToolbar2QT):
if self.callback:
self.callback() # 执行自定义回调
# 主函数
if __name__ == '__main__':
app = QtWidgets.QApplication(sys.argv)
app = QApplication(sys.argv)
mainWindow = MainWindow()
mainWindow.show()
sys.exit(app.exec_())

View File

@ -40,7 +40,7 @@ df_PSG_label['End'] = df_PSG_label['Start'] + df_PSG_label['Duration'].astype(fl
print(df_PSG_label)
# 写入csv文件
df_PSG_label.to_csv(r"E:\data_annotation\6SleepApnea_annotation_GUI_demo\data\BCG_label\export" + str(sampID) + "_all.csv", index=False, encoding="gbk")
df_PSG_label.to_csv(dir_path + r"\BCG_label\export" + str(sampID) + "_all.csv", index=False, encoding="gbk")
# 打印结果
print("sampID_" + str(sampID) + "写入csv成功")

View File

@ -6,174 +6,27 @@
@ illustration: Pre-processing
"""
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import butter, lfilter, filtfilt
import pywt
from scipy import signal
from scipy import fftpack
def Butterworth(data, sample_rate, type, low_cut=0.0, high_cut=0.0, order=10):
"""
:param type: Type of Butter. filter, lowpass, bandpass, ...
:param lowcut: Low cutoff frequency
:param highcut: High cutoff frequency
:param order: Order of filter
:return: Signal after filtering
"""
if type == "lowpass": # 低通滤波处理
b, a = butter(order, low_cut / (sample_rate * 0.5), btype='lowpass', output='ba')
return filtfilt(b, a, data)
elif type == "bandpass": # 带通滤波处理
low = low_cut / (sample_rate * 0.5)
high = high_cut / (sample_rate * 0.5)
b, a = butter(order, [low, high], btype='bandpass', output='ba')
return filtfilt(b, a, data)
elif type == "highpass": # 高通滤波处理
b, a = butter(order, high_cut / (sample_rate * 0.5), btype='highpass', output='ba')
return filtfilt(b, a, data)
else: # 警告,滤波器类型必须有
raise ValueError("Please choose a type of fliter")
def Dilate(x, N, g, M):
returndata = np.array([])
for num in range(N - M + 1):
returndata = np.append(returndata, np.min(np.array(x[num:num + M]) - np.array(g)))
return returndata
def Eorde(x, N, g, M):
returndata = np.array([])
for num in range(N - M + 1):
returndata = np.append(returndata, np.max(np.array(x[num:num + M]) - np.array(g)))
return returndata
def fin_turn(data, peak):
if len(data) == 0 or len(peak) == 0: return peak
return_peak = []
for p in peak:
minx, maxx = max(0, p - 100), min(len(data), p + 100)
return_peak.append(minx + np.argmax(data[minx: maxx]))
return return_peak
class BCG_Operation():
def __init__(self, sample_rate=1000):
self.sample_rate = sample_rate
def down_sample(self, data=None, down_radio=10):
if data is None:
raise ValueError("data is None, please given an real value!")
data = data[:len(data) // down_radio * down_radio].reshape(-1, down_radio)[:, 0]
self.sample_rate = self.sample_rate / down_radio
return data
def Splitwin(self, data=None, len_win=None, coverage=1.0, calculate_to_end=False):
"""
分窗
:param len_win: length of window
:return: signal windows
"""
if (len_win is None) or (data is None):
raise ValueError("length of window or data is None, please given an real value!")
else:
length = len_win * self.sample_rate # number point of a window
# step of split windows
step = length * coverage
start = 0
Splitdata = []
while (len(data) - start >= length):
Splitdata.append(data[int(start):int(start + length)])
start += step
if calculate_to_end and (len(data) - start > 2000):
remain = len(data) - start
start = start - step
step = int(remain / 2000)
start = start + step * 2000
Splitdata.append(data[int(start):int(start + length)])
return np.array(Splitdata), step
elif calculate_to_end:
return np.array(Splitdata), 0
else:
return np.array(Splitdata)
def Butterworth(self, data, type, low_cut=0.0, high_cut=0.0, order=10):
"""
:param type: Type of Butter. filter, lowpass, bandpass, ...
:param lowcut: Low cutoff frequency
:param highcut: High cutoff frequency
:param order: Order of filter
:return: Signal after filtering
"""
if type == "lowpass": # 低通滤波处理
b, a = signal.butter(order, low_cut / (self.sample_rate * 0.5), btype='lowpass')
return signal.filtfilt(b, a, np.array(data))
elif type == "bandpass": # 带通滤波处理
low = low_cut / (self.sample_rate * 0.5)
high = high_cut / (self.sample_rate * 0.5)
b, a = signal.butter(order, [low, high], btype='bandpass')
return signal.filtfilt(b, a, np.array(data))
elif type == "highpass": # 高通滤波处理
b, a = signal.butter(order, high_cut / (self.sample_rate * 0.5), btype='highpass')
return signal.filtfilt(b, a, np.array(data))
else: # 警告,滤波器类型必须有
raise ValueError("Please choose a type of fliter")
def MorphologicalFilter(self, data=None, M=200, get_bre=False):
"""
:param data: Input signal
:param M: Length of structural element
:return: Signal after filter
"""
if not data.any():
raise ValueError("The input data is None, please given real value data")
g = np.ones(M)
Data_pre = np.insert(data, 0, np.zeros(M))
Data_pre = np.insert(Data_pre, -1, np.zeros(M))
# Opening: 腐蚀 + 膨胀
out1 = Eorde(Data_pre, len(Data_pre), g, M)
out2 = Dilate(out1, len(out1), g, M)
out2 = np.insert(out2, 0, np.zeros(M - 2))
# Closing: 膨胀 + 腐蚀
out5 = Dilate(Data_pre, len(Data_pre), g, M)
out6 = Eorde(out5, len(out5), g, M)
out6 = np.insert(out6, 0, np.zeros(M - 2))
baseline = (out2 + out6) / 2
# -------------------------保留剩余价值------------------------
data_filtered = Data_pre[:len(baseline)] - baseline
data_filtered = data_filtered[M: M + len(data)]
baseline = baseline[M:]
data_filtered[-1] = data_filtered[-2] = data_filtered[-3]
baseline[-1] = baseline[-2] = baseline[-3]
if get_bre:
return data_filtered, baseline
else:
return data_filtered
def Iirnotch(self, data=None, cut_fre=50, quality=3):
"""陷波器"""
b, a = signal.iirnotch(cut_fre / (self.sample_rate * 0.5), quality)
return signal.filtfilt(b, a, np.array(data))
def ChebyFilter(self, data, rp=1, type=None, low_cut=0, high_cut=0, order=10):
"""
切比雪夫滤波器
:param data: Input signal
:param rp: The maximum ripple allowed
:param type: 'lowpass', 'bandpass, 'highpass'
:param low_cut: Low cut-off fre
:param high_cut: High cut-off fre
:param order: The order of filter
:return: Signal after filter
"""
if type == 'lowpass':
b, a = signal.cheby1(order, rp, low_cut, btype='lowpass', fs=self.sample_rate)
return signal.filtfilt(b, a, np.array(data))
elif type == 'bandpass':
b, a = signal.cheby1(order, rp, [low_cut, high_cut], btype='bandpass', fs=self.sample_rate)
return signal.filtfilt(b, a, np.array(data))
elif type == 'highpass':
b, a = signal.cheby1(order, rp, high_cut, btype='highpass', fs=self.sample_rate)
return signal.filtfilt(b, a, np.array(data))
else:
raise ValueError("The type of filter is None, please given the real value!")
def Envelope(self, data):
"""取信号包络"""
if len(data) <= 1: raise ValueError("Wrong input data")
hx = fftpack.hilbert(data)
return np.sqrt(hx ** 2, data ** 2)
def wavelet_trans(self, data, c_level=['aaa', 'aad'], wavelet='db4', mode='symmetric', maxlevel=10):
wp = pywt.WaveletPacket(data=data, wavelet=wavelet, mode=mode, maxlevel=maxlevel)
new_wp = pywt.WaveletPacket(data=None, wavelet=wavelet, mode=mode)
for c in c_level:
new_wp[c] = wp[c]
return new_wp.reconstruct()
# def em_decomposition(self, data):
# from pyhht.emd import EMD
# return EMD(data).decompose()