Quality_Relabel/utils/Prepare_Data.py
2022-03-27 22:10:32 +08:00

411 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: cp936 -*-
# 使用gbk编码才能显示
"""
@author:Marques
@file:Prepare_Data.py
@email:admin@marques22.com
@email:2021022362@m.scnu.edu.cn
@time:2022/03/26
"""
from datetime import datetime
from typing import Union, List
import pyedflib
from pathlib import Path
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt, gridspec
from Preprocessing import BCG_Operation
from tqdm import tqdm
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
# ['EEG F3-A2', 'EEG F4-A1', 'EEG C3-A2', 'EEG C4-A1', 'EEG O1-A2',
# 'EEG O2-A1', 'EOG Right', 'EOG Left', 'EMG Chin', 'ECG I', 'RR',
# 'ECG II', 'Effort Tho', 'Flow Patient', 'Flow Patient', 'Effort Abd',
# 'SpO2', 'Pleth', 'Snore', 'Body', 'Pulse', 'Leg LEG1', 'Leg LEG2',
# 'EEG A1-A2', 'Imp']
class Prepare_Data:
# 可选择的通道
base_channel = ['EEG F3-A2', 'EEG F4-A1', 'EEG C3-A2', 'EEG C4-A1', 'EEG O1-A2', 'EEG O2-A1', 'EOG Right',
'EOG Left', 'EMG Chin', 'ECG I', 'RR', 'ECG II', 'Effort Tho', 'Flow Patient', 'Flow Patient', 'HR',
'Effort Abd', 'SpO2', 'Pleth', 'Snore', 'Body', 'Pulse', 'Leg LEG1', 'Leg LEG2', 'EEG A1-A2', 'Imp']
# 显示事件
base_event = ["Hypopnea", "Central apnea", "Obstructive apnea", "Mixed apnea"]
# 设定事件和其对应颜色
# 蓝色 背景
# 粉色 低通气
# 橙色 中枢性
# 红色 阻塞型 与 混合型
color_cycle = ["blue", "pink", "orange", "red", "red"]
assert len(color_cycle) == len(base_event) + 1, "基础事件数量与颜色数量不一致"
def __init__(self, sampNo: int, frequency: int = 100, bcg_frequency: int = 1000,
channel_list: List[str] = ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'],
focus_event_list: List[str] = ["Obstructive apnea"]):
"""
:param sampNo: 编号选择
:param frequency: 显示采样率
:param bcg_frequency: BCG信号采样率
:param channel_list: 显示的通道
:param focus_event_list: 关注暂停事件
"""
self.sampNo = sampNo
self.channel_list = channel_list
self.focus_event_list = focus_event_list
self.frequency = frequency
self.bcg_frequency = bcg_frequency
self.ecg_start_time = None
# 用来显示颜色时按点匹配事件
self.ecg_event_label = None
self.bcg_event_label = None
# 仅包含关注暂停事件的列表
self.ecg_event_label_filtered_df = None
self.bcg_event_label_filtered_df = None
# 所有事件列表
self.ecg_event_label_df = None
self.bcg_event_label_df = None
# 各通道信号
self.signal_select = {}
self.check_channel()
self.read_data(frequency, bcg_frequency)
self.read_event()
def check_channel(self):
for i in self.channel_list:
if i not in self.base_channel:
print(f"{i} 不存在于常见通道名中")
print(f"常见通道名:{self.channel_list}")
def read_data(self, frequency: int = 100, bcg_frequency: int = 1000):
bcg_path = Path(f"../Data/BCG/{self.sampNo}samp.npy")
ecg_path = Path(f"../Data/ECG/A{str(self.sampNo).rjust(7, '0')}.edf")
if not bcg_path.exists():
raise FileNotFoundError(f"{bcg_path} 不存在!")
if not ecg_path.exists():
raise FileNotFoundError(f"{ecg_path} 不存在!")
with pyedflib.EdfReader(str(ecg_path.resolve())) as file:
signal_num = file.signals_in_file
print(f"{self.sampNo} EDF file signal number is {signal_num}")
signal_label = file.getSignalLabels()
print(f"{self.sampNo} EDF file signal label : {signal_label}")
self.ecg_start_time = file.getStartdatetime()
# 根据PSG记录长度生成事件表
self.ecg_event_label = np.zeros(file.getFileDuration() * self.frequency)
# 打印PSG信息
file.file_info_long()
# sub_index 用于区分两个flow patient
sub_index = 1
for i, index in enumerate(signal_label):
# 仅加载选中的通道
if index in self.channel_list:
# 重命名flow patient通道
if index == 'Flow Patient':
index = index + str(sub_index)
sub_index += 1
signal = file.readSignal(i)
sample_frequency = file.getSampleFrequency(i)
# 读取采样率 进行重采样
if sample_frequency < frequency:
signal = signal.repeat(frequency / sample_frequency)
elif sample_frequency > frequency:
signal = signal[::int(sample_frequency / frequency)]
self.signal_select[index] = signal
# 加载心晓信号
signal = np.load(bcg_path)
preprocessing = BCG_Operation(sample_rate=bcg_frequency)
# 20Hz低通去噪
signal1 = preprocessing.Butterworth(signal, 'lowpass', low_cut=20, order=3)
# 0.7Hz 低通提取呼吸
signal2 = preprocessing.Butterworth(signal, 'lowpass', low_cut=0.7, order=3)
# 进行降采样
signal1 = signal1[::int(bcg_frequency / frequency)]
signal2 = signal2[::int(bcg_frequency / frequency)]
# 根据心晓事件长度生成事件记录
self.bcg_event_label = np.zeros(len(signal))
self.signal_select['xin_xiao'] = signal1
self.signal_select['xin_xiao_respire'] = signal2
def read_event(self):
bcg_label_path = Path(f"../Data/BCG_label/{self.sampNo}_label_all.csv")
ecg_label_path = Path(f"../Data/ECG_label/export{self.sampNo}.csv")
if not bcg_label_path.exists():
raise FileNotFoundError(f"{bcg_label_path} 不存在!")
if not ecg_label_path.exists():
raise FileNotFoundError(f"{ecg_label_path} 不存在!")
df = pd.read_csv(ecg_label_path, encoding='gbk')
self.ecg_event_label_df = df
# 过滤不关注的事件
df = df[df["Event type"].isin(self.focus_event_list)]
# 根据epoch进行排列方便索引
df = df.sort_values(by='Epoch')
self.ecg_event_label_filtered_df = df
for one_data in df.index:
one_data = df.loc[one_data]
# 通过开始事件推算事件起始点与结束点
event_start_time = datetime.strptime(one_data["Date"] + " " + one_data["Time"], '%Y/%m/%d %H:%M:%S')
SP = (event_start_time - self.ecg_start_time).seconds
# 对括号进行切分避免Duration 20 (20) 这种带括号的问题
EP = int(SP + float(one_data["Duration"].split("(")[0]))
SP *= self.frequency
EP *= self.frequency
# 对事件重新编码并存到事件记录表中
if one_data["Event type"] == "Hypopnea":
self.ecg_event_label[SP:EP] = 1
elif one_data["Event type"] == "Central apnea":
self.ecg_event_label[SP:EP] = 2
elif one_data["Event type"] == "Obstructive apnea":
self.ecg_event_label[SP:EP] = 3
elif one_data["Event type"] == "Mixed apnea":
self.ecg_event_label[SP:EP] = 4
# 读取心晓事件
df = pd.read_csv(bcg_label_path, encoding='gbk')
df["new_start"] = df["new_start"].astype("int")
df["new_end"] = df["new_end"].astype("int")
self.bcg_event_label_df = df
# 过滤不关注事件
df = df[df["Event type"].isin(self.focus_event_list)]
df = df.sort_values(by='Epoch')
self.bcg_event_label_filtered_df = df
for one_data in df.index:
one_data = df.loc[one_data]
SP = one_data["new_start"] * self.frequency
EP = one_data["new_end"] * self.frequency
if one_data["Event type"] == "Hypopnea":
self.bcg_event_label[SP:EP] = 1
elif one_data["Event type"] == "Central apnea":
self.bcg_event_label[SP:EP] = 2
elif one_data["Event type"] == "Obstructive apnea":
self.bcg_event_label[SP:EP] = 3
elif one_data["Event type"] == "Mixed apnea":
self.bcg_event_label[SP:EP] = 4
# assert len(self.ecg_event_label_filtered_df) == len(self.bcg_event_label_filtered_df), \
# f"PSG与心晓事件数量不一致, PSG事件数量{len(self.ecg_event_label_filtered_df)},
# 心晓事件数量{len(self.bcg_event_label_filtered_df)}"
def show_one_event(self, bcg_index: int, ecg_index: int, front_add_second: int = 60,
back_add_second: int = 60, main_SA_visual: int = 1):
"""
:param bcg_index: 心晓事件在csv中行号
:param ecg_index: PSG事件在csv中序号
:param front_add_second: 向前延伸时间
:param back_add_second: 向后延伸时间
:param main_SA_visual: 1仅当前事件上色 0不上色 2所有事件上色
:return:
"""
one_bcg_data = self.bcg_event_label_df.loc[bcg_index]
one_ecg_data = self.ecg_event_label_df.loc[ecg_index]
# 获取ECG事件开始与结束时间
event_start_time = datetime.strptime(one_ecg_data["Date"] + " " + one_ecg_data["Time"], '%Y/%m/%d %H:%M:%S')
ecg_SP = (event_start_time - self.ecg_start_time).seconds
ecg_duration = int(float(str(one_ecg_data["Duration"]).split("(")[0]) + 0.5)
ecg_EP = ecg_SP + ecg_duration
# 获取BCG事件开始与结束时间
bcg_SP = one_bcg_data["new_start"]
bcg_EP = one_bcg_data["new_end"]
bcg_duration = int(float(str(one_bcg_data["Duration"]).split("(")[0]))
print(ecg_SP, ecg_EP, bcg_SP, bcg_EP)
# 进行向两边延展
ecg_SP = ecg_SP - front_add_second
ecg_EP = ecg_EP + back_add_second
bcg_SP = bcg_SP - front_add_second - (ecg_duration - bcg_duration) // 2
bcg_EP = bcg_EP + back_add_second + (ecg_duration - bcg_duration) // 2
# 绘图
plt.figure(figsize=(12, 6), dpi=150)
gs = gridspec.GridSpec(7, 1, height_ratios=[1, 1, 1, 3, 1, 1, 1])
plt.subplots_adjust(top=1, bottom=0, right=1, left=0, hspace=0, wspace=0)
plt.margins(0, 0)
plt.tight_layout()
plt.subplot(gs[0])
# ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient']
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency),
self.signal_select["Effort Tho"][ecg_SP * self.frequency:ecg_EP * self.frequency], label="Effort Tho")
# 进行事件颜色标注
for j in range(1, 5):
mask = self.ecg_event_label[ecg_SP * self.frequency:ecg_EP * self.frequency] == j
y = (self.signal_select["Effort Tho"][ecg_SP * self.frequency:ecg_EP * self.frequency] * mask).astype(
'float')
np.place(y, y == 0, np.nan)
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), y, color=self.color_cycle[j])
# 显示图注
plt.legend(loc=1)
# 隐藏部分边框
ax = plt.gca()
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
# 去掉x轴
plt.xticks([])
plt.subplot(gs[1])
# ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient']
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency),
self.signal_select["Effort Abd"][ecg_SP * self.frequency:ecg_EP * self.frequency], label="Effort Abd")
for j in range(1, 5):
mask = self.ecg_event_label[ecg_SP * self.frequency:ecg_EP * self.frequency] == j
y = (self.signal_select["Effort Abd"][ecg_SP * self.frequency:ecg_EP * self.frequency] * mask).astype(
'float')
np.place(y, y == 0, np.nan)
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), y, color=self.color_cycle[j])
plt.title(f"sampNo:{self.sampNo} Epoch:{one_ecg_data['Epoch']} Duration:{one_ecg_data['Duration']}")
plt.legend(loc=1)
ax = plt.gca()
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
plt.xticks([])
plt.subplot(gs[2])
# ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient']
plt.plot(np.linspace(bcg_SP, bcg_EP, (bcg_EP - bcg_SP) * self.frequency),
self.signal_select["xin_xiao_respire"][bcg_SP * self.frequency:bcg_EP * self.frequency], label="心晓 呼吸")
min_bcg = self.signal_select["xin_xiao_respire"][bcg_SP * self.frequency:bcg_EP * self.frequency].min()
len_bcg = bcg_EP * self.frequency - bcg_SP * self.frequency
for j in range(1, 5):
mask = self.bcg_event_label[bcg_SP * self.frequency:bcg_EP * self.frequency] == j
y = (min_bcg.repeat(len_bcg) * mask).astype('float')
np.place(y, y == 0, np.nan)
plt.plot(np.linspace(bcg_SP, bcg_EP, (bcg_EP - bcg_SP) * self.frequency), y, color=self.color_cycle[j])
# plt.title(f"sampNo:{self.sampNo} Epoch:{one_bcg_data['Epoch']} Duration:{one_bcg_data['Duration']}")
plt.legend(loc=1)
ax = plt.gca()
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
plt.xticks([])
plt.subplot(gs[3])
# ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient']
plt.plot(np.linspace(bcg_SP, bcg_EP, (bcg_EP - bcg_SP) * self.frequency),
self.signal_select["xin_xiao"][bcg_SP * self.frequency:bcg_EP * self.frequency], label="心晓")
min_bcg = self.signal_select["xin_xiao"][bcg_SP * self.frequency:bcg_EP * self.frequency].min()
len_bcg = bcg_EP * self.frequency - bcg_SP * self.frequency
for j in range(1, 5):
mask = self.bcg_event_label[bcg_SP * self.frequency:bcg_EP * self.frequency] == j
y = (min_bcg.repeat(len_bcg) * mask).astype('float')
np.place(y, y == 0, np.nan)
plt.plot(np.linspace(bcg_SP, bcg_EP, (bcg_EP - bcg_SP) * self.frequency), y, color=self.color_cycle[j])
plt.title(f"sampNo:{self.sampNo} Epoch:{one_bcg_data['Epoch']} Duration:{one_bcg_data['Duration']}")
plt.legend(loc=1)
ax = plt.gca()
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
plt.xticks([]) # 去掉x轴
plt.subplot(gs[4])
# ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient']
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency),
self.signal_select["Flow Patient1"][ecg_SP * self.frequency:ecg_EP * self.frequency],
label="Flow Patient1")
for j in range(1, 5):
mask = self.ecg_event_label[ecg_SP * self.frequency:ecg_EP * self.frequency] == j
y = (self.signal_select["Flow Patient1"][ecg_SP * self.frequency:ecg_EP * self.frequency] * mask).astype(
'float')
np.place(y, y == 0, np.nan)
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), y, color=self.color_cycle[j])
plt.legend(loc=1)
ax = plt.gca()
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
plt.xticks([]) # 去掉x轴
plt.subplot(gs[5])
# ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient']
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency),
self.signal_select["Flow Patient2"][ecg_SP * self.frequency:ecg_EP * self.frequency],
label="Flow Patient2")
for j in range(1, 5):
mask = self.ecg_event_label[ecg_SP * self.frequency:ecg_EP * self.frequency] == j
y = (self.signal_select["Flow Patient2"][ecg_SP * self.frequency:ecg_EP * self.frequency] * mask).astype(
'float')
np.place(y, y == 0, np.nan)
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), y, color=self.color_cycle[j])
plt.legend(loc=1)
ax = plt.gca()
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
plt.xticks([])
plt.subplot(gs[6])
# ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient']
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency),
self.signal_select["SpO2"][ecg_SP * self.frequency:ecg_EP * self.frequency], label="SpO2")
plt.legend(loc=1)
ax = plt.gca()
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
plt.xticks([])
plt.show()
def show_all_event(self, start_index: int = 0, shifting: int = 0, front_add_second: int = 60,
back_add_second: int = 60, main_SA_visual: int = 1):
for index in range(start_index, len(self.bcg_event_label_filtered_df)):
self.show_one_event(self.bcg_event_label_filtered_df.index[index],
self.ecg_event_label_filtered_df.index[index + shifting],
front_add_second=front_add_second,
back_add_second=back_add_second,
main_SA_visual=main_SA_visual
)
def get_fft(self):
pass
if __name__ == '__main__':
prepareData = Prepare_Data(670)
prepareData.show_all_event()