RespCoarseAlignment/RespCoarseAlign.py
2023-09-20 13:25:59 +08:00

318 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""
@author:andrew
@file:RespCoarseAlign.py
@email:admin@marques22.com
@email:2021022362@m.scnu.edu.cn
@time:2023/09/20
"""
import sys
from pathlib import Path
import mne
import numpy as np
import pandas as pd
from PySide6.QtWidgets import QApplication, QMainWindow, QFileDialog, QMessageBox
from PySide6.QtCore import QFile
from scipy import signal
from scipy.signal import butter
from ui.Mian import Ui_mainWindow as Ui_respCoarseAlign
from ui.setings import Ui_MainWindow as Ui_Setting
import yaml
# Built-in default configuration. SettingWindow overlays the contents of
# ./config.yaml on top of these values and writes the result back on save.
Conf = {
    "PSGConfig": {
        "Path": "./Data/PSG/",
        # Sampling frequency used to convert sample counts to minutes
        # (presumably Hz -- confirm against the recordings).
        "Frequency": 100,
        "THOChannel": {
            "auto": True,
            "Channel": 3,
        },
        "ABDChannel": {
            "auto": True,
            "Channel": 4,
        },
    },
    "XXConfig": {
        "Path": "./Data/XX/",
        "Frequency": 100,
    },
    # Band-pass filter settings. The key names must be exactly "LowCut",
    # "HighCut" and "Order": those are the names the settings window and the
    # filter code read, so any other spelling raises KeyError when
    # config.yaml does not provide them.
    "RespFilterConfig": {
        "LowCut": 0.01,
        "HighCut": 0.7,
        "Order": 4,
    },
    # Target frequency that all signals are resampled to.
    "ApplyFrequency": 5,
}
class SettingWindow(QMainWindow):
    """Window for viewing and editing the default configuration.

    On construction the module-level ``Conf`` dictionary is refreshed from
    ``./config.yaml`` (when that file exists) and shown in the widgets.
    Saving copies the widget values back into ``Conf`` and rewrites the file.
    """

    def __init__(self):
        super().__init__()
        self.ui = Ui_Setting()
        self.ui.setupUi(self)
        self.__read_settings__()

    @staticmethod
    def _merge_config(base, override):
        """Recursively copy ``override`` into ``base``.

        Nested dictionaries are merged key by key, so a config file that only
        lists some options does not discard the remaining defaults.
        """
        for key, value in override.items():
            if isinstance(value, dict) and isinstance(base.get(key), dict):
                SettingWindow._merge_config(base[key], value)
            else:
                base[key] = value

    def __read_settings__(self):
        """Load ``./config.yaml`` into ``Conf``, fill the widgets, bind signals."""
        config_path = Path("./config.yaml")
        # A missing file (e.g. first start) simply means "use the defaults".
        if config_path.exists():
            with open(config_path, "r", encoding="utf-8") as f:
                # safe_load instead of yaml.load/FullLoader: the file holds
                # only plain scalars and mappings, and safe_load never
                # constructs arbitrary Python objects from its contents.
                fileConfig = yaml.safe_load(f)
            # An empty file parses to None; ignore anything that is not a mapping.
            if isinstance(fileConfig, dict):
                self._merge_config(Conf, fileConfig)

        psg_config = Conf["PSGConfig"]
        xx_config = Conf["XXConfig"]
        filter_config = Conf["RespFilterConfig"]

        self.ui.lineEdit_PSGFilePath.setText(psg_config["Path"])
        self.ui.lineEdit_XXFilePath.setText(xx_config["Path"])
        self.ui.spinBox_PSGDefaultFreq.setValue(psg_config["Frequency"])
        self.ui.spinBox_XXDefaultFreq.setValue(xx_config["Frequency"])
        self.ui.QSpinBox_ApplyFre.setValue(Conf["ApplyFrequency"])

        autoTHO = bool(psg_config["THOChannel"]["auto"])
        autoABD = bool(psg_config["ABDChannel"]["auto"])
        # setChecked expects a bool, not a Qt check-state integer.
        self.ui.checkBox_THOautoChannel.setChecked(autoTHO)
        self.ui.checkBox_ABDautoChannel.setChecked(autoABD)
        self.ui.spinBox_THOcustomChannel.setValue(psg_config["THOChannel"]["Channel"])
        self.ui.spinBox_ABDcustomChannel.setValue(psg_config["ABDChannel"]["Channel"])

        self.ui.doubleSpinBox_ButterLowPassFreq.setValue(filter_config["LowCut"])
        self.ui.doubleSpinBox_ButterHighPassFreq.setValue(filter_config["HighCut"])
        self.ui.spinBox_ButterOrder.setValue(filter_config["Order"])

        # The custom channel spin boxes are only usable when "auto" is off.
        self.ui.spinBox_THOcustomChannel.setEnabled(not autoTHO)
        self.ui.spinBox_ABDcustomChannel.setEnabled(not autoABD)

        # Bind events.
        self.ui.toolButton_PSGFilePath.clicked.connect(self.__select_file__)
        self.ui.toolButton_XXFilePath.clicked.connect(self.__select_file__)
        self.ui.pushButton_SaveConfig.clicked.connect(self.__write_settings__)
        self.ui.pushButton_Cancel.clicked.connect(self.close)
        # The "auto" checkboxes and their spin boxes are mutually exclusive.
        self.ui.checkBox_ABDautoChannel.stateChanged.connect(self.__ABDAutoChannel__)
        self.ui.checkBox_THOautoChannel.stateChanged.connect(self.__THOAutoChannel__)

    def __ABDAutoChannel__(self, state):
        """Enable the ABD custom-channel spin box unless ``state`` is 2 (checked)."""
        self.ui.spinBox_ABDcustomChannel.setEnabled(state != 2)

    def __THOAutoChannel__(self, state):
        """Enable the THO custom-channel spin box unless ``state`` is 2 (checked)."""
        self.ui.spinBox_THOcustomChannel.setEnabled(state != 2)

    def __select_file__(self, event):
        """Ask for a data directory and store it in the matching line edit.

        The button that emitted the signal decides whether the PSG or the XX
        path is edited. ``event`` is the argument delivered by ``clicked`` and
        is not used.
        """
        sender_name = self.sender().objectName()
        if sender_name == "toolButton_PSGFilePath":
            path = QFileDialog.getExistingDirectory(self, "选择PSG数据文件夹", "./Data/PSG/")
            target = self.ui.lineEdit_PSGFilePath
        elif sender_name == "toolButton_XXFilePath":
            path = QFileDialog.getExistingDirectory(self, "选择XX数据文件夹", "./Data/XX/")
            target = self.ui.lineEdit_XXFilePath
        else:
            return
        # The dialog returns an empty string when cancelled; keep the
        # previously configured path in that case instead of wiping it.
        if path:
            target.setText(path)

    def __write_settings__(self):
        """Copy the widget values into ``Conf``, save ``./config.yaml``, close."""
        psg_config = Conf["PSGConfig"]
        xx_config = Conf["XXConfig"]
        filter_config = Conf["RespFilterConfig"]

        # Read the configuration from the UI.
        psg_config["Path"] = self.ui.lineEdit_PSGFilePath.text()
        xx_config["Path"] = self.ui.lineEdit_XXFilePath.text()
        psg_config["Frequency"] = self.ui.spinBox_PSGDefaultFreq.value()
        xx_config["Frequency"] = self.ui.spinBox_XXDefaultFreq.value()
        Conf["ApplyFrequency"] = self.ui.QSpinBox_ApplyFre.value()
        psg_config["THOChannel"]["auto"] = self.ui.checkBox_THOautoChannel.isChecked()
        psg_config["ABDChannel"]["auto"] = self.ui.checkBox_ABDautoChannel.isChecked()
        psg_config["THOChannel"]["Channel"] = self.ui.spinBox_THOcustomChannel.value()
        psg_config["ABDChannel"]["Channel"] = self.ui.spinBox_ABDcustomChannel.value()
        filter_config["LowCut"] = self.ui.doubleSpinBox_ButterLowPassFreq.value()
        filter_config["HighCut"] = self.ui.doubleSpinBox_ButterHighPassFreq.value()
        filter_config["Order"] = self.ui.spinBox_ButterOrder.value()

        with open("./config.yaml", "w", encoding="utf-8") as f:
            yaml.dump(Conf, f)
        self.close()
class Data:
    """Holds the PSG (THO/ABD) and XX signals of one recording.

    Attributes are filled in by :meth:`OpenFile` and modified in place by
    :meth:`Standardize`.
    """

    def __init__(self, PSGDataPath, XXDataPath, sampNo, Config):
        """Resolve the file paths for recording ``sampNo``.

        Args:
            PSGDataPath: ``pathlib.Path`` of the directory with ``<sampNo>.edf``.
            XXDataPath: ``pathlib.Path`` of the directory with ``<sampNo>.npy``
                or ``<sampNo>.txt`` (``.npy`` wins when both exist).
            sampNo: Recording identifier used as the file stem.
            Config: Configuration dictionary (same layout as ``Conf``).
        """
        self.PSGDataPath = PSGDataPath / f"{sampNo}.edf"
        # None means "no XX file found"; OpenFile reports that to the caller.
        self.XXDataPath = None
        for suffix in (".npy", ".txt"):
            candidate = XXDataPath / f"{sampNo}{suffix}"
            if candidate.exists():
                self.XXDataPath = candidate
                break
        self.Config = Config
        self.raw_THO = None
        self.raw_ABD = None
        self.raw_XX = None
        self.PSG_minutes = None
        self.XX_minutes = None

    def _channel_pick(self, config_key, auto_name):
        """Return the MNE pick for a PSG channel: its name in auto mode, else the configured number."""
        channel_config = self.Config["PSGConfig"][config_key]
        if channel_config["auto"]:
            return auto_name
        # NOTE(review): the configured number is handed to MNE as an integer
        # channel index -- confirm the UI value is meant to be 0-based.
        return channel_config["Channel"]

    def OpenFile(self):
        """Load the THO/ABD channels and the XX signal.

        Returns:
            A ``(success, message)`` tuple. ``success`` is False when a file
            is missing or has an unsupported extension.
        """
        # Validate both inputs before any (potentially slow) file is read.
        if self.PSGDataPath.suffix != ".edf":
            return False, "PSG文件格式错误"
        if not self.PSGDataPath.exists():
            return False, "PSG文件不存在"
        if self.XXDataPath is None:
            return False, "XX文件不存在"
        if self.XXDataPath.suffix not in (".npy", ".txt"):
            return False, "XX文件格式错误"

        with mne.io.read_raw_edf(self.PSGDataPath) as PSG:
            # get_data(picks=...) leaves the Raw object untouched.
            # pick_channels() works in place, so picking THO first would
            # remove ABD before it could be read, and it only accepts channel
            # names, never the integer from the custom-channel setting.
            self.raw_THO = PSG.get_data(picks=[self._channel_pick("THOChannel", "THO")])[0]
            self.raw_ABD = PSG.get_data(picks=[self._channel_pick("ABDChannel", "ABD")])[0]

        if self.XXDataPath.suffix == ".npy":
            self.raw_XX = np.load(self.XXDataPath)
        else:
            self.raw_XX = pd.read_csv(self.XXDataPath, sep="\t", header=None).values

        # Durations in minutes, derived from the configured sampling rates.
        self.PSG_minutes = self.raw_THO.shape[0] / self.Config["PSGConfig"]["Frequency"] / 60
        self.XX_minutes = self.raw_XX.shape[0] / self.Config["XXConfig"]["Frequency"] / 60
        return True, "读取成功"

    def __Filter__(self):
        """Band-pass filter THO, ABD and XX in place (Butterworth, SOS form)."""
        filter_config = self.Config["RespFilterConfig"]
        low_cut = filter_config["LowCut"]
        high_cut = filter_config["HighCut"]
        order = filter_config["Order"]

        def butter_bandpass_filter(data, lowCut, highCut, fs, order):
            nyquist = fs * 0.5
            sos = signal.butter(order, [lowCut / nyquist, highCut / nyquist],
                                btype="bandpass", output="sos")
            # Filter along axis 0, the sample axis used everywhere else
            # (shape[0] for durations, resample's default axis). sosfilt
            # defaults to the last axis, which is wrong for the 2-D
            # (samples, columns) array produced by the .txt loader.
            return signal.sosfilt(sos, data, axis=0)

        psg_fs = self.Config["PSGConfig"]["Frequency"]
        xx_fs = self.Config["XXConfig"]["Frequency"]
        self.raw_THO = butter_bandpass_filter(self.raw_THO, low_cut, high_cut, psg_fs, order)
        self.raw_ABD = butter_bandpass_filter(self.raw_ABD, low_cut, high_cut, psg_fs, order)
        self.raw_XX = butter_bandpass_filter(self.raw_XX, low_cut, high_cut, xx_fs, order)

    def Standardize(self):
        """Optionally band-pass filter, then resample all signals to ``ApplyFrequency``.

        Filtering is skipped when ``Config["RawSignal"]`` is truthy; a missing
        key is treated as False.
        """
        if not self.Config.get("RawSignal", False):
            self.__Filter__()

        # Resample to the common target frequency.
        apply_frequency = self.Config["ApplyFrequency"]
        psg_samples = int(self.PSG_minutes * 60 * apply_frequency)
        xx_samples = int(self.XX_minutes * 60 * apply_frequency)
        self.raw_THO = signal.resample(self.raw_THO, psg_samples)
        self.raw_ABD = signal.resample(self.raw_ABD, psg_samples)
        self.raw_XX = signal.resample(self.raw_XX, xx_samples)
        # NOTE(review): the signals are overwritten in place, so a second call
        # filters already-resampled data using the original sampling
        # frequency -- confirm whether repeated calls must be supported.
        # TODO: baseline removal and z-score normalisation are not implemented yet.
class MainWindow(QMainWindow):
    """Main window: pick a recording, load it and standardise its signals."""

    def __init__(self):
        super().__init__()
        self.ui = Ui_respCoarseAlign()
        self.setting = SettingWindow()
        # Currently loaded recording; None until a file has been opened.
        self.data = None
        self.ui.setupUi(self)
        self.ui.actionDefault_Configuration.triggered.connect(self.setting.show)
        # The "custom" checkbox controls whether its spin box is usable.
        self.ui.checkBox_custom.stateChanged.connect(self.__customChannel__)
        # Bind events.
        self.ui.pushButton_Refresh.clicked.connect(self.__refresh__)
        self.ui.pushButton_OpenFile.clicked.connect(self.__openFile__)
        self.ui.pushButton_Standardize.clicked.connect(self.__standardize__)

    def __customChannel__(self, state):
        """Enable the custom spin box only while ``state`` is 2 (checked)."""
        self.ui.spinBox_custom.setEnabled(state == 2)

    def __refresh__(self):
        """Rescan the PSG and XX directories and repopulate the file combo box.

        Recording IDs present in both directories are selectable. IDs found in
        only one directory are listed with a marker and disabled.
        """
        # Both directories must exist.
        PSGPath = Path(self.setting.ui.lineEdit_PSGFilePath.text())
        XXPath = Path(self.setting.ui.lineEdit_XXFilePath.text())
        if not PSGPath.exists():
            QMessageBox.warning(self, "警告", "PSG文件夹不存在")
            return
        if not XXPath.exists():
            QMessageBox.warning(self, "警告", "XX文件夹不存在")
            return

        # Collect the recording IDs (file stems) found in each directory.
        psg_ids = {file.stem for file in PSGPath.glob("*.edf")}
        xx_ids = {file.stem for file in XXPath.glob("*.txt")}
        xx_ids.update(file.stem for file in XXPath.glob("*.npy"))

        # Sorted so the combo box order is stable between refreshes.
        Files = sorted(psg_ids & xx_ids)
        FilesOnlyInPSG = sorted(psg_ids - xx_ids)
        FilesOnlyInXX = sorted(xx_ids - psg_ids)

        combo = self.ui.comboBox_SelectFile
        combo.clear()
        combo.addItems(Files)
        combo.addItems([file + " (仅PSG)" for file in FilesOnlyInPSG])
        combo.addItems([file + " (仅XX)" for file in FilesOnlyInXX])

        # Every entry after the matched IDs exists in one directory only and
        # must not be selectable.
        model = combo.model()
        for row in range(len(Files), combo.count()):
            model.item(row).setEnabled(False)
        # Never leave a disabled entry as the current selection.
        combo.setCurrentIndex(0 if Files else -1)

    def __openFile__(self):
        """Load the recording selected in the combo box and show its durations."""
        # Drop the " (仅PSG)" / " (仅XX)" marker if present; splitting on the
        # marker prefix (rather than the first space) keeps IDs that contain
        # spaces intact.
        sampNo = self.ui.comboBox_SelectFile.currentText().rsplit(" (仅", 1)[0]
        if not sampNo:
            QMessageBox.warning(self, "警告", "请先选择文件")
            return

        self.data = Data(Path(self.setting.ui.lineEdit_PSGFilePath.text()),
                         Path(self.setting.ui.lineEdit_XXFilePath.text()),
                         sampNo,
                         Conf)
        opened, info = self.data.OpenFile()
        if not opened:
            QMessageBox.warning(self, "警告", info)
            self.ui.label_Info.setText(info)
            return
        self.ui.label_PSGmins.setText(str(self.data.PSG_minutes))
        self.ui.label_XXmins.setText(str(self.data.XX_minutes))
        self.ui.label_Info.setText(info)

    def __standardize__(self):
        """Apply the options chosen in the UI and standardise the loaded signals."""
        # Function-scope import: deepcopy is only needed here.
        from copy import deepcopy

        # Nothing to do until a recording has been loaded successfully.
        if self.data is None or self.data.raw_THO is None:
            QMessageBox.warning(self, "警告", "请先打开文件")
            return

        # Deep copy: a shallow copy shares the nested PSGConfig/XXConfig
        # dictionaries with the global Conf, so the per-run options below
        # would leak into the saved default configuration.
        Conf2 = deepcopy(self.data.Config)
        Conf2["RawSignal"] = self.ui.checkBox_RawSignal.isChecked()
        Conf2["PSGConfig"].update({
            "PSGDelBase": self.ui.checkBox_PSGDelBase.isChecked(),
            "PSGZScore": self.ui.checkBox_PSGZScore.isChecked(),
            "Frequency": self.ui.spinBox_PSGFreq.value(),
        })
        Conf2["XXConfig"].update({
            "XXDelBase": self.ui.checkBox_XXDelBase.isChecked(),
            "XXZScore": self.ui.checkBox_XXZScore.isChecked(),
            "Frequency": self.ui.spinBox_XXFreq.value(),
        })
        self.data.Config = Conf2
        self.data.Standardize()
# Script entry point: create the Qt application, show the main window and
# hand the event loop's return value to the interpreter as the exit status.
if __name__ == "__main__":
    application = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    exit_code = application.exec()
    sys.exit(exit_code)