#!/usr/bin/python # -*- coding: UTF-8 -*- """ @author:andrew @file:RespCoarseAlign.py @email:admin@marques22.com @email:2021022362@m.scnu.edu.cn @time:2023/09/20 """ import sys from datetime import datetime from pathlib import Path import pyedflib import numpy as np import pandas as pd from PySide6.QtGui import QPixmap, QImage from PySide6.QtWidgets import QApplication, QMainWindow, QFileDialog, QMessageBox, QWidget, QPushButton from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas from matplotlib.figure import Figure from numba import njit, prange from scipy import signal from ui.Mian import Ui_mainWindow as Ui_respCoarseAlign from ui.setings import Ui_MainWindow as Ui_Setting import yaml Conf = { "PSGConfig": { "Path": "./Data/PSG/", "Frequency": 100, "THOChannel": { "auto": True, "Channel": 3, }, "ABDChannel": { "auto": True, "Channel": 4, }, }, "XXConfig": { "Path": "./Data/XX/", "Frequency": 100, }, "RespFilterConfig": { "LowCut": 0.01, "HighCut": 0.7, "Order": 4 }, "ApplyFrequency": 5 } ButtonState = { "Default": { "pushButton_Refresh": True, "pushButton_OpenFile": True, "pushButton_Standardize": False, "pushButton_CutOff": False, "pushButton_GetPos": False, "pushButton_JUMP": False, "pushButton_EM1": False, "pushButton_EM10": False, "pushButton_EM100": False, "pushButton_EP1": False, "pushButton_EP10": False, "pushButton_EP100": False, "pushButton_SaveInfo": False, "pushButton_ReadInfo": False, "pushButton_Exit": True}, "Current": { "pushButton_Refresh": True, "pushButton_OpenFile": True, "pushButton_Standardize": False, "pushButton_CutOff": False, "pushButton_GetPos": False, "pushButton_JUMP": False, "pushButton_EM1": False, "pushButton_EM10": False, "pushButton_EM100": False, "pushButton_EP1": False, "pushButton_EP10": False, "pushButton_EP100": False, "pushButton_SaveInfo": False, "pushButton_ReadInfo": False, "pushButton_Exit": True} } class SettingWindow(QMainWindow): def __init__(self): super(SettingWindow, self).__init__() self.ui = Ui_Setting() self.ui.setupUi(self) self.__read_settings__() def __read_settings__(self): if not Path("./config.yaml").exists(): with open("./config.yaml", "w") as f: yaml.dump(Conf, f) with open("./config.yaml", "r") as f: fileConfig = yaml.load(f.read(), Loader=yaml.FullLoader) Conf.update(fileConfig) # print(Conf) self.ui.lineEdit_PSGFilePath.setText(Conf["PSGConfig"]["Path"]) self.ui.lineEdit_XXFilePath.setText(Conf["XXConfig"]["Path"]) self.ui.spinBox_PSGDefaultFreq.setValue(Conf["PSGConfig"]["Frequency"]) self.ui.spinBox_XXDefaultFreq.setValue(Conf["XXConfig"]["Frequency"]) self.ui.QSpinBox_ApplyFre.setValue(Conf["ApplyFrequency"]) autoTHO = Conf["PSGConfig"]["THOChannel"]["auto"] self.ui.checkBox_THOautoChannel.setChecked(2 if autoTHO else 0) autoABD = Conf["PSGConfig"]["ABDChannel"]["auto"] self.ui.checkBox_ABDautoChannel.setChecked(2 if autoABD else 0) self.ui.spinBox_THOcustomChannel.setValue(Conf["PSGConfig"]["THOChannel"]["Channel"]) self.ui.spinBox_ABDcustomChannel.setValue(Conf["PSGConfig"]["ABDChannel"]["Channel"]) self.ui.doubleSpinBox_ButterLowPassFreq.setValue(Conf["RespFilterConfig"]["LowCut"]) self.ui.doubleSpinBox_ButterHighPassFreq.setValue(Conf["RespFilterConfig"]["HighCut"]) self.ui.spinBox_ButterOrder.setValue(Conf["RespFilterConfig"]["Order"]) self.ui.spinBox_THOcustomChannel.setEnabled(not autoTHO) self.ui.spinBox_ABDcustomChannel.setEnabled(not autoABD) # 绑定事件 self.ui.toolButton_PSGFilePath.clicked.connect(self.__select_file__) self.ui.toolButton_XXFilePath.clicked.connect(self.__select_file__) self.ui.pushButton_SaveConfig.clicked.connect(self.__write_settings__) self.ui.pushButton_Cancel.clicked.connect(self.close) # ABD auto checkbox和SpinBox 互斥 self.ui.checkBox_ABDautoChannel.stateChanged.connect(self.__ABDAutoChannel__) self.ui.checkBox_THOautoChannel.stateChanged.connect(self.__THOAutoChannel__) def __ABDAutoChannel__(self, state): if state == 2: self.ui.spinBox_ABDcustomChannel.setEnabled(False) else: self.ui.spinBox_ABDcustomChannel.setEnabled(True) def __THOAutoChannel__(self, state): if state == 2: self.ui.spinBox_THOcustomChannel.setEnabled(False) else: self.ui.spinBox_THOcustomChannel.setEnabled(True) def __select_file__(self, event): sender = self.sender() if sender.objectName() == "toolButton_PSGFilePath": path = QFileDialog.getExistingDirectory(self, "选择PSG数据文件夹", "./Data/PSG/") self.ui.lineEdit_PSGFilePath.setText(path) elif sender.objectName() == "toolButton_XXFilePath": path = QFileDialog.getExistingDirectory(self, "选择XX数据文件夹", "./Data/XX/") self.ui.lineEdit_XXFilePath.setText(path) def __write_settings__(self): # 从界面读取配置 Conf["PSGConfig"]["Path"] = self.ui.lineEdit_PSGFilePath.text() Conf["XXConfig"]["Path"] = self.ui.lineEdit_XXFilePath.text() Conf["PSGConfig"]["Frequency"] = self.ui.spinBox_PSGDefaultFreq.value() Conf["XXConfig"]["Frequency"] = self.ui.spinBox_XXDefaultFreq.value() Conf["ApplyFrequency"] = self.ui.QSpinBox_ApplyFre.value() Conf["PSGConfig"]["THOChannel"]["auto"] = self.ui.checkBox_THOautoChannel.isChecked() Conf["PSGConfig"]["ABDChannel"]["auto"] = self.ui.checkBox_ABDautoChannel.isChecked() Conf["PSGConfig"]["THOChannel"]["Channel"] = self.ui.spinBox_THOcustomChannel.value() Conf["PSGConfig"]["ABDChannel"]["Channel"] = self.ui.spinBox_ABDcustomChannel.value() Conf["RespFilterConfig"]["LowCut"] = self.ui.doubleSpinBox_ButterLowPassFreq.value() Conf["RespFilterConfig"]["HighCut"] = self.ui.doubleSpinBox_ButterHighPassFreq.value() Conf["RespFilterConfig"]["Order"] = self.ui.spinBox_ButterOrder.value() with open("./config.yaml", "w") as f: yaml.dump(Conf, f) self.close() class Data: def __init__(self, PSGDataPath, XXDataPath, sampNo, Config): self.PSGDataPath = PSGDataPath / f"{sampNo}.edf" if (XXDataPath / f"{sampNo}.npy").exists(): self.XXDataPath = XXDataPath / f"{sampNo}.npy" elif (XXDataPath / f"{sampNo}.txt").exists(): self.XXDataPath = XXDataPath / f"{sampNo}.txt" else: self.XXDataPath = None self.Config = Config self.raw_THO = None self.raw_ABD = None self.raw_XX = None self.processed_THO = None self.processed_ABD = None self.processed_XX = None self.PSG_minutes = None self.XX_minutes = None def OpenFile(self): # 判断是edf还是npy或txt if self.PSGDataPath.suffix == ".edf": PSG = pyedflib.EdfReader(self.PSGDataPath.__str__()) if self.Config["PSGConfig"]["THOChannel"]["auto"]: self.raw_THO = PSG.readSignal(PSG.getSignalLabels().index('Effort THO')) else: self.raw_THO = PSG.readSignal(self.Config["PSGConfig"]["THOChannel"]["Channel"]) if self.Config["PSGConfig"]["ABDChannel"]["auto"]: self.raw_ABD = PSG.readSignal(PSG.getSignalLabels().index('Effort ABD')) else: self.raw_ABD = PSG.readSignal(self.Config["PSGConfig"]["ABDChannel"]["Channel"]) PSG.close() else: return False, "PSG文件格式错误" if self.XXDataPath.suffix == ".npy": self.raw_XX = np.load(self.XXDataPath) elif self.XXDataPath.suffix == ".txt": self.raw_XX = pd.read_csv(self.XXDataPath, sep="\t", header=None).values else: return False, "XX文件格式错误" # 获取时长 # print(self.raw_THO.shape, self.raw_XX.shape) # print(self.Config["PSGConfig"]["Frequency"], self.Config["XXConfig"]["Frequency"]) self.PSG_minutes = round(self.raw_THO.shape[0] / self.Config["PSGConfig"]["Frequency"] / 60) self.XX_minutes = round(self.raw_XX.shape[0] / self.Config["XXConfig"]["Frequency"] / 60) return True, "读取成功" def __Filter__(self): def butter_bandpass_filter(data, lowCut, highCut, fs, order): low = lowCut / (fs * 0.5) high = highCut / (fs * 0.5) sos = signal.butter(order, [low, high], btype="bandpass", output='sos') return signal.sosfiltfilt(sos, data) # 滤波 self.processed_THO = butter_bandpass_filter(self.raw_THO, self.Config["RespFilterConfig"]["LowCut"], self.Config["RespFilterConfig"]["HighCut"], self.Config["PSGConfig"]["Frequency"], self.Config["RespFilterConfig"]["Order"]) self.processed_ABD = butter_bandpass_filter(self.raw_ABD, self.Config["RespFilterConfig"]["LowCut"], self.Config["RespFilterConfig"]["HighCut"], self.Config["PSGConfig"]["Frequency"], self.Config["RespFilterConfig"]["Order"]) self.processed_XX = butter_bandpass_filter(self.raw_XX, self.Config["RespFilterConfig"]["LowCut"], self.Config["RespFilterConfig"]["HighCut"], self.Config["XXConfig"]["Frequency"], self.Config["RespFilterConfig"]["Order"]) return def Standardize_0(self): # 重采样 self.processed_THO = signal.resample(self.raw_THO, int(self.PSG_minutes * 60 * self.Config["ApplyFrequency"])) self.processed_ABD = signal.resample(self.raw_ABD, int(self.PSG_minutes * 60 * self.Config["ApplyFrequency"])) self.processed_XX = signal.resample(self.raw_XX, int(self.XX_minutes * 60 * self.Config["ApplyFrequency"])) return True, "原始信号仅重采样" def Standardize_1(self): self.__Filter__() return True, "呼吸提取完成 " def Standardize_2(self): # 如果XX采样率大于PSG采样率,那么XX重采样到PSG采样率 if self.Config["XXConfig"]["Frequency"] > self.Config["PSGConfig"]["Frequency"]: # 用[::]完成 self.processed_XX = self.processed_XX[ ::int(self.Config["XXConfig"]["Frequency"] / self.Config["PSGConfig"]["Frequency"])] # 如果XX采样率小于PSG采样率,那么XX重采样到PSG采样率 elif self.Config["XXConfig"]["Frequency"] < self.Config["PSGConfig"]["Frequency"] < 100: # 用repeat完成 self.processed_XX = np.repeat(self.processed_XX, int(self.Config["PSGConfig"]["Frequency"] / self.Config["XXConfig"][ "Frequency"]), axis=0) # 修改Config self.Config.update({"temp_frequency": self.Config["PSGConfig"]["Frequency"]}) return True, "预重采样完成" def Standardize_3(self): temp_frequency = self.Config["temp_frequency"] # 判断是否去基线 if self.Config["PSGConfig"]["PSGDelBase"]: # 减去四秒钟平均滤波 self.processed_THO = self.processed_THO - np.convolve(self.processed_THO, np.ones(int(4 * temp_frequency)) / int( 4 * temp_frequency), mode='same') self.processed_ABD = self.processed_ABD - np.convolve(self.processed_ABD, np.ones(int(4 * temp_frequency)) / int( 4 * temp_frequency), mode='same') if self.Config["XXConfig"]["XXDelBase"]: self.processed_XX = self.processed_XX - np.convolve(self.processed_XX, np.ones(int(4 * temp_frequency)) / int( 4 * temp_frequency), mode='same') return True, "去基线完成" def Standardize_4(self): # 判断是否标准化 if self.Config["PSGConfig"]["PSGZScore"]: self.processed_THO = (self.processed_THO - np.mean(self.processed_THO)) / np.std(self.processed_THO) self.processed_ABD = (self.processed_ABD - np.mean(self.processed_ABD)) / np.std(self.processed_ABD) if self.Config["XXConfig"]["XXZScore"]: self.processed_XX = (self.processed_XX - np.mean(self.processed_XX)) / np.std(self.processed_XX) return True, "标准化完成" def Standardize_5(self): # 重采样 用[::]完成 temp_frequency = self.Config["temp_frequency"] self.processed_THO = self.processed_THO[::int(temp_frequency / self.Config["ApplyFrequency"])] self.processed_ABD = self.processed_ABD[::int(temp_frequency / self.Config["ApplyFrequency"])] self.processed_XX = self.processed_XX[::int(temp_frequency / self.Config["ApplyFrequency"])] # self.processed_THO = signal.resample(self.processed_THO, # int(self.PSG_minutes * 60 * self.Config["ApplyFrequency"])) # self.processed_ABD = signal.resample(self.processed_ABD, # int(self.PSG_minutes * 60 * self.Config["ApplyFrequency"])) # self.processed_XX = signal.resample(self.processed_XX, # int(self.XX_minutes * 60 * self.Config["ApplyFrequency"])) return True, "最终重采样完成" @staticmethod def DrawPicCorrelate(tho_pxx, tho_nxx, abd_pxx, abd_nxx): fig = Figure(figsize=(8, 7), dpi=100) canvas = FigureCanvas(fig) ax1 = fig.add_subplot(221) ax1.plot(tho_pxx, color='blue') ax1.set_title("The Correlation of THO and XinXiao") ax2 = fig.add_subplot(222) ax2.plot(tho_nxx, color='blue') ax2.set_title("The Correlation of THO and Reverse XinXiao") ax3 = fig.add_subplot(223) ax3.plot(abd_pxx, color='blue') ax3.set_title("The Correlation of ABD and XinXiao") ax4 = fig.add_subplot(224) ax4.plot(abd_nxx, color='blue') ax4.set_title("The Correlation of ABD and Reverse XinXiao") width, height = fig.figbbox.width, fig.figbbox.height fig.canvas.draw() # 返回图片以便存到QPixImage return canvas.buffer_rgba(), width, height def DrawPicRawOverview(self): fig = Figure(figsize=(8, 7), dpi=100) canvas = FigureCanvas(fig) max_x = max(self.processed_THO.shape[0], self.processed_ABD.shape[0], self.processed_XX.shape[0]) ax1 = fig.add_subplot(311) ax1.plot(self.processed_THO, color='blue') ax1.set_xlim(0, max_x) ax1.set_title("THO") ax2 = fig.add_subplot(312) ax2.plot(self.processed_XX, color='blue') ax2.set_xlim(0, max_x) ax2.set_title("xinxiao") ax3 = fig.add_subplot(313) ax3.plot(self.processed_ABD, color='blue') ax3.set_xlim(0, max_x) ax3.set_title("ABD") width, height = fig.figbbox.width, fig.figbbox.height fig.canvas.draw() # 返回图片以便存到QPixImage return canvas.buffer_rgba(), width, height def DrawPicOverviewWithCutOff(self): fig = Figure(figsize=(8, 7), dpi=100) canvas = FigureCanvas(fig) max_x = max(self.processed_THO.shape[0] + self.Config["PSGConfig"]["PreA"], self.processed_XX.shape[0] + self.Config["XXConfig"]["PreA"]) min_x = min(self.Config["PSGConfig"]["PreA"], self.Config["XXConfig"]["PreA"], 0) ax1 = fig.add_subplot(311) ax1.plot( np.linspace(self.Config["PSGConfig"]["PreA"], len(self.processed_THO) + self.Config["PSGConfig"]["PreA"], len(self.processed_THO)), self.processed_THO, color='blue') # 绘制x = PreCut的线 和 x = PostCut的虚线 ax1.axvline(x=self.Config["PSGConfig"]["PreCut"] + self.Config["PSGConfig"]["PreA"], color='red', linestyle='--') ax1.axvline(x=len(self.processed_THO) - self.Config["PSGConfig"]["PostCut"] + self.Config["PSGConfig"]["PreA"], color='red', linestyle='--') ax1.set_xlim(min_x, max_x) ax1.set_title("THO") ax2 = fig.add_subplot(312) ax2.plot(np.linspace(self.Config["XXConfig"]["PreA"], len(self.processed_XX) + self.Config["XXConfig"]["PreA"], len(self.processed_XX)), self.processed_XX, color='blue') ax2.axvline(x=self.Config["XXConfig"]["PreCut"] + self.Config["XXConfig"]["PreA"], color='red', linestyle='--') ax2.axvline(x=len(self.processed_XX) - self.Config["XXConfig"]["PostCut"] + self.Config["XXConfig"]["PreA"], color='red', linestyle='--') ax2.set_xlim(min_x, max_x) ax2.set_title("xinxiao") ax3 = fig.add_subplot(313) ax3.plot( np.linspace(self.Config["PSGConfig"]["PreA"], len(self.processed_ABD) + self.Config["PSGConfig"]["PreA"], len(self.processed_ABD)), self.processed_ABD, color='blue') ax3.axvline(x=self.Config["PSGConfig"]["PreCut"] + self.Config["PSGConfig"]["PreA"], color='red', linestyle='--') ax3.axvline(x=len(self.processed_THO) - self.Config["PSGConfig"]["PostCut"] + self.Config["PSGConfig"]["PreA"], color='red', linestyle='--') ax3.set_xlim(min_x, max_x) ax3.set_title("ABD") width, height = fig.figbbox.width, fig.figbbox.height fig.canvas.draw() # 返回图片以便存到QPixImage return canvas.buffer_rgba(), width, height def DrawPicTryAlign(self): fig = Figure(figsize=(8, 7), dpi=100) canvas = FigureCanvas(fig) max_x = max(self.processed_THO.shape[0], self.processed_XX.shape[0] + self.Config["pos"]) min_x = min(self.Config["PSGConfig"]["PreA"], self.Config["XXConfig"]["PreA"] + self.Config["pos"], 0) ax1 = fig.add_subplot(311) ax1.plot( np.linspace(0, len(self.processed_THO), len(self.processed_THO)), self.processed_THO, color='blue') # 绘制x = PreCut的线 和 x = PostCut的虚线 ax1.set_xlim(min_x, max_x) ax1.set_title("THO") ax2 = fig.add_subplot(312) ax2.plot(np.linspace(self.Config["pos"], len(self.processed_XX) + self.Config["pos"], len(self.processed_XX)), self.processed_XX, color='blue') ax2.set_xlim(min_x, max_x) ax2.set_title("xinxiao") ax3 = fig.add_subplot(313) ax3.plot( np.linspace(0, len(self.processed_ABD), len(self.processed_ABD)), self.processed_ABD, color='blue') ax3.set_xlim(min_x, max_x) ax3.set_title("ABD") width, height = fig.figbbox.width, fig.figbbox.height fig.canvas.draw() # 返回图片以便存到QPixImage return canvas.buffer_rgba(), width, height def DrawPicByEpoch(self, epoch): PSG_SP = epoch * 30 * self.Config["ApplyFrequency"] PSG_EP = (epoch + 6) * 30 * self.Config["ApplyFrequency"] XX_SP = PSG_SP - self.Config["pos"] XX_EP = PSG_EP - self.Config["pos"] tho_seg = self.processed_THO[PSG_SP:PSG_EP] xx_seg = self.processed_XX[XX_SP:XX_EP] * self.Config["XX_reverse"] abd_seg = self.processed_ABD[PSG_SP:PSG_EP] fig = Figure(figsize=(8, 7), dpi=100) canvas = FigureCanvas(fig) # 根据PSG来和绘制 ax1 = fig.add_subplot(321) ax1.plot(np.linspace(PSG_SP, PSG_EP, len(tho_seg)), tho_seg) tho_peaks, _ = signal.find_peaks(tho_seg, prominence=0, distance=3 * self.Config["ApplyFrequency"]) ax1.plot(np.linspace(PSG_SP, PSG_EP, len(tho_seg))[tho_peaks], tho_seg[tho_peaks], "x") ax3 = fig.add_subplot(323) ax3.plot(np.linspace(XX_SP, XX_EP, len(xx_seg)), xx_seg) xx_peaks, _ = signal.find_peaks(xx_seg, prominence=0, distance=3 * self.Config["ApplyFrequency"]) ax3.plot(np.linspace(XX_SP, XX_EP, len(xx_seg))[xx_peaks], xx_seg[xx_peaks], "x") ax2 = fig.add_subplot(325) ax2.plot(np.linspace(PSG_SP, PSG_EP, len(abd_seg)), abd_seg) abd_peaks, _ = signal.find_peaks(abd_seg, prominence=0, distance=3 * self.Config["ApplyFrequency"]) ax2.plot(np.linspace(PSG_SP, PSG_EP, len(abd_seg))[abd_peaks], abd_seg[abd_peaks], "x") # 绘制间期 ax4 = fig.add_subplot(322) ax4.plot(np.linspace(PSG_SP, PSG_EP, len(np.diff(tho_peaks).repeat(self.Config["ApplyFrequency"]))), np.diff(tho_peaks).repeat(self.Config["ApplyFrequency"]), alpha=0.5, label="tho") ax4.plot(np.linspace(PSG_SP, PSG_EP, len(np.diff(xx_peaks).repeat(self.Config["ApplyFrequency"]))), np.diff(xx_peaks).repeat(self.Config["ApplyFrequency"]), label="resp") ax4.set_title("tho_interval") ax4.legend() ax4.set_ylim((10, 50)) ax5 = fig.add_subplot(324) ax5.plot(np.linspace(XX_SP, XX_EP, len(np.diff(tho_peaks).repeat(self.Config["ApplyFrequency"]))), np.diff(tho_peaks).repeat(self.Config["ApplyFrequency"])) ax5.plot(np.linspace(XX_SP, XX_EP, len(np.diff(xx_peaks).repeat(self.Config["ApplyFrequency"]))), np.diff(xx_peaks).repeat(self.Config["ApplyFrequency"]), label="resp") ax5.set_title("resp_interval") ax5.set_ylim((10, 50)) ax6 = fig.add_subplot(326) ax6.plot(np.linspace(PSG_SP, PSG_EP, len(np.diff(abd_peaks).repeat(self.Config["ApplyFrequency"]))), np.diff(abd_peaks).repeat(self.Config["ApplyFrequency"])) ax6.plot(np.linspace(PSG_SP, PSG_EP, len(np.diff(xx_peaks).repeat(self.Config["ApplyFrequency"]))), np.diff(xx_peaks).repeat(self.Config["ApplyFrequency"]), label="resp") ax6.set_title("abd_interval") ax6.set_ylim((10, 50)) width, height = fig.figbbox.width, fig.figbbox.height fig.canvas.draw() # 返回图片以便存到QPixImage return canvas.buffer_rgba(), width, height @njit("int64[:](int64[:],int64[:], int32[:])", nogil=True, parallel=True) def get_Correlate(a, v, between): begin, end = between if end == 0: end = len(a) - len(v) - 1 result = np.empty(end - begin, dtype=np.int64) for i in prange(end - begin): result[i] = np.sum(a[begin + i: begin + i + len(v)] * v) return result get_Correlate(np.array([0, 0, 0, 0, 1, 2, 3, 4, 5, 0, 0, 0, 0], dtype=np.int64), np.array([1, 2, 3, 4, 5], dtype=np.int64), between=np.array([0, 0])) class MainWindow(QMainWindow): def __init__(self): super(MainWindow, self).__init__() self.ui = Ui_respCoarseAlign() self.setting = SettingWindow() self.ui.setupUi(self) self.ui.actionDefault_Configuration.triggered.connect(self.setting.show) # checkbox custom 和SpinBox 互斥 self.ui.checkBox_custom.stateChanged.connect(self.__customChannel__) self.__disableAllButton__() self.ui.pushButton_Refresh.setEnabled(True) self.ui.pushButton_OpenFile.setEnabled(True) self.ui.checkBox_NABD.setEnabled(False) self.ui.checkBox_NTHO.setEnabled(False) self.ui.checkBox_PABD.setEnabled(False) self.ui.checkBox_PTHO.setEnabled(False) self.ui.checkBox_custom.setEnabled(False) # 绑定事件 # 刷新键分别获取PSG和XX文件夹里面的数据,获取所有编号显示在下拉框,比对编号同时存在的可选,仅存在一个文件夹的编号不可选 self.ui.pushButton_Refresh.clicked.connect(self.__refresh__) self.ui.pushButton_OpenFile.clicked.connect(self.__openFile__) self.ui.pushButton_Standardize.clicked.connect(self.__standardize__) self.ui.pushButton_CutOff.clicked.connect(self.__cutOff__) self.ui.pushButton_GetPos.clicked.connect(self.__getPosition__) self.ui.pushButton_JUMP.clicked.connect(self.__jump__) # spin_custom 按下回车绑定事件 self.ui.spinBox_SelectEpoch.editingFinished.connect(self.__jump__) self.ui.pushButton_EM1.clicked.connect(self.__EpochChange__) self.ui.pushButton_EM10.clicked.connect(self.__EpochChange__) self.ui.pushButton_EM100.clicked.connect(self.__EpochChange__) self.ui.pushButton_EP1.clicked.connect(self.__EpochChange__) self.ui.pushButton_EP10.clicked.connect(self.__EpochChange__) self.ui.pushButton_EP100.clicked.connect(self.__EpochChange__) self.ui.pushButton_SaveInfo.clicked.connect(self.__saveInfo__) self.ui.pushButton_ReadInfo.clicked.connect(self.__readInfo__) self.ui.pushButton_Exit.clicked.connect(self.__exit__) # 按下checkbox_NTHO、PTHO、 PABD、NABD后启用pushbutton_Align self.ui.checkBox_NTHO.stateChanged.connect(self.__enableAlign__) self.ui.checkBox_PTHO.stateChanged.connect(self.__enableAlign__) self.ui.checkBox_PABD.stateChanged.connect(self.__enableAlign__) self.ui.checkBox_NABD.stateChanged.connect(self.__enableAlign__) self.ui.spinBox_custom.editingFinished.connect(self.__enableAlign__) def __readInfo__(self): """ 读取信息 判断是否存在,不存在不读取并提示 :return: """ sampNo = self.ui.comboBox_SelectFile.currentText() if Path("./RespCoarseAlignInfo.csv").exists(): df = pd.read_csv("./RespCoarseAlignInfo.csv") # print(df) df["sampNo"] = df["sampNo"].astype(str) df = df[df["sampNo"] == sampNo] # print(df) if len(df) == 0: QMessageBox.warning(self, "警告", f"记录中无编号{sampNo},请计算对齐") return # 去最后一个 PSG_SampRate = df["PSG_SampRate"].values[-1] XX_SampRate = df["XX_SampRate"].values[-1] pos = df["pos"].values[-1] epoch = df["epoch"].values[-1] # 将信息写入界面 self.ui.spinBox_PSGFreq.setValue(PSG_SampRate) self.ui.spinBox_XXFreq.setValue(XX_SampRate) self.ui.spinBox_SelectEpoch.setValue(epoch) self.ui.spinBox_custom.setValue(pos) if pos > 0: self.ui.spinBox_XXPreA.setValue(pos) else: self.ui.spinBox_PSGPreA.setValue(-pos) # 加载到Conf中 self.data.Config["PSGConfig"].update({"PreA": self.ui.spinBox_PSGPreA.value(), "PreCut": self.ui.spinBox_PSGPreCut.value(), "PostCut": self.ui.spinBox_PSGPostCut.value()}) self.data.Config["XXConfig"].update({"PreA": self.ui.spinBox_XXPreA.value(), "PreCut": self.ui.spinBox_XXPreCut.value(), "PostCut": self.ui.spinBox_XXPostCut.value()}) relate = pos reverse = 1 self.data.Config["XX_reverse"] = reverse self.data.Config["pos"] = relate epoch_min = max(0, relate // 30 // self.data.Config["ApplyFrequency"] + 1) epoch_max = min(len(self.data.processed_THO) // 30 // self.data.Config["ApplyFrequency"] - 1, (len(self.data.processed_XX) - relate) // 30 // self.data.Config["ApplyFrequency"] - 1) self.ui.spinBox_SelectEpoch.setMinimum(epoch_min) self.ui.spinBox_SelectEpoch.setMaximum(epoch_max) self.ui.spinBox_SelectEpoch.setValue(epoch_min) # set tooltip self.ui.spinBox_SelectEpoch.setToolTip(f"最小值:{epoch_min}\n最大值:{epoch_max}") self.ui.spinBox_SelectEpoch.setEnabled(True) # 执行按下checkBox_custom self.ui.checkBox_custom.setChecked(True) ButtonState["Current"]["pushButton_JUMP"] = True ButtonState["Current"]["pushButton_EM1"] = True ButtonState["Current"]["pushButton_EM10"] = True ButtonState["Current"]["pushButton_EM100"] = True ButtonState["Current"]["pushButton_EP1"] = True ButtonState["Current"]["pushButton_EP10"] = True ButtonState["Current"]["pushButton_EP100"] = True self.__enableAllButton__() else: QMessageBox.warning(self, "警告", f"没有保存记录,请检查当前路径是否有RespCoarseAlignInfo.csv") def __saveInfo__(self): """ 保存信息 编号、PSG采样率、XinXiao采样率、 相差位置pos、观测Epoch、保存时间 使用dataframe 保存到 csv中 首先判断csv是否存在,不存在则新建,存在则追加 :return: """ sampNo = self.ui.comboBox_SelectFile.currentText() PSG_SampRate = self.ui.spinBox_PSGFreq.value() XX_SampRate = self.ui.spinBox_XXFreq.value() pos = self.data.Config["pos"] epoch = self.ui.spinBox_SelectEpoch.value() save_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 保存到csv中 df = pd.DataFrame({"sampNo": [sampNo], "PSG_SampRate": [PSG_SampRate], "XX_SampRate": [XX_SampRate], "pos": [pos], "epoch": [epoch], "save_time": [save_time]}) if Path("./RespCoarseAlignInfo.csv").exists(): df.to_csv("./RespCoarseAlignInfo.csv", mode="a", header=False, index=False) else: df.to_csv("./RespCoarseAlignInfo.csv", mode="w", header=True, index=False) # noinspection PyUnresolvedReferences QMessageBox.information(self, "提示", "保存成功", QMessageBox.Ok) def __EpochChange__(self): # 获取当前值 value = self.ui.spinBox_SelectEpoch.value() # print(self.sender()) # print(self.sender() == self.ui.pushButton_EM1) # 判断按键 if self.sender() == self.ui.pushButton_EM1: epoch = max(self.ui.spinBox_SelectEpoch.minimum(), value - 1) elif self.sender() == self.ui.pushButton_EM10: epoch = max(self.ui.spinBox_SelectEpoch.minimum(), value - 10) elif self.sender() == self.ui.pushButton_EM100: epoch = max(self.ui.spinBox_SelectEpoch.minimum(), value - 100) elif self.sender() == self.ui.pushButton_EP1: epoch = min(self.ui.spinBox_SelectEpoch.maximum(), value + 1) elif self.sender() == self.ui.pushButton_EP10: epoch = min(self.ui.spinBox_SelectEpoch.maximum(), value + 10) elif self.sender() == self.ui.pushButton_EP100: epoch = min(self.ui.spinBox_SelectEpoch.maximum(), value + 100) else: return # print(epoch) self.ui.spinBox_SelectEpoch.setValue(epoch) QApplication.processEvents() self.__jump__() def __jump__(self): self.__disableAllButton__() self.ui.label_Info.setText("开始绘制···") self.ui.progressBar.setValue(0) # 获取选择的Epoch epoch = self.ui.spinBox_SelectEpoch.value() # 绘制图像 self.__plot__(epoch=epoch) self.ui.progressBar.setValue(100) self.ui.label_Info.setText(f"Epoch: {epoch}, 绘制完成") self.__enableAllButton__() def __enableAlign__(self): self.__disableAllButton__() self.ui.label_Info.setText("绘制对齐图像···") self.ui.progressBar.setValue(0) QApplication.processEvents() if self.ui.checkBox_NTHO.isChecked(): relate = int(self.ui.checkBox_NTHO.text()) reverse = -1 elif self.ui.checkBox_PTHO.isChecked(): relate = int(self.ui.checkBox_PTHO.text()) reverse = 1 elif self.ui.checkBox_PABD.isChecked(): relate = int(self.ui.checkBox_PABD.text()) reverse = 1 elif self.ui.checkBox_NABD.isChecked(): relate = int(self.ui.checkBox_NABD.text()) reverse = -1 elif self.ui.checkBox_custom.isChecked(): relate = self.ui.spinBox_custom.value() reverse = 1 else: return # 最大相关系数值相对于PSG的位置 self.data.Config["pos"] = relate self.__plot__() self.ui.label_Info.setText("相对位置:{}".format(relate)) # 设置epoch上下限 self.data.Config["XX_reverse"] = reverse epoch_min = max(0, relate // 30 // self.data.Config["ApplyFrequency"] + 1) epoch_max = min(len(self.data.processed_THO) // 30 // self.data.Config["ApplyFrequency"] - 1, (len(self.data.processed_XX) + relate) // 30 // self.data.Config["ApplyFrequency"] - 1) self.ui.spinBox_SelectEpoch.setMinimum(epoch_min) self.ui.spinBox_SelectEpoch.setMaximum(epoch_max) self.ui.spinBox_SelectEpoch.setValue(epoch_min) # set tooltip self.ui.spinBox_SelectEpoch.setToolTip(f"最小值:{epoch_min}\n最大值:{epoch_max}") self.ui.spinBox_SelectEpoch.setEnabled(True) ButtonState["Current"]["pushButton_JUMP"] = True ButtonState["Current"]["pushButton_EM1"] = True ButtonState["Current"]["pushButton_EM10"] = True ButtonState["Current"]["pushButton_EM100"] = True ButtonState["Current"]["pushButton_EP1"] = True ButtonState["Current"]["pushButton_EP10"] = True ButtonState["Current"]["pushButton_EP100"] = True ButtonState["Current"]["pushButton_SaveInfo"] = True self.__enableAllButton__() def __getPosition__(self): # 根据截断选择,对信号进行互相关操作,得到最大值的位置 # 计算互相关 self.ui.progressBar.setValue(0) self.ui.label_Info.setText("计算互相关1/2...") self.__disableAllButton__() QApplication.processEvents() a = self.data.processed_THO[ self.data.Config["PSGConfig"]["PreCut"]:len(self.data.processed_THO) - self.data.Config["PSGConfig"][ "PostCut"]].copy() v = self.data.processed_XX[ self.data.Config["XXConfig"]["PreCut"]:len(self.data.processed_XX) - self.data.Config["XXConfig"][ "PostCut"]].copy() a *= 100 v *= 100 a = a.astype(np.int64) v = v.astype(np.int64) a = np.pad(a, (len(v) - 1, len(v) - 1), mode='constant') tho_relate = np.zeros(len(a) - len(v) - 1, dtype=np.int64) len_calc = len(a) - len(v) - 1 # 将序列分成一百份来计算 for i in range(100): tho_relate[i * len_calc // 100:(i + 1) * len_calc // 100] = get_Correlate(a, v, np.array( [i * len_calc // 100, (i + 1) * len_calc // 100])) self.ui.progressBar.setValue(i) QApplication.processEvents() tho_relate = tho_relate / 10000 tho_relate2 = - tho_relate self.ui.progressBar.setValue(0) self.ui.label_Info.setText("计算互相关2/2...") QApplication.processEvents() a = self.data.processed_ABD[ self.data.Config["PSGConfig"]["PreCut"]:len(self.data.processed_ABD) - self.data.Config["PSGConfig"][ "PostCut"]].copy() v = self.data.processed_XX[ self.data.Config["XXConfig"]["PreCut"]:len(self.data.processed_XX) - self.data.Config["XXConfig"][ "PostCut"]].copy() a *= 100 v *= 100 a = a.astype(np.int64) v = v.astype(np.int64) a = np.pad(a, (len(v) - 1, len(v) - 1), mode='constant') abd_relate = np.zeros(len(a) - len(v) - 1, dtype=np.int64) len_calc = len(a) - len(v) - 1 # 将序列分成一百份来计算 for i in range(100): abd_relate[i * len_calc // 100:(i + 1) * len_calc // 100] = get_Correlate(a, v, np.array( [i * len_calc // 100, (i + 1) * len_calc // 100])) self.ui.progressBar.setValue(i) QApplication.processEvents() abd_relate = abd_relate / 10000 abd_relate2 = - abd_relate self.ui.progressBar.setValue(80) self.ui.label_Info.setText("绘制相关系数曲线...") QApplication.processEvents() self.__plot__(tho_relate, tho_relate2, abd_relate, abd_relate2) self.ui.progressBar.setValue(90) self.ui.label_Info.setText("计算最大值位置...") QApplication.processEvents() # 计算最大值位置 tho_max = np.argmax(tho_relate) tho_max2 = np.argmax(tho_relate2) abd_max = np.argmax(abd_relate) abd_max2 = np.argmax(abd_relate2) pre = self.data.Config["PSGConfig"]["PreCut"] + self.data.Config["XXConfig"]["PostCut"] bias = pre - len(self.data.processed_XX) + 1 self.ui.checkBox_PABD.setText(str(abd_max + bias)) self.ui.checkBox_PTHO.setText(str(tho_max + bias)) self.ui.checkBox_NABD.setText(str(abd_max2 + bias)) self.ui.checkBox_NTHO.setText(str(tho_max2 + bias)) self.ui.checkBox_PABD.setEnabled(True) self.ui.checkBox_PTHO.setEnabled(True) self.ui.checkBox_NABD.setEnabled(True) self.ui.checkBox_NTHO.setEnabled(True) self.ui.checkBox_custom.setEnabled(True) # print(abd_max2) self.ui.progressBar.setValue(100) self.ui.label_Info.setText("计算完成") self.__enableAllButton__() def __reset__(self): ButtonState["Current"].update(ButtonState["Default"].copy()) ButtonState["Current"]["pushButton_Standardize"] = True self.ui.spinBox_PSGPreA.setValue(0) self.ui.spinBox_PSGPreCut.setValue(0) self.ui.spinBox_PSGPostCut.setValue(0) self.ui.spinBox_XXPreA.setValue(0) self.ui.spinBox_XXPreCut.setValue(0) self.ui.spinBox_XXPostCut.setValue(0) self.ui.checkBox_NABD.setChecked(False) self.ui.checkBox_NTHO.setChecked(False) self.ui.checkBox_PABD.setChecked(False) self.ui.checkBox_PTHO.setChecked(False) self.ui.checkBox_custom.setChecked(False) self.ui.spinBox_SelectEpoch.setValue(0) self.ui.spinBox_custom.setValue(0) self.ui.spinBox_custom.setEnabled(False) self.ui.spinBox_SelectEpoch.setToolTip("") self.ui.checkBox_PABD.setText("备选3") self.ui.checkBox_PTHO.setText("备选1") self.ui.checkBox_NABD.setText("备选4") self.ui.checkBox_NTHO.setText("备选2") self.ui.checkBox_PABD.setEnabled(False) self.ui.checkBox_PTHO.setEnabled(False) self.ui.checkBox_NABD.setEnabled(False) self.ui.checkBox_NTHO.setEnabled(False) self.ui.checkBox_custom.setEnabled(False) self.ui.spinBox_SelectEpoch.setMinimum(0) def __cutOff__(self): self.ui.label_Info.setText("开始绘制···") self.__disableAllButton__() Conf2 = self.data.Config.copy() self.ui.progressBar.setValue(20) QApplication.processEvents() Conf2["PSGConfig"].update({"PreA": self.ui.spinBox_PSGPreA.value(), "PreCut": self.ui.spinBox_PSGPreCut.value(), "PostCut": self.ui.spinBox_PSGPostCut.value()}) Conf2["XXConfig"].update({"PreA": self.ui.spinBox_XXPreA.value(), "PreCut": self.ui.spinBox_XXPreCut.value(), "PostCut": self.ui.spinBox_XXPostCut.value()}) self.data.Config = Conf2 self.__plot__() self.ui.progressBar.setValue(100) self.ui.label_Info.setText("绘制完成") ButtonState["Current"]["pushButton_GetPos"] = True self.__enableAllButton__() # matplotlib 绘制图像 def __plot__(self, *args, **kwargs): # 判读是哪个按钮点击调用的本程序 if self.sender() == self.ui.pushButton_Standardize: buffer, width, height = self.data.DrawPicRawOverview() elif self.sender() == self.ui.pushButton_CutOff: buffer, width, height = self.data.DrawPicOverviewWithCutOff() elif self.sender() == self.ui.pushButton_GetPos: buffer, width, height = self.data.DrawPicCorrelate(*args, **kwargs) elif self.sender() == self.ui.checkBox_NTHO: buffer, width, height = self.data.DrawPicTryAlign() elif self.sender() == self.ui.checkBox_NABD: buffer, width, height = self.data.DrawPicTryAlign() elif self.sender() == self.ui.checkBox_PTHO: buffer, width, height = self.data.DrawPicTryAlign() elif self.sender() == self.ui.checkBox_PABD: buffer, width, height = self.data.DrawPicTryAlign() elif self.sender() == self.ui.spinBox_custom: buffer, width, height = self.data.DrawPicTryAlign() elif self.sender() == self.ui.pushButton_JUMP: buffer, width, height = self.data.DrawPicByEpoch(*args, **kwargs) elif self.sender() == self.ui.pushButton_EM1: buffer, width, height = self.data.DrawPicByEpoch(*args, **kwargs) elif self.sender() == self.ui.pushButton_EM10: buffer, width, height = self.data.DrawPicByEpoch(*args, **kwargs) elif self.sender() == self.ui.pushButton_EM100: buffer, width, height = self.data.DrawPicByEpoch(*args, **kwargs) elif self.sender() == self.ui.pushButton_EP1: buffer, width, height = self.data.DrawPicByEpoch(*args, **kwargs) elif self.sender() == self.ui.pushButton_EP10: buffer, width, height = self.data.DrawPicByEpoch(*args, **kwargs) elif self.sender() == self.ui.pushButton_EP100: buffer, width, height = self.data.DrawPicByEpoch(*args, **kwargs) else: return # 显示到labelPic上 img = QImage(buffer, width, height, QImage.Format_RGBA8888) self.ui.label_Pic.setPixmap(QPixmap(img)) # noinspection PyUnresolvedReferences def __exit__(self): # 选择是否确认退出 reply = QMessageBox.question(self, '确认', '确认退出吗?', QMessageBox.Yes | QMessageBox.No, QMessageBox.No) if reply == QMessageBox.Yes: self.close() def __customChannel__(self, state): if state == 2: self.ui.spinBox_custom.setEnabled(True) else: self.ui.spinBox_custom.setEnabled(False) # 刷新键分别获取PSG和XX文件夹里面的数据,获取所有编号显示在下拉框,比对编号同时存在的可选,仅存在一个文件夹的编号不可选 def __refresh__(self): # 检查两文件夹是否存在 PSGPath = Path(self.setting.ui.lineEdit_PSGFilePath.text()) XXPath = Path(self.setting.ui.lineEdit_XXFilePath.text()) if not PSGPath.exists(): QMessageBox.warning(self, "警告", "PSG文件夹不存在") return if not XXPath.exists(): QMessageBox.warning(self, "警告", "XX文件夹不存在") return # 获取两文件夹下所有的txt和npy文件编号 PSGFiles = [file.stem for file in PSGPath.glob("*.edf")] XXFiles = [file.stem for file in XXPath.glob("*.txt")] + [file.stem for file in XXPath.glob("*.npy")] PSGFiles = sorted(PSGFiles) XXFiles = sorted(XXFiles) # 获取两文件夹下同时存在的编号 # print(PSGFiles, XXFiles) Files = list(set(PSGFiles).intersection(set(XXFiles))) Files = sorted(Files) # 获取两文件夹下仅存在一个的编号 FilesOnlyInPSG = list(set(PSGFiles).difference(set(XXFiles))) FilesOnlyInXX = list(set(XXFiles).difference(set(PSGFiles))) # print(Files, FilesOnlyInPSG, FilesOnlyInXX) # 均显示到下拉框 self.ui.comboBox_SelectFile.clear() self.ui.comboBox_SelectFile.addItems([file for file in Files]) self.ui.comboBox_SelectFile.addItems([file + " (仅PSG)" for file in FilesOnlyInPSG]) self.ui.comboBox_SelectFile.addItems([file + " (仅XX)" for file in FilesOnlyInXX]) self.ui.comboBox_SelectFile.setCurrentIndex(0) # # 仅存在一个文件夹的编号不可选 for file in FilesOnlyInPSG: self.ui.comboBox_SelectFile.model().item(FilesOnlyInPSG.index(file) + len(Files)).setEnabled(False) for file in FilesOnlyInXX: self.ui.comboBox_SelectFile.model().item( FilesOnlyInXX.index(file) + len(Files) + len(FilesOnlyInPSG)).setEnabled(False) def __disableAllButton__(self): # 禁用所有按钮 all_widgets = self.centralWidget().findChildren(QWidget) # 迭代所有部件,查找按钮并禁用它们 for widget in all_widgets: if isinstance(widget, QPushButton): if widget.objectName() in ButtonState["Current"].keys(): widget.setEnabled(False) def __enableAllButton__(self): # 启用按钮 all_widgets = self.centralWidget().findChildren(QWidget) # 迭代所有部件,查找按钮并启用它们 for widget in all_widgets: if isinstance(widget, QPushButton): if widget.objectName() in ButtonState["Current"].keys(): widget.setEnabled(ButtonState["Current"][widget.objectName()]) def __openFile__(self): self.ui.label_Info.setText("正在打开文件...") self.__disableAllButton__() # 长时间操作,刷新界面防止卡顿 QApplication.processEvents() # 获取checkbox状态 self.data = Data(Path(self.setting.ui.lineEdit_PSGFilePath.text()), Path(self.setting.ui.lineEdit_XXFilePath.text()), self.ui.comboBox_SelectFile.currentText().split(" ")[0], Conf) opened, info = self.data.OpenFile() if not opened: QMessageBox.warning(self, "警告", info) self.ui.label_Info.setText(info) else: self.ui.label_PSGmins.setText(str(self.data.PSG_minutes)) self.ui.label_XXmins.setText(str(self.data.XX_minutes)) self.ui.progressBar.setValue(100) self.ui.label_Info.setText(info) self.ui.label_Pic.setText("读取成功") self.__reset__() self.__enableAllButton__() def __standardize__(self): self.ui.progressBar.setValue(0) self.ui.label_Info.setText("正在标准化...") self.__disableAllButton__() QApplication.processEvents() Conf2 = self.data.Config.copy() Conf2["RawSignal"] = self.ui.checkBox_RawSignal.isChecked() Conf2["PSGConfig"].update({ "PSGDelBase": self.ui.checkBox_PSGDelBase.isChecked(), "PSGZScore": self.ui.checkBox_PSGZScore.isChecked(), "Frequency": self.ui.spinBox_PSGFreq.value(), }) Conf2["XXConfig"].update({ "XXDelBase": self.ui.checkBox_XXDelBase.isChecked(), "XXZScore": self.ui.checkBox_XXZScore.isChecked(), "Frequency": self.ui.spinBox_XXFreq.value(), }) self.data.Config = Conf2 if self.data.Config["RawSignal"]: opened, info = self.data.Standardize_0() if not opened: QMessageBox.warning(self, "警告", info) self.ui.label_Info.setText(info) else: self.ui.progressBar.setValue(100) self.ui.label_Info.setText(info) else: self.ui.label_Info.setText('正在进行呼吸提取...') QApplication.processEvents() opened, info = self.data.Standardize_1() self.ui.label_Info.setText(info) self.ui.progressBar.setValue(10) QApplication.processEvents() opened, info = self.data.Standardize_2() self.ui.progressBar.setValue(30) self.ui.label_Info.setText(info) QApplication.processEvents() opened, info = self.data.Standardize_3() self.ui.label_Info.setText(info) self.ui.progressBar.setValue(50) QApplication.processEvents() opened, info = self.data.Standardize_4() self.ui.label_Info.setText(info) self.ui.progressBar.setValue(70) QApplication.processEvents() opened, info = self.data.Standardize_5() self.ui.label_Info.setText(info) self.ui.progressBar.setValue(90) QApplication.processEvents() self.__plot__() self.ui.progressBar.setValue(100) ButtonState["Current"]["pushButton_CutOff"] = True ButtonState["Current"]["pushButton_ReadInfo"] = True self.__enableAllButton__() if __name__ == "__main__": app = QApplication(sys.argv) window = MainWindow() window.show() sys.exit(app.exec())