1098 lines
50 KiB
Python
1098 lines
50 KiB
Python
#!/usr/bin/python
|
||
# -*- coding: UTF-8 -*-
|
||
"""
@author:andrew
@file:RespCoarseAlign.py
@email:admin@marques22.com
@email:2021022362@m.scnu.edu.cn
@time:2023/09/20
"""
|
||
|
||
import sys
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
import pyedflib
|
||
import numpy as np
|
||
import pandas as pd
|
||
from PySide6.QtGui import QPixmap, QImage
|
||
from PySide6.QtWidgets import QApplication, QMainWindow, QFileDialog, QMessageBox, QWidget, QPushButton
|
||
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
|
||
from matplotlib.figure import Figure
|
||
from numba import njit, prange
|
||
from scipy import signal
|
||
|
||
from ui.Mian import Ui_mainWindow as Ui_respCoarseAlign
|
||
from ui.setings import Ui_MainWindow as Ui_Setting
|
||
import yaml
|
||
|
||
# Default application configuration.
# SettingWindow writes this mapping to ./config.yaml when the file is missing
# and updates it in place from the file / the settings widgets afterwards.
# "Frequency" values are sampling rates and "LowCut"/"HighCut" are the band
# edges handed to the Butterworth band-pass (presumably all in Hz -- confirm).
Conf = dict(
    PSGConfig=dict(
        Path="./Data/PSG/",
        Frequency=100,
        # "auto" selects the channel by its EDF label; otherwise "Channel"
        # is used as the signal index.
        THOChannel=dict(auto=True, Channel=3),
        ABDChannel=dict(auto=True, Channel=4),
    ),
    XXConfig=dict(
        Path="./Data/XX/",
        Frequency=100,
    ),
    RespFilterConfig=dict(
        LowCut=0.01,
        HighCut=0.7,
        Order=4,
    ),
    # Rate the signals are reduced to before alignment.
    ApplyFrequency=5,
)
|
||
|
||
# Enabled/disabled flag for every push button of the main window.
# "Default" is the start-up profile; "Current" starts out identical and is
# mutated at runtime as processing steps unlock more buttons. The
# comprehension builds two independent dicts so that changing "Current"
# never touches "Default".
ButtonState = {
    profile: {
        "pushButton_Refresh": True,
        "pushButton_OpenFile": True,
        "pushButton_Standardize": False,
        "pushButton_CutOff": False,
        "pushButton_GetPos": False,
        "pushButton_JUMP": False,
        "pushButton_EM1": False,
        "pushButton_EM10": False,
        "pushButton_EM100": False,
        "pushButton_EP1": False,
        "pushButton_EP10": False,
        "pushButton_EP100": False,
        "pushButton_SaveInfo": False,
        "pushButton_ReadInfo": False,
        "pushButton_Exit": True,
    }
    for profile in ("Default", "Current")
}
|
||
|
||
|
||
class SettingWindow(QMainWindow):
    """Settings window that edits the module-level ``Conf`` and ``./config.yaml``.

    On construction the configuration file is created from ``Conf`` when it is
    missing, merged into ``Conf``, shown in the widgets, and the widget
    signals are connected.
    """

    def __init__(self):
        super(SettingWindow, self).__init__()
        self.ui = Ui_Setting()
        self.ui.setupUi(self)
        self.__read_settings__()

    @staticmethod
    def _merge_config(target, overrides):
        """Recursively copy ``overrides`` into ``target`` in place.

        Nested dicts are merged key by key, so a file that only lists some of
        the keys of a section keeps the built-in defaults for the others.
        """
        for key, value in overrides.items():
            if isinstance(value, dict) and isinstance(target.get(key), dict):
                SettingWindow._merge_config(target[key], value)
            else:
                target[key] = value

    def __read_settings__(self):
        """Load ``./config.yaml`` into ``Conf``, fill the widgets, connect signals."""
        if not Path("./config.yaml").exists():
            with open("./config.yaml", "w") as f:
                yaml.dump(Conf, f)

        with open("./config.yaml", "r") as f:
            # NOTE(review): yaml.load with FullLoader is kept for compatibility
            # with existing files; prefer yaml.safe_load if this file can ever
            # come from an untrusted source.
            fileConfig = yaml.load(f.read(), Loader=yaml.FullLoader)
        # An empty file parses to None; only merge real mappings.
        if isinstance(fileConfig, dict):
            self._merge_config(Conf, fileConfig)

        ui = self.ui
        psg = Conf["PSGConfig"]
        xx = Conf["XXConfig"]
        resp = Conf["RespFilterConfig"]
        autoTHO = psg["THOChannel"]["auto"]
        autoABD = psg["ABDChannel"]["auto"]

        ui.lineEdit_PSGFilePath.setText(psg["Path"])
        ui.lineEdit_XXFilePath.setText(xx["Path"])
        ui.spinBox_PSGDefaultFreq.setValue(psg["Frequency"])
        ui.spinBox_XXDefaultFreq.setValue(xx["Frequency"])
        ui.QSpinBox_ApplyFre.setValue(Conf["ApplyFrequency"])
        # setChecked expects a bool, not a Qt.CheckState number.
        ui.checkBox_THOautoChannel.setChecked(bool(autoTHO))
        ui.checkBox_ABDautoChannel.setChecked(bool(autoABD))
        ui.spinBox_THOcustomChannel.setValue(psg["THOChannel"]["Channel"])
        ui.spinBox_ABDcustomChannel.setValue(psg["ABDChannel"]["Channel"])
        ui.doubleSpinBox_ButterLowPassFreq.setValue(resp["LowCut"])
        ui.doubleSpinBox_ButterHighPassFreq.setValue(resp["HighCut"])
        ui.spinBox_ButterOrder.setValue(resp["Order"])
        # The manual channel spin boxes are only usable when "auto" is off.
        ui.spinBox_THOcustomChannel.setEnabled(not autoTHO)
        ui.spinBox_ABDcustomChannel.setEnabled(not autoABD)

        # Connect events.
        ui.toolButton_PSGFilePath.clicked.connect(self.__select_file__)
        ui.toolButton_XXFilePath.clicked.connect(self.__select_file__)

        ui.pushButton_SaveConfig.clicked.connect(self.__write_settings__)
        ui.pushButton_Cancel.clicked.connect(self.close)

        # The "auto" check boxes and the channel spin boxes are mutually exclusive.
        ui.checkBox_ABDautoChannel.stateChanged.connect(self.__ABDAutoChannel__)
        ui.checkBox_THOautoChannel.stateChanged.connect(self.__THOAutoChannel__)

    def __ABDAutoChannel__(self, state):
        """Disable the ABD channel spin box while the auto box is checked (state 2)."""
        self.ui.spinBox_ABDcustomChannel.setEnabled(state != 2)

    def __THOAutoChannel__(self, state):
        """Disable the THO channel spin box while the auto box is checked (state 2)."""
        self.ui.spinBox_THOcustomChannel.setEnabled(state != 2)

    def __select_file__(self, event):
        """Let the user pick the PSG or XX data folder for the clicked tool button.

        ``event`` is the argument delivered by the ``clicked`` signal and is
        not used. A cancelled dialog leaves the current path untouched.
        """
        sender_name = self.sender().objectName()
        if sender_name == "toolButton_PSGFilePath":
            path = QFileDialog.getExistingDirectory(self, "选择PSG数据文件夹", "./Data/PSG/")
            target = self.ui.lineEdit_PSGFilePath
        elif sender_name == "toolButton_XXFilePath":
            path = QFileDialog.getExistingDirectory(self, "选择XX数据文件夹", "./Data/XX/")
            target = self.ui.lineEdit_XXFilePath
        else:
            return
        # getExistingDirectory returns "" when the dialog is cancelled.
        if path:
            target.setText(path)

    def __write_settings__(self):
        """Copy the widget values into ``Conf``, save ``./config.yaml`` and close."""
        ui = self.ui
        psg = Conf["PSGConfig"]
        xx = Conf["XXConfig"]
        resp = Conf["RespFilterConfig"]

        # Read the configuration from the widgets.
        psg["Path"] = ui.lineEdit_PSGFilePath.text()
        xx["Path"] = ui.lineEdit_XXFilePath.text()
        psg["Frequency"] = ui.spinBox_PSGDefaultFreq.value()
        xx["Frequency"] = ui.spinBox_XXDefaultFreq.value()
        Conf["ApplyFrequency"] = ui.QSpinBox_ApplyFre.value()
        psg["THOChannel"]["auto"] = ui.checkBox_THOautoChannel.isChecked()
        psg["ABDChannel"]["auto"] = ui.checkBox_ABDautoChannel.isChecked()
        psg["THOChannel"]["Channel"] = ui.spinBox_THOcustomChannel.value()
        psg["ABDChannel"]["Channel"] = ui.spinBox_ABDcustomChannel.value()
        resp["LowCut"] = ui.doubleSpinBox_ButterLowPassFreq.value()
        resp["HighCut"] = ui.doubleSpinBox_ButterHighPassFreq.value()
        resp["Order"] = ui.spinBox_ButterOrder.value()

        with open("./config.yaml", "w") as f:
            yaml.dump(Conf, f)

        self.close()
|
||
|
||
|
||
class Data:
    """One PSG / XX recording pair plus the processing and plotting steps on it.

    ``raw_*`` hold the signals as read from disk, ``processed_*`` the current
    result of the ``Standardize_*`` steps. ``Config`` is the configuration
    mapping the steps read from and write to (``temp_frequency`` is written by
    ``Standardize_2``; keys such as ``PreA``, ``PreCut``, ``PostCut``, ``pos``
    and ``XX_reverse`` are expected to be filled in by the caller).
    """

    def __init__(self, PSGDataPath, XXDataPath, sampNo, Config):
        """Resolve the file paths for sample ``sampNo``.

        :param PSGDataPath: ``pathlib.Path`` of the folder holding ``<sampNo>.edf``.
        :param XXDataPath: ``pathlib.Path`` of the folder holding
            ``<sampNo>.npy`` or ``<sampNo>.txt`` (``.npy`` is preferred).
        :param sampNo: sample number used as the file stem.
        :param Config: configuration mapping (same layout as ``Conf``).
        """
        self.PSGDataPath = PSGDataPath / f"{sampNo}.edf"
        # None means that neither an .npy nor a .txt file exists for the sample.
        self.XXDataPath = None
        for suffix in (".npy", ".txt"):
            candidate = XXDataPath / f"{sampNo}{suffix}"
            if candidate.exists():
                self.XXDataPath = candidate
                break

        self.Config = Config

        self.raw_THO = None
        self.raw_ABD = None
        self.raw_XX = None

        self.processed_THO = None
        self.processed_ABD = None
        self.processed_XX = None

        self.PSG_minutes = None
        self.XX_minutes = None

    def OpenFile(self):
        """Read the THO/ABD channels from the EDF file and the XX signal.

        :return: ``(True, message)`` on success, ``(False, message)`` when a
            file is missing or has an unsupported extension.
        :raises ValueError: if an automatic channel label is not in the EDF.
        """
        # Validate both inputs before touching any file.
        if self.PSGDataPath.suffix != ".edf":
            return False, "PSG文件格式错误"
        if self.XXDataPath is None:
            return False, "XX文件不存在"
        if self.XXDataPath.suffix not in (".npy", ".txt"):
            return False, "XX文件格式错误"

        psg_conf = self.Config["PSGConfig"]
        PSG = pyedflib.EdfReader(self.PSGDataPath.__str__())
        try:
            if psg_conf["THOChannel"]["auto"]:
                self.raw_THO = PSG.readSignal(PSG.getSignalLabels().index('Effort THO'))
            else:
                self.raw_THO = PSG.readSignal(psg_conf["THOChannel"]["Channel"])
            if psg_conf["ABDChannel"]["auto"]:
                self.raw_ABD = PSG.readSignal(PSG.getSignalLabels().index('Effort ABD'))
            else:
                self.raw_ABD = PSG.readSignal(psg_conf["ABDChannel"]["Channel"])
        finally:
            # Release the file handle even when a label lookup or read fails.
            PSG.close()

        if self.XXDataPath.suffix == ".npy":
            self.raw_XX = np.load(self.XXDataPath)
        else:
            raw_xx = pd.read_csv(self.XXDataPath, sep="\t", header=None).values
            # A one-column text file loads as (n, 1); the filtering,
            # convolution and correlation steps need a 1-D signal.
            if raw_xx.ndim == 2 and raw_xx.shape[1] == 1:
                raw_xx = raw_xx[:, 0]
            self.raw_XX = raw_xx

        # Recording length in whole minutes.
        self.PSG_minutes = round(self.raw_THO.shape[0] / psg_conf["Frequency"] / 60)
        self.XX_minutes = round(self.raw_XX.shape[0] / self.Config["XXConfig"]["Frequency"] / 60)

        return True, "读取成功"

    @staticmethod
    def _bandpass(data, lowCut, highCut, fs, order):
        """Butterworth band-pass (second-order sections) applied with ``sosfilt``."""
        nyquist = fs * 0.5
        sos = signal.butter(order, [lowCut / nyquist, highCut / nyquist],
                            btype="bandpass", output='sos')
        return signal.sosfilt(sos, data)

    @staticmethod
    def _remove_baseline(data, window):
        """Subtract the ``window``-sample moving average from ``data``."""
        kernel = np.ones(window) / window
        return data - np.convolve(data, kernel, mode='same')

    @staticmethod
    def _render(fig, canvas):
        """Draw ``fig`` and return ``(rgba_buffer, width, height)`` for a QImage."""
        width, height = fig.figbbox.width, fig.figbbox.height
        fig.canvas.draw()
        return canvas.buffer_rgba(), width, height

    def __Filter__(self):
        """Band-pass the three raw signals into ``processed_*``."""
        resp = self.Config["RespFilterConfig"]
        low, high, order = resp["LowCut"], resp["HighCut"], resp["Order"]
        psg_fs = self.Config["PSGConfig"]["Frequency"]
        xx_fs = self.Config["XXConfig"]["Frequency"]

        self.processed_THO = self._bandpass(self.raw_THO, low, high, psg_fs, order)
        self.processed_ABD = self._bandpass(self.raw_ABD, low, high, psg_fs, order)
        self.processed_XX = self._bandpass(self.raw_XX, low, high, xx_fs, order)

        return

    def Standardize_0(self):
        """Only resample the raw signals to ``ApplyFrequency`` (no filtering)."""
        apply_fs = self.Config["ApplyFrequency"]
        psg_samples = int(self.PSG_minutes * 60 * apply_fs)
        xx_samples = int(self.XX_minutes * 60 * apply_fs)
        self.processed_THO = signal.resample(self.raw_THO, psg_samples)
        self.processed_ABD = signal.resample(self.raw_ABD, psg_samples)
        self.processed_XX = signal.resample(self.raw_XX, xx_samples)
        return True, "原始信号仅重采样"

    def Standardize_1(self):
        """Band-pass filter the raw signals."""
        self.__Filter__()
        return True, "呼吸提取完成 "

    def Standardize_2(self):
        """Bring ``processed_XX`` to the PSG sampling rate.

        A faster XX signal is decimated by slicing, a slower one is upsampled
        by repeating samples. Only the integer part of the rate ratio is
        used. ``Config["temp_frequency"]`` is then set to the PSG rate.
        """
        xx_fs = self.Config["XXConfig"]["Frequency"]
        psg_fs = self.Config["PSGConfig"]["Frequency"]
        if xx_fs > psg_fs:
            self.processed_XX = self.processed_XX[::int(xx_fs / psg_fs)]
        elif xx_fs < psg_fs:
            # Previously this branch also required psg_fs < 100, so at the
            # default PSG rate the XX signal was never upsampled even though
            # temp_frequency below claims the PSG rate for it.
            self.processed_XX = np.repeat(self.processed_XX, int(psg_fs / xx_fs), axis=0)
        # Record the common working rate for the later steps.
        self.Config.update({"temp_frequency": psg_fs})
        return True, "预重采样完成"

    def Standardize_3(self):
        """Optionally remove the baseline (4-second moving average)."""
        window = int(4 * self.Config["temp_frequency"])
        if self.Config["PSGConfig"]["PSGDelBase"]:
            self.processed_THO = self._remove_baseline(self.processed_THO, window)
            self.processed_ABD = self._remove_baseline(self.processed_ABD, window)
        if self.Config["XXConfig"]["XXDelBase"]:
            self.processed_XX = self._remove_baseline(self.processed_XX, window)
        return True, "去基线完成"

    def Standardize_4(self):
        """Optionally z-score the processed signals."""
        if self.Config["PSGConfig"]["PSGZScore"]:
            self.processed_THO = (self.processed_THO - np.mean(self.processed_THO)) / np.std(self.processed_THO)
            self.processed_ABD = (self.processed_ABD - np.mean(self.processed_ABD)) / np.std(self.processed_ABD)
        if self.Config["XXConfig"]["XXZScore"]:
            self.processed_XX = (self.processed_XX - np.mean(self.processed_XX)) / np.std(self.processed_XX)

        return True, "标准化完成"

    def Standardize_5(self):
        """Decimate all processed signals from ``temp_frequency`` to ``ApplyFrequency``."""
        step = int(self.Config["temp_frequency"] / self.Config["ApplyFrequency"])
        self.processed_THO = self.processed_THO[::step]
        self.processed_ABD = self.processed_ABD[::step]
        self.processed_XX = self.processed_XX[::step]

        return True, "最终重采样完成"

    @staticmethod
    def DrawPicCorrelate(tho_pxx, tho_nxx, abd_pxx, abd_nxx):
        """Plot the four correlation curves on a 2x2 grid.

        :return: ``(rgba_buffer, width, height)`` of the rendered figure.
        """
        fig = Figure(figsize=(8, 7), dpi=100)
        canvas = FigureCanvas(fig)
        panels = (
            (221, tho_pxx, "The Correlation of THO and XinXiao"),
            (222, tho_nxx, "The Correlation of THO and Reverse XinXiao"),
            (223, abd_pxx, "The Correlation of ABD and XinXiao"),
            (224, abd_nxx, "The Correlation of ABD and Reverse XinXiao"),
        )
        for position, curve, title in panels:
            ax = fig.add_subplot(position)
            ax.plot(curve, color='blue')
            ax.set_title(title)

        return Data._render(fig, canvas)

    def DrawPicRawOverview(self):
        """Plot THO, XX and ABD below each other on a shared x range."""
        fig = Figure(figsize=(8, 7), dpi=100)
        canvas = FigureCanvas(fig)
        max_x = max(self.processed_THO.shape[0], self.processed_ABD.shape[0], self.processed_XX.shape[0])
        panels = (
            (311, self.processed_THO, "THO"),
            (312, self.processed_XX, "xinxiao"),
            (313, self.processed_ABD, "ABD"),
        )
        for position, sig, title in panels:
            ax = fig.add_subplot(position)
            ax.plot(sig, color='blue')
            ax.set_xlim(0, max_x)
            ax.set_title(title)

        return self._render(fig, canvas)

    def DrawPicOverviewWithCutOff(self):
        """Plot the three signals shifted by ``PreA`` with the cut-off markers.

        Each panel shows dashed red lines at ``PreCut`` and at
        ``len(signal) - PostCut`` (both shifted by ``PreA``).
        """
        fig = Figure(figsize=(8, 7), dpi=100)
        canvas = FigureCanvas(fig)
        psg_conf = self.Config["PSGConfig"]
        xx_conf = self.Config["XXConfig"]
        max_x = max(self.processed_THO.shape[0] + psg_conf["PreA"],
                    self.processed_XX.shape[0] + xx_conf["PreA"])
        min_x = min(psg_conf["PreA"], xx_conf["PreA"], 0)

        panels = (
            (311, self.processed_THO, psg_conf, "THO"),
            (312, self.processed_XX, xx_conf, "xinxiao"),
            # The ABD end marker is now derived from the ABD length itself.
            (313, self.processed_ABD, psg_conf, "ABD"),
        )
        for position, sig, conf, title in panels:
            offset = conf["PreA"]
            ax = fig.add_subplot(position)
            ax.plot(np.linspace(offset, len(sig) + offset, len(sig)), sig, color='blue')
            ax.axvline(x=conf["PreCut"] + offset, color='red', linestyle='--')
            ax.axvline(x=len(sig) - conf["PostCut"] + offset, color='red', linestyle='--')
            ax.set_xlim(min_x, max_x)
            ax.set_title(title)

        return self._render(fig, canvas)

    def DrawPicTryAlign(self):
        """Plot THO/ABD at offset 0 and XX shifted by ``Config["pos"]``."""
        fig = Figure(figsize=(8, 7), dpi=100)
        canvas = FigureCanvas(fig)
        pos = self.Config["pos"]
        max_x = max(self.processed_THO.shape[0],
                    self.processed_XX.shape[0] + pos)
        min_x = min(self.Config["PSGConfig"]["PreA"], self.Config["XXConfig"]["PreA"] + pos, 0)

        panels = (
            (311, self.processed_THO, 0, "THO"),
            (312, self.processed_XX, pos, "xinxiao"),
            (313, self.processed_ABD, 0, "ABD"),
        )
        for position, sig, offset, title in panels:
            ax = fig.add_subplot(position)
            ax.plot(np.linspace(offset, len(sig) + offset, len(sig)), sig, color='blue')
            ax.set_xlim(min_x, max_x)
            ax.set_title(title)

        return self._render(fig, canvas)

    def DrawPicByEpoch(self, epoch):
        """Plot six 30-second epochs starting at ``epoch`` with peak intervals.

        The left column shows THO, XX (multiplied by ``Config["XX_reverse"]``)
        and ABD with detected peaks; the right column shows the distances
        between consecutive peaks.
        """
        fs = self.Config["ApplyFrequency"]
        pos = self.Config["pos"]

        PSG_SP = epoch * 30 * fs
        PSG_EP = (epoch + 6) * 30 * fs
        # XX sample index = PSG sample index - pos.
        XX_SP = PSG_SP - pos
        XX_EP = PSG_EP - pos

        tho_seg = self.processed_THO[PSG_SP:PSG_EP]
        xx_seg = self.processed_XX[XX_SP:XX_EP] * self.Config["XX_reverse"]
        abd_seg = self.processed_ABD[PSG_SP:PSG_EP]

        fig = Figure(figsize=(8, 7), dpi=100)
        canvas = FigureCanvas(fig)

        def plot_with_peaks(position, start, stop, seg):
            """Plot ``seg`` with its peaks; return the repeated peak intervals."""
            ax = fig.add_subplot(position)
            x = np.linspace(start, stop, len(seg))
            ax.plot(x, seg)
            peaks, _ = signal.find_peaks(seg, prominence=0, distance=3 * fs)
            ax.plot(x[peaks], seg[peaks], "x")
            return np.diff(peaks).repeat(fs)

        # Signals, drawn on the PSG time axis (XX on its own shifted axis).
        tho_interval = plot_with_peaks(321, PSG_SP, PSG_EP, tho_seg)
        xx_interval = plot_with_peaks(323, XX_SP, XX_EP, xx_seg)
        abd_interval = plot_with_peaks(325, PSG_SP, PSG_EP, abd_seg)

        # Peak-to-peak intervals.
        ax4 = fig.add_subplot(322)
        ax4.plot(np.linspace(PSG_SP, PSG_EP, len(tho_interval)), tho_interval, alpha=0.5, label="tho")
        ax4.plot(np.linspace(PSG_SP, PSG_EP, len(xx_interval)), xx_interval, label="resp")
        ax4.set_title("tho_interval")
        ax4.legend()
        ax4.set_ylim((10, 50))

        ax5 = fig.add_subplot(324)
        ax5.plot(np.linspace(XX_SP, XX_EP, len(tho_interval)), tho_interval)
        ax5.plot(np.linspace(XX_SP, XX_EP, len(xx_interval)), xx_interval, label="resp")
        ax5.set_title("resp_interval")
        ax5.set_ylim((10, 50))

        ax6 = fig.add_subplot(326)
        ax6.plot(np.linspace(PSG_SP, PSG_EP, len(abd_interval)), abd_interval)
        ax6.plot(np.linspace(PSG_SP, PSG_EP, len(xx_interval)), xx_interval, label="resp")
        ax6.set_title("abd_interval")
        ax6.set_ylim((10, 50))

        return self._render(fig, canvas)
|
||
|
||
|
||
# Two signatures are declared: the original one with an int32 ``between`` and
# an int64 variant, because ``np.array([lo, hi])`` yields int64 on platforms
# where the default NumPy integer is 64-bit and would otherwise not match.
@njit(["int64[:](int64[:],int64[:], int32[:])",
       "int64[:](int64[:],int64[:], int64[:])"], nogil=True, parallel=True)
def get_Correlate(a, v, between):
    """Sliding dot product of ``v`` over ``a`` for a range of offsets.

    ``result[i]`` is ``sum(a[begin + i: begin + i + len(v)] * v)`` for
    ``i`` in ``range(end - begin)``.

    :param a: 1-D int64 array that is scanned.
    :param v: 1-D int64 template array.
    :param between: two-element integer array ``[begin, end]``; an ``end`` of
        0 means ``len(a) - len(v) - 1``.
    :return: int64 array of length ``end - begin``.
    """
    begin = between[0]
    end = between[1]
    if end == 0:
        end = len(a) - len(v) - 1
    result = np.empty(end - begin, dtype=np.int64)
    # Offsets are independent of each other, so they run in a parallel range.
    for i in prange(end - begin):
        result[i] = np.sum(a[begin + i: begin + i + len(v)] * v)
    return result
|
||
|
||
|
||
# Exercise the compiled kernel once at import time on a tiny input.
# ``between`` is given an explicit int32 dtype so the call matches the
# declared ``int32[:]`` signature on every platform (the default integer
# dtype of ``np.array([0, 0])`` is int64 on most systems).
get_Correlate(np.array([0, 0, 0, 0, 1, 2, 3, 4, 5, 0, 0, 0, 0], dtype=np.int64),
              np.array([1, 2, 3, 4, 5], dtype=np.int64),
              between=np.array([0, 0], dtype=np.int32))
|
||
|
||
|
||
class MainWindow(QMainWindow):
|
||
def __init__(self):
    """Build the main window, set the initial widget states and wire the signals."""
    super(MainWindow, self).__init__()
    self.ui = Ui_respCoarseAlign()
    self.setting = SettingWindow()
    self.ui.setupUi(self)
    ui = self.ui

    ui.actionDefault_Configuration.triggered.connect(self.setting.show)

    # The "custom" check box and its spin box are mutually exclusive.
    ui.checkBox_custom.stateChanged.connect(self.__customChannel__)
    self.__disableAllButton__()

    # Only refreshing and opening a file are possible at start-up.
    for button in (ui.pushButton_Refresh, ui.pushButton_OpenFile):
        button.setEnabled(True)
    for box in (ui.checkBox_NABD, ui.checkBox_NTHO, ui.checkBox_PABD,
                ui.checkBox_PTHO, ui.checkBox_custom):
        box.setEnabled(False)

    # Connect events.
    # Per the original note: Refresh scans the PSG and XX folders and lists
    # every sample number in the combo box; numbers present in both folders
    # are selectable, numbers present in only one are not.
    ui.pushButton_Refresh.clicked.connect(self.__refresh__)
    ui.pushButton_OpenFile.clicked.connect(self.__openFile__)
    ui.pushButton_Standardize.clicked.connect(self.__standardize__)
    ui.pushButton_CutOff.clicked.connect(self.__cutOff__)
    ui.pushButton_GetPos.clicked.connect(self.__getPosition__)
    ui.pushButton_JUMP.clicked.connect(self.__jump__)
    # Finishing an edit of the epoch spin box (e.g. pressing Enter) also jumps.
    ui.spinBox_SelectEpoch.editingFinished.connect(self.__jump__)
    for step_button in (ui.pushButton_EM1, ui.pushButton_EM10, ui.pushButton_EM100,
                        ui.pushButton_EP1, ui.pushButton_EP10, ui.pushButton_EP100):
        step_button.clicked.connect(self.__EpochChange__)
    ui.pushButton_SaveInfo.clicked.connect(self.__saveInfo__)
    ui.pushButton_ReadInfo.clicked.connect(self.__readInfo__)
    ui.pushButton_Exit.clicked.connect(self.__exit__)

    # Toggling one of the NTHO / PTHO / PABD / NABD check boxes, or finishing
    # an edit of the custom offset, triggers the alignment preview.
    for candidate in (ui.checkBox_NTHO, ui.checkBox_PTHO,
                      ui.checkBox_PABD, ui.checkBox_NABD):
        candidate.stateChanged.connect(self.__enableAlign__)
    ui.spinBox_custom.editingFinished.connect(self.__enableAlign__)
|
||
|
||
def __readInfo__(self):
    """Restore the last saved alignment of the selected sample.

    Reads ``./RespCoarseAlignInfo.csv``, takes the last row whose ``sampNo``
    matches the combo box, writes its values into the widgets and into
    ``self.data.Config`` and enables the epoch navigation buttons. A warning
    is shown and nothing is changed when the file or the sample is missing.

    :return: None
    """
    sampNo = self.ui.comboBox_SelectFile.currentText()
    if not Path("./RespCoarseAlignInfo.csv").exists():
        QMessageBox.warning(self, "警告", "没有保存记录,请检查当前路径是否有RespCoarseAlignInfo.csv")
        return

    df = pd.read_csv("./RespCoarseAlignInfo.csv")
    df["sampNo"] = df["sampNo"].astype(str)
    df = df[df["sampNo"] == sampNo]
    if len(df) == 0:
        QMessageBox.warning(self, "警告", f"记录中无编号{sampNo},请计算对齐")
        return

    # Use the most recent record. pandas hands back numpy scalars; convert
    # them to plain ints before giving them to Qt or storing them in Config.
    PSG_SampRate = int(df["PSG_SampRate"].values[-1])
    XX_SampRate = int(df["XX_SampRate"].values[-1])
    pos = int(df["pos"].values[-1])
    epoch = int(df["epoch"].values[-1])

    # Write the record into the widgets.
    ui = self.ui
    ui.spinBox_PSGFreq.setValue(PSG_SampRate)
    ui.spinBox_XXFreq.setValue(XX_SampRate)
    ui.spinBox_custom.setValue(pos)

    if pos > 0:
        ui.spinBox_XXPreA.setValue(pos)
    else:
        ui.spinBox_PSGPreA.setValue(-pos)

    # Load the cut-off widgets into the configuration.
    config = self.data.Config
    config["PSGConfig"].update({"PreA": ui.spinBox_PSGPreA.value(),
                                "PreCut": ui.spinBox_PSGPreCut.value(),
                                "PostCut": ui.spinBox_PSGPostCut.value()})
    config["XXConfig"].update({"PreA": ui.spinBox_XXPreA.value(),
                               "PreCut": ui.spinBox_XXPreCut.value(),
                               "PostCut": ui.spinBox_XXPostCut.value()})

    relate = pos
    config["XX_reverse"] = 1
    config["pos"] = relate

    # Epoch limits. The XX signal is drawn shifted by +pos relative to PSG,
    # so its end lies at len(XX) + pos on the PSG axis (same formula as in
    # __enableAlign__).
    apply_fs = config["ApplyFrequency"]
    epoch_min = max(0, relate // 30 // apply_fs + 1)
    epoch_max = min(len(self.data.processed_THO) // 30 // apply_fs - 1,
                    (len(self.data.processed_XX) + relate) // 30 // apply_fs - 1)

    ui.spinBox_SelectEpoch.setMinimum(epoch_min)
    ui.spinBox_SelectEpoch.setMaximum(epoch_max)
    # Restore the saved epoch now that the range is known; the spin box
    # clamps it into [epoch_min, epoch_max].
    ui.spinBox_SelectEpoch.setValue(epoch)
    ui.spinBox_SelectEpoch.setToolTip(f"最小值:{epoch_min}\n最大值:{epoch_max}")
    ui.spinBox_SelectEpoch.setEnabled(True)
    # Behave as if the "custom" check box had been ticked.
    ui.checkBox_custom.setChecked(True)

    for name in ("pushButton_JUMP",
                 "pushButton_EM1", "pushButton_EM10", "pushButton_EM100",
                 "pushButton_EP1", "pushButton_EP10", "pushButton_EP100"):
        ButtonState["Current"][name] = True
    self.__enableAllButton__()
|
||
|
||
def __saveInfo__(self):
    """Append the current alignment to ``./RespCoarseAlignInfo.csv``.

    One row is written with the sample number, the PSG and XinXiao sampling
    rates, the offset ``pos``, the observed epoch and the save time. The file
    is created with a header row when it does not exist yet, otherwise the
    row is appended without a header.

    :return: None
    """
    ui = self.ui
    record = {
        "sampNo": [ui.comboBox_SelectFile.currentText()],
        "PSG_SampRate": [ui.spinBox_PSGFreq.value()],
        "XX_SampRate": [ui.spinBox_XXFreq.value()],
        "pos": [self.data.Config["pos"]],
        "epoch": [ui.spinBox_SelectEpoch.value()],
        "save_time": [datetime.now().strftime("%Y-%m-%d %H:%M:%S")],
    }
    frame = pd.DataFrame(record)

    # Append to an existing file, otherwise start a new one with a header.
    appending = Path("./RespCoarseAlignInfo.csv").exists()
    frame.to_csv("./RespCoarseAlignInfo.csv",
                 mode="a" if appending else "w",
                 header=not appending,
                 index=False)
    # noinspection PyUnresolvedReferences
    QMessageBox.information(self, "提示", "保存成功", QMessageBox.Ok)
|
||
|
||
def __EpochChange__(self):
    """Step the selected epoch by the amount of the clicked button and redraw.

    The EM* buttons step back by 1/10/100 and the EP* buttons forward by
    1/10/100; the result is kept inside the spin box range. Nothing happens
    when the sender is not one of these buttons.
    """
    ui = self.ui
    spin = ui.spinBox_SelectEpoch
    # Current value.
    value = spin.value()

    steps = (
        (ui.pushButton_EM1, -1),
        (ui.pushButton_EM10, -10),
        (ui.pushButton_EM100, -100),
        (ui.pushButton_EP1, 1),
        (ui.pushButton_EP10, 10),
        (ui.pushButton_EP100, 100),
    )
    # Find the step that belongs to the button which emitted the signal.
    source = self.sender()
    for button, delta in steps:
        if source == button:
            break
    else:
        return

    if delta < 0:
        epoch = max(spin.minimum(), value + delta)
    else:
        epoch = min(spin.maximum(), value + delta)

    spin.setValue(epoch)
    QApplication.processEvents()
    self.__jump__()
|
||
|
||
def __jump__(self):
    """Redraw the plots for the epoch selected in the spin box.

    All buttons are disabled while drawing and re-enabled afterwards; the
    info label and the progress bar report start and completion.
    """
    ui = self.ui

    self.__disableAllButton__()
    ui.label_Info.setText("开始绘制···")
    ui.progressBar.setValue(0)

    # Draw the epoch that is currently selected.
    selected = ui.spinBox_SelectEpoch.value()
    self.__plot__(epoch=selected)

    ui.progressBar.setValue(100)
    ui.label_Info.setText(f"Epoch: {selected}, 绘制完成")
    self.__enableAllButton__()
|
||
|
||
def __enableAlign__(self):
    """Preview the alignment for the chosen offset and unlock epoch navigation.

    The offset is taken from the first checked box among NTHO, PTHO, PABD and
    NABD (their text is parsed as an integer; the N* boxes also invert the XX
    signal), otherwise from the custom spin box when the custom box is
    checked. When nothing is selected the method returns without touching
    the UI.
    """
    ui = self.ui

    # Resolve the selection first. Previously the buttons were disabled
    # before this check and were never re-enabled when nothing was selected.
    if ui.checkBox_NTHO.isChecked():
        relate = int(ui.checkBox_NTHO.text())
        reverse = -1
    elif ui.checkBox_PTHO.isChecked():
        relate = int(ui.checkBox_PTHO.text())
        reverse = 1
    elif ui.checkBox_PABD.isChecked():
        relate = int(ui.checkBox_PABD.text())
        reverse = 1
    elif ui.checkBox_NABD.isChecked():
        relate = int(ui.checkBox_NABD.text())
        reverse = -1
    elif ui.checkBox_custom.isChecked():
        relate = ui.spinBox_custom.value()
        reverse = 1
    else:
        return

    self.__disableAllButton__()
    ui.label_Info.setText("绘制对齐图像···")
    ui.progressBar.setValue(0)
    QApplication.processEvents()

    # Position of the chosen offset relative to the PSG signal.
    config = self.data.Config
    config["pos"] = relate
    self.__plot__()
    ui.label_Info.setText("相对位置:{}".format(relate))

    # Epoch limits for the navigation spin box.
    config["XX_reverse"] = reverse
    apply_fs = config["ApplyFrequency"]
    epoch_min = max(0, relate // 30 // apply_fs + 1)
    epoch_max = min(len(self.data.processed_THO) // 30 // apply_fs - 1,
                    (len(self.data.processed_XX) + relate) // 30 // apply_fs - 1)
    ui.spinBox_SelectEpoch.setMinimum(epoch_min)
    ui.spinBox_SelectEpoch.setMaximum(epoch_max)
    ui.spinBox_SelectEpoch.setValue(epoch_min)
    ui.spinBox_SelectEpoch.setToolTip(f"最小值:{epoch_min}\n最大值:{epoch_max}")
    ui.spinBox_SelectEpoch.setEnabled(True)

    for name in ("pushButton_JUMP",
                 "pushButton_EM1", "pushButton_EM10", "pushButton_EM100",
                 "pushButton_EP1", "pushButton_EP10", "pushButton_EP100",
                 "pushButton_SaveInfo"):
        ButtonState["Current"][name] = True
    self.__enableAllButton__()
|
||
|
||
def __getPosition__(self):
    """Cross-correlate the XX signal with THO and ABD and list candidates.

    Both signals are trimmed by the configured PreCut/PostCut values, then
    passed to ``get_Correlate`` (defined elsewhere in this file) in one
    hundred slices so the progress bar can be updated. The positions of the
    maxima of each correlation curve and of its negation are converted to
    offsets and written as the text of the four candidate check boxes.
    """
    ui = self.ui
    data = self.data

    def correlate_against_xx(psg_signal):
        """Return the scaled correlation curve of ``psg_signal`` vs. XX."""
        psg_cut = data.Config["PSGConfig"]
        xx_cut = data.Config["XXConfig"]
        reference = psg_signal[
            psg_cut["PreCut"]:len(psg_signal) - psg_cut["PostCut"]].copy()
        probe = data.processed_XX[
            xx_cut["PreCut"]:len(data.processed_XX) - xx_cut["PostCut"]].copy()
        # Scale in place, then truncate to integers before correlating.
        reference *= 100
        probe *= 100
        reference = reference.astype(np.int64)
        probe = probe.astype(np.int64)
        margin = len(probe) - 1
        reference = np.pad(reference, (margin, margin), mode='constant')

        total = len(reference) - len(probe) - 1
        curve = np.zeros(total, dtype=np.int64)
        # Work through the range in one hundred slices.
        for part in range(100):
            start = part * total // 100
            stop = (part + 1) * total // 100
            curve[start:stop] = get_Correlate(reference, probe, np.array([start, stop]))
            ui.progressBar.setValue(part)
            QApplication.processEvents()
        # Undo the two factors of 100 applied to the inputs.
        return curve / 10000

    # Pass 1 of 2: THO channel.
    ui.progressBar.setValue(0)
    ui.label_Info.setText("计算互相关1/2...")
    self.__disableAllButton__()
    QApplication.processEvents()
    tho_relate = correlate_against_xx(data.processed_THO)
    tho_relate2 = - tho_relate

    # Pass 2 of 2: ABD channel.
    ui.progressBar.setValue(0)
    ui.label_Info.setText("计算互相关2/2...")
    QApplication.processEvents()
    abd_relate = correlate_against_xx(data.processed_ABD)
    abd_relate2 = - abd_relate

    ui.progressBar.setValue(80)
    ui.label_Info.setText("绘制相关系数曲线...")
    QApplication.processEvents()
    self.__plot__(tho_relate, tho_relate2, abd_relate, abd_relate2)

    ui.progressBar.setValue(90)
    ui.label_Info.setText("计算最大值位置...")
    QApplication.processEvents()

    # Index of the maximum of each curve; the negated curves give the
    # maxima for the inverted signal.
    peak_tho = np.argmax(tho_relate)
    peak_tho_neg = np.argmax(tho_relate2)
    peak_abd = np.argmax(abd_relate)
    peak_abd_neg = np.argmax(abd_relate2)

    # Shift from a curve index to the offset shown to the user.
    bias = (data.Config["PSGConfig"]["PreCut"] + data.Config["XXConfig"]["PostCut"]
            - len(data.processed_XX) + 1)

    for box, peak in (
        (ui.checkBox_PABD, peak_abd),
        (ui.checkBox_PTHO, peak_tho),
        (ui.checkBox_NABD, peak_abd_neg),
        (ui.checkBox_NTHO, peak_tho_neg),
    ):
        box.setText(str(peak + bias))
    for box in (ui.checkBox_PABD, ui.checkBox_PTHO, ui.checkBox_NABD,
                ui.checkBox_NTHO, ui.checkBox_custom):
        box.setEnabled(True)

    ui.progressBar.setValue(100)
    ui.label_Info.setText("计算完成")
    self.__enableAllButton__()
|
||
|
||
def __reset__(self):
    """Return the button states and input widgets to their initial values.

    ``ButtonState["Current"]`` is overwritten with the defaults, except that
    the Standardize button is marked as enabled. The cut-off spin boxes,
    candidate check boxes, epoch selector and custom offset are cleared.
    """
    ui = self.ui
    current = ButtonState["Current"]
    current.update(ButtonState["Default"].copy())
    current["pushButton_Standardize"] = True

    # Cut-off inputs.
    for spin_box in (
        ui.spinBox_PSGPreA,
        ui.spinBox_PSGPreCut,
        ui.spinBox_PSGPostCut,
        ui.spinBox_XXPreA,
        ui.spinBox_XXPreCut,
        ui.spinBox_XXPostCut,
    ):
        spin_box.setValue(0)

    # Candidate selection.
    for check_box in (
        ui.checkBox_NABD,
        ui.checkBox_NTHO,
        ui.checkBox_PABD,
        ui.checkBox_PTHO,
        ui.checkBox_custom,
    ):
        check_box.setChecked(False)

    ui.spinBox_SelectEpoch.setValue(0)
    ui.spinBox_custom.setValue(0)
    ui.spinBox_custom.setEnabled(False)
    ui.spinBox_SelectEpoch.setToolTip("")

    # Restore the placeholder captions of the four candidates.
    for check_box, caption in (
        (ui.checkBox_PABD, "备选3"),
        (ui.checkBox_PTHO, "备选1"),
        (ui.checkBox_NABD, "备选4"),
        (ui.checkBox_NTHO, "备选2"),
    ):
        check_box.setText(caption)

    for check_box in (
        ui.checkBox_PABD,
        ui.checkBox_PTHO,
        ui.checkBox_NABD,
        ui.checkBox_NTHO,
        ui.checkBox_custom,
    ):
        check_box.setEnabled(False)

    ui.spinBox_SelectEpoch.setMinimum(0)
|
||
|
||
def __cutOff__(self):
    """Store the cut-off settings from the UI and redraw the overview.

    The PreA/PreCut/PostCut spin box values for PSG and XX are written into
    the corresponding sections of the data configuration, the picture is
    redrawn, and the "get position" button is marked as enabled.
    """
    ui = self.ui
    ui.label_Info.setText("开始绘制···")
    self.__disableAllButton__()

    # Shallow copy: the nested "PSGConfig"/"XXConfig" dictionaries are the
    # same objects as in the current configuration and are updated in place.
    settings = self.data.Config.copy()
    ui.progressBar.setValue(20)
    QApplication.processEvents()

    # Spin boxes are named spinBox_<PSG|XX><PreA|PreCut|PostCut>.
    for section, prefix in (("PSGConfig", "PSG"), ("XXConfig", "XX")):
        settings[section].update({
            field: getattr(ui, f"spinBox_{prefix}{field}").value()
            for field in ("PreA", "PreCut", "PostCut")
        })

    self.data.Config = settings
    self.__plot__()
    ui.progressBar.setValue(100)
    ui.label_Info.setText("绘制完成")
    ButtonState["Current"]["pushButton_GetPos"] = True
    self.__enableAllButton__()
|
||
|
||
# matplotlib 绘制图像
|
||
def __plot__(self, *args, **kwargs):
    """Render the picture that matches the widget that triggered the call.

    The renderer is chosen from ``self.sender()``. Positional and keyword
    arguments are forwarded only to the correlation and per-epoch renderers.
    If the sender is none of the known widgets, nothing is drawn.
    """
    ui = self.ui
    origin = self.sender()

    # Widgets that share a renderer.
    align_widgets = (
        ui.checkBox_NTHO,
        ui.checkBox_NABD,
        ui.checkBox_PTHO,
        ui.checkBox_PABD,
        ui.spinBox_custom,
    )
    epoch_widgets = (
        ui.pushButton_JUMP,
        ui.pushButton_EM1,
        ui.pushButton_EM10,
        ui.pushButton_EM100,
        ui.pushButton_EP1,
        ui.pushButton_EP10,
        ui.pushButton_EP100,
    )

    if origin == ui.pushButton_Standardize:
        rendered = self.data.DrawPicRawOverview()
    elif origin == ui.pushButton_CutOff:
        rendered = self.data.DrawPicOverviewWithCutOff()
    elif origin == ui.pushButton_GetPos:
        rendered = self.data.DrawPicCorrelate(*args, **kwargs)
    elif any(origin == widget for widget in align_widgets):
        rendered = self.data.DrawPicTryAlign()
    elif any(origin == widget for widget in epoch_widgets):
        rendered = self.data.DrawPicByEpoch(*args, **kwargs)
    else:
        return

    # Show the rendered RGBA buffer on the picture label.
    buffer, width, height = rendered
    image = QImage(buffer, width, height, QImage.Format_RGBA8888)
    ui.label_Pic.setPixmap(QPixmap(image))
|
||
|
||
# noinspection PyUnresolvedReferences
|
||
def __exit__(self):
    """Ask for confirmation and close the window if the user agrees."""
    buttons = QMessageBox.Yes | QMessageBox.No
    answer = QMessageBox.question(self, '确认', '确认退出吗?', buttons, QMessageBox.No)
    if answer != QMessageBox.Yes:
        return
    self.close()
|
||
|
||
def __customChannel__(self, state):
    """Enable the custom offset spin box only while ``state`` equals 2.

    NOTE(review): 2 is presumably the "checked" value of the check box
    state signal -- confirm against the signal connection.
    """
    self.ui.spinBox_custom.setEnabled(state == 2)
|
||
|
||
# 刷新键分别获取PSG和XX文件夹里面的数据,获取所有编号显示在下拉框,比对编号同时存在的可选,仅存在一个文件夹的编号不可选
|
||
def __refresh__(self):
    """Rebuild the record list from the PSG and XX folders.

    Record identifiers are file stems: ``*.edf`` in the PSG folder and
    ``*.txt`` / ``*.npy`` in the XX folder. Identifiers present in both
    folders are listed first and stay selectable. Identifiers found in only
    one folder are appended with a suffix and disabled. A warning is shown,
    and nothing is changed, if either folder does not exist.

    All three groups are sorted so the list order is the same on every run.
    """
    psg_dir = Path(self.setting.ui.lineEdit_PSGFilePath.text())
    xx_dir = Path(self.setting.ui.lineEdit_XXFilePath.text())
    if not psg_dir.exists():
        QMessageBox.warning(self, "警告", "PSG文件夹不存在")
        return
    if not xx_dir.exists():
        QMessageBox.warning(self, "警告", "XX文件夹不存在")
        return

    psg_ids = {entry.stem for entry in psg_dir.glob("*.edf")}
    xx_ids = {entry.stem
              for pattern in ("*.txt", "*.npy")
              for entry in xx_dir.glob(pattern)}

    # Fix: sets have no defined order, so sort every group (previously only
    # the paired group was sorted and the rest changed order between runs).
    paired = sorted(psg_ids & xx_ids)
    psg_only = sorted(psg_ids - xx_ids)
    xx_only = sorted(xx_ids - psg_ids)

    combo = self.ui.comboBox_SelectFile
    combo.clear()
    combo.addItems(paired)
    combo.addItems([name + " (仅PSG)" for name in psg_only])
    combo.addItems([name + " (仅XX)" for name in xx_only])
    combo.setCurrentIndex(0)

    # Rows after the paired ones belong to a single folder: not selectable.
    model = combo.model()
    first_unpaired = len(paired)
    for row in range(first_unpaired, first_unpaired + len(psg_only) + len(xx_only)):
        model.item(row).setEnabled(False)
|
||
|
||
def __disableAllButton__(self):
    """Disable every push button that is tracked in ``ButtonState``."""
    tracked = ButtonState["Current"]
    # Walk all descendants of the central widget and keep only the push
    # buttons whose object name has an entry in the state table.
    for child in self.centralWidget().findChildren(QWidget):
        if not isinstance(child, QPushButton):
            continue
        if child.objectName() in tracked:
            child.setEnabled(False)
|
||
|
||
def __enableAllButton__(self):
    """Set every tracked push button to its state in ``ButtonState``.

    Despite the name, a button is enabled only if its entry in
    ``ButtonState["Current"]`` is true; otherwise it is disabled.
    """
    tracked = ButtonState["Current"]
    for child in self.centralWidget().findChildren(QWidget):
        if not isinstance(child, QPushButton):
            continue
        name = child.objectName()
        if name in tracked:
            child.setEnabled(tracked[name])
|
||
|
||
def __openFile__(self):
    """Load the record selected in the combo box and report the result.

    A new ``Data`` object is created from the configured PSG/XX folders, the
    selected record identifier and the module-level ``Conf``. On success the
    durations are shown and the UI is reset; on failure a warning is shown.
    Buttons are restored to their tracked state in both cases.
    """
    ui = self.ui
    ui.label_Info.setText("正在打开文件...")
    self.__disableAllButton__()
    # Long-running operation: let the UI repaint before loading.
    QApplication.processEvents()

    paths = self.setting.ui
    psg_root = Path(paths.lineEdit_PSGFilePath.text())
    xx_root = Path(paths.lineEdit_XXFilePath.text())
    # Keep only the text before the first space of the combo box entry.
    record_id = ui.comboBox_SelectFile.currentText().split(" ")[0]
    self.data = Data(psg_root, xx_root, record_id, Conf)

    opened, info = self.data.OpenFile()
    if opened:
        ui.label_PSGmins.setText(str(self.data.PSG_minutes))
        ui.label_XXmins.setText(str(self.data.XX_minutes))
        ui.progressBar.setValue(100)
        ui.label_Info.setText(info)
        ui.label_Pic.setText("读取成功")
        self.__reset__()
    else:
        QMessageBox.warning(self, "警告", info)
        ui.label_Info.setText(info)

    self.__enableAllButton__()
|
||
|
||
def __standardize__(self):
    """Store the preprocessing options and run the standardisation.

    The options are read from the UI into the data configuration. When the
    raw-signal box is checked, ``Standardize_0`` is run; otherwise the
    stages ``Standardize_1`` to ``Standardize_5`` are run in order, with a
    progress update after each. The overview is then redrawn and the
    cut-off and read-info buttons are marked as enabled.
    """
    ui = self.ui
    ui.progressBar.setValue(0)
    ui.label_Info.setText("正在标准化...")
    self.__disableAllButton__()
    QApplication.processEvents()

    # Shallow copy: the nested section dictionaries are updated in place.
    settings = self.data.Config.copy()
    settings["RawSignal"] = ui.checkBox_RawSignal.isChecked()
    psg_options = {
        "PSGDelBase": ui.checkBox_PSGDelBase.isChecked(),
        "PSGZScore": ui.checkBox_PSGZScore.isChecked(),
        "Frequency": ui.spinBox_PSGFreq.value(),
    }
    settings["PSGConfig"].update(psg_options)
    xx_options = {
        "XXDelBase": ui.checkBox_XXDelBase.isChecked(),
        "XXZScore": ui.checkBox_XXZScore.isChecked(),
        "Frequency": ui.spinBox_XXFreq.value(),
    }
    settings["XXConfig"].update(xx_options)
    self.data.Config = settings

    if not self.data.Config["RawSignal"]:
        ui.label_Info.setText('正在进行呼吸提取...')
        QApplication.processEvents()

        # (stage method, progress value, update the label before the bar?)
        # NOTE(review): the success flag of these stages is not checked.
        stages = (
            ("Standardize_1", 10, True),
            ("Standardize_2", 30, False),
            ("Standardize_3", 50, True),
            ("Standardize_4", 70, True),
            ("Standardize_5", 90, True),
        )
        for stage_name, percent, label_first in stages:
            _, info = getattr(self.data, stage_name)()
            if label_first:
                ui.label_Info.setText(info)
                ui.progressBar.setValue(percent)
            else:
                ui.progressBar.setValue(percent)
                ui.label_Info.setText(info)
            QApplication.processEvents()
    else:
        opened, info = self.data.Standardize_0()
        if opened:
            ui.progressBar.setValue(100)
            ui.label_Info.setText(info)
        else:
            # NOTE(review): execution continues to the redraw below even
            # after a reported failure.
            QMessageBox.warning(self, "警告", info)
            ui.label_Info.setText(info)

    self.__plot__()
    ui.progressBar.setValue(100)
    for button_name in ("pushButton_CutOff", "pushButton_ReadInfo"):
        ButtonState["Current"][button_name] = True
    self.__enableAllButton__()
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: create the Qt application, show the main window
    # and hand the event loop's return code to the interpreter.
    application = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    exit_code = application.exec()
    sys.exit(exit_code)
|