468 lines
19 KiB
Python
468 lines
19 KiB
Python
# -*- coding: cp936 -*-
|
||
# 使用gbk编码才能读标签
|
||
"""
|
||
@author:Marques
|
||
@file:Prepare_Data.py
|
||
@email:admin@marques22.com
|
||
@email:2021022362@m.scnu.edu.cn
|
||
@time:2022/03/26
|
||
"""
|
||
import time
|
||
from typing import List
|
||
import logging
|
||
import pyedflib
|
||
from pathlib import Path
|
||
import numpy as np
|
||
import pandas as pd
|
||
from matplotlib import pyplot as plt, gridspec
|
||
from utils.Preprocessing import BCG_Operation
|
||
from tqdm import tqdm
|
||
from datetime import datetime
|
||
|
||
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
|
||
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
|
||
|
||
# ['EEG F3-A2', 'EEG F4-A1', 'EEG C3-A2', 'EEG C4-A1', 'EEG O1-A2',
|
||
# 'EEG O2-A1', 'EOG Right', 'EOG Left', 'EMG Chin', 'ECG I', 'RR',
|
||
# 'ECG II', 'Effort Tho', 'Flow Patient', 'Flow Patient', 'Effort Abd',
|
||
# 'SpO2', 'Pleth', 'Snore', 'Body', 'Pulse', 'Leg LEG1', 'Leg LEG2',
|
||
# 'EEG A1-A2', 'Imp']
|
||
|
||
|
||
# 设置日志
|
||
logger = logging.getLogger()
|
||
logger.setLevel(logging.NOTSET)
|
||
realtime = time.strftime('%Y%m%d', time.localtime(time.time()))
|
||
fh = logging.FileHandler(Path("history") / (realtime + ".log"), mode='a')
|
||
fh.setLevel(logging.NOTSET)
|
||
fh.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
|
||
logger.addHandler(fh)
|
||
|
||
ch = logging.StreamHandler()
|
||
ch.setLevel(logging.NOTSET)
|
||
ch.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
|
||
logger.addHandler(ch)
|
||
logging.getLogger('matplotlib.font_manager').disabled = True
|
||
logging.info("------------------------------------")
|
||
|
||
|
||
class Quality_Relabel:
|
||
# 可选择的通道
|
||
base_channel = ['EEG F3-A2', 'EEG F4-A1', 'EEG C3-A2', 'EEG C4-A1', 'EEG O1-A2', 'EEG O2-A1', 'EOG Right',
|
||
'EOG Left', 'EMG Chin', 'ECG I', 'RR', 'ECG II', 'Effort Tho', 'Flow Patient', 'Flow Patient', 'HR',
|
||
'Effort Abd', 'SpO2', 'Pleth', 'Snore', 'Body', 'Pulse', 'Leg LEG1', 'Leg LEG2', 'EEG A1-A2', 'Imp']
|
||
|
||
# 显示事件
|
||
base_event = ["Hypopnea", "Central apnea", "Obstructive apnea", "Mixed apnea", "Desaturation"]
|
||
|
||
# 设定事件和其对应颜色
|
||
# event_code color event
|
||
# 0 黑色 背景
|
||
# 1 粉色 低通气
|
||
# 2 蓝色 中枢性
|
||
# 3 红色 阻塞型
|
||
# 4 灰色 混合型
|
||
# 5 绿色 血氧饱和度下降
|
||
# 6 橙色 大体动
|
||
# 7 橙色 小体动
|
||
# 8 橙色 深呼吸
|
||
# 9 橙色 脉冲体动
|
||
# 10 橙色 无效片段
|
||
color_cycle = ["black", "pink", "blue", "red", "silver", "green", "orange", "orange", "orange", "orange", "orange"]
|
||
|
||
# assert len(color_cycle) == len(base_event) + 1, "基础事件数量与颜色数量不一致"
|
||
|
||
def __init__(self, all_path: List, sampNo: int, frequency: int = 100, bcg_frequency: int = 1000,
|
||
extend_second: int = 0,
|
||
channel_list: List[str] = ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'],
|
||
focus_event_list: List[str] = ["Obstructive apnea"]):
|
||
"""
|
||
|
||
:param sampNo: 编号选择
|
||
:param frequency: 显示采样率
|
||
:param bcg_frequency: BCG信号采样率
|
||
:param channel_list: 显示的通道
|
||
:param focus_event_list: 关注暂停事件
|
||
"""
|
||
self.PSG_Data_Path = all_path[0]
|
||
self.PSG_Label_Path = all_path[1]
|
||
self.BCG_Data_Path = all_path[2]
|
||
self.BCG_Label_Path = all_path[3]
|
||
self.Artifact_Label_Path = all_path[4]
|
||
self.Artifact_Offset_Path = all_path[5]
|
||
|
||
self.sampNo = sampNo
|
||
self.channel_list = channel_list
|
||
self.focus_event_list = focus_event_list
|
||
self.frequency = frequency
|
||
self.bcg_frequency = bcg_frequency
|
||
self.extend_second = extend_second
|
||
|
||
self.ecg_start_time = None
|
||
|
||
# 用来显示颜色时按点匹配事件
|
||
self.ecg_event_label = None
|
||
self.bcg_event_label = None
|
||
self.spo2_event_label = None
|
||
self.artifact_event_label = None
|
||
|
||
# 仅包含关注暂停事件的列表
|
||
self.ecg_event_label_filtered_df = None
|
||
self.bcg_event_label_filtered_df = None
|
||
|
||
# 所有事件列表
|
||
self.ecg_event_label_df = None
|
||
self.bcg_event_label_df = None
|
||
|
||
# 各通道信号
|
||
self.signal_select = {}
|
||
|
||
self.check_channel()
|
||
self.read_data(frequency, bcg_frequency, extend_second)
|
||
self.read_event()
|
||
self.read_artifact_label()
|
||
|
||
def check_channel(self):
|
||
for i in self.channel_list:
|
||
if i not in self.base_channel:
|
||
logging.debug(f"{i} 不存在于常见通道名中")
|
||
print(f"常见通道名:{self.channel_list}")
|
||
|
||
def read_data(self, frequency: int = 100, bcg_frequency: int = 1000):
|
||
bcg_path = self.BCG_Data_Path / f"{self.sampNo}samp.npy"
|
||
ecg_path = self.PSG_Data_Path / f"A{str(self.sampNo).rjust(7, '0')}.edf"
|
||
|
||
if not bcg_path.exists():
|
||
logging.error(f"{bcg_path} 不存在!")
|
||
raise FileNotFoundError(f"{bcg_path} 不存在!")
|
||
|
||
if not ecg_path.exists():
|
||
logging.error(f"{ecg_path} 不存在!")
|
||
raise FileNotFoundError(f"{ecg_path} 不存在!")
|
||
|
||
with pyedflib.EdfReader(str(ecg_path.resolve())) as file:
|
||
signal_num = file.signals_in_file
|
||
logging.debug(f"{self.sampNo} EDF file signal number is {signal_num}")
|
||
|
||
signal_label = file.getSignalLabels()
|
||
logging.debug(f"{self.sampNo} EDF file signal label : {signal_label}")
|
||
|
||
self.ecg_start_time = file.getStartdatetime()
|
||
|
||
# 根据PSG记录长度生成事件表
|
||
self.ecg_event_label = np.zeros(
|
||
file.getFileDuration() * self.frequency + self.extend_second * self.frequency)
|
||
self.spo2_event_label = np.zeros(
|
||
file.getFileDuration() * self.frequency + self.extend_second * self.frequency)
|
||
# 打印PSG信息
|
||
file.file_info_long()
|
||
|
||
# sub_index 用于区分两个flow patient
|
||
sub_index = 1
|
||
|
||
logging.info("读取PSG信号····")
|
||
for i, index in enumerate(signal_label):
|
||
# 仅加载选中的通道
|
||
if index in self.channel_list:
|
||
# 重命名flow patient通道
|
||
if index == 'Flow Patient':
|
||
if sub_index == 1:
|
||
index = "Flow T"
|
||
else:
|
||
index = "Flow P"
|
||
sub_index += 1
|
||
signal = file.readSignal(i)
|
||
sample_frequency = file.getSampleFrequency(i)
|
||
# 读取采样率 进行重采样
|
||
if sample_frequency < frequency:
|
||
signal = signal.repeat(frequency / sample_frequency)
|
||
elif sample_frequency > frequency:
|
||
signal = signal[::int(sample_frequency / frequency)]
|
||
self.signal_select[index] = signal
|
||
logging.info(f"完成读取PSG: {index}")
|
||
|
||
# 加载心晓信号
|
||
logging.info("读取心晓信号····")
|
||
signal = np.load(bcg_path)
|
||
preprocessing = BCG_Operation(sample_rate=bcg_frequency)
|
||
# 20Hz低通去噪
|
||
signal1 = preprocessing.Butterworth(signal, 'lowpass', low_cut=20, order=3)
|
||
# 0.7Hz 低通提取呼吸
|
||
signal2 = preprocessing.Butterworth(signal, 'lowpass', low_cut=0.7, order=3)
|
||
|
||
# 进行降采样
|
||
signal1 = signal1[::int(bcg_frequency / frequency)]
|
||
signal2 = signal2[::int(bcg_frequency / frequency)]
|
||
|
||
# 根据心晓事件长度生成事件记录
|
||
self.bcg_event_label = np.zeros(len(signal) + self.extend_second * self.frequency)
|
||
self.signal_select['orgdata'] = signal1
|
||
self.signal_select['0.7lowpass_resp'] = signal2
|
||
|
||
for signal_key in self.signal_select.keys():
|
||
self.signal_select[signal_key] = np.append(self.signal_select[signal_key],
|
||
self.signal_select[signal_key].mean().astype(int).repeat(
|
||
self.extend_second * self.frequency))
|
||
|
||
def read_event(self):
|
||
bcg_label_path = self.BCG_Label_Path / f"export{self.sampNo}_all.csv"
|
||
ecg_label_path = self.PSG_Label_Path / f"export{self.sampNo}.csv"
|
||
|
||
if not bcg_label_path.exists():
|
||
logging.error(f"{bcg_label_path} 不存在!")
|
||
raise FileNotFoundError(f"{bcg_label_path} 不存在!")
|
||
|
||
if not ecg_label_path.exists():
|
||
logging.error(f"{ecg_label_path} 不存在!")
|
||
raise FileNotFoundError(f"{ecg_label_path} 不存在!")
|
||
|
||
df = pd.read_csv(ecg_label_path, encoding='gbk')
|
||
self.ecg_event_label_df = df
|
||
|
||
# 过滤不关注的事件
|
||
df2 = df[df["Event type"].isin(self.focus_event_list)]
|
||
# 根据epoch进行排列方便索引
|
||
df2 = df2.sort_values(by='Epoch')
|
||
self.ecg_event_label_filtered_df = df2
|
||
|
||
logging.info("遍历PSG事件···")
|
||
for one_data in tqdm(df.index, ncols=80):
|
||
one_data = df.loc[one_data]
|
||
|
||
# 通过开始时间推算事件起始点与结束点
|
||
event_start_time = datetime.strptime(one_data["Date"] + " " + one_data["Time"], '%Y/%m/%d %H:%M:%S')
|
||
SP = (event_start_time - self.ecg_start_time).seconds
|
||
# 对括号进行切分,避免Duration 20 (20) 这种带括号的问题
|
||
EP = int(SP + float(one_data["Duration"].split("(")[0]))
|
||
SP *= self.frequency
|
||
EP *= self.frequency
|
||
|
||
# 对事件重新编码并存到事件记录表中
|
||
if one_data["Event type"] == "Hypopnea":
|
||
self.ecg_event_label[SP:EP] = 1
|
||
elif one_data["Event type"] == "Central apnea":
|
||
self.ecg_event_label[SP:EP] = 2
|
||
elif one_data["Event type"] == "Obstructive apnea":
|
||
self.ecg_event_label[SP:EP] = 3
|
||
elif one_data["Event type"] == "Mixed apnea":
|
||
self.ecg_event_label[SP:EP] = 4
|
||
elif one_data["Event type"] == "Desaturation":
|
||
self.spo2_event_label[SP:EP] = 5
|
||
|
||
# 读取心晓事件
|
||
df = pd.read_csv(bcg_label_path, encoding='gbk')
|
||
df["new_start"] = df["new_start"].astype("int")
|
||
df["new_end"] = df["new_end"].astype("int")
|
||
self.bcg_event_label_df = df
|
||
|
||
# 过滤不关注事件
|
||
df2 = df[df["Event type"].isin(self.focus_event_list)]
|
||
df2 = df2.sort_values(by='Epoch')
|
||
self.bcg_event_label_filtered_df = df2
|
||
logging.info("遍历心晓事件···")
|
||
for one_data in tqdm(df.index):
|
||
one_data = df.loc[one_data]
|
||
SP = one_data["new_start"] * self.frequency
|
||
EP = one_data["new_end"] * self.frequency
|
||
|
||
if one_data["Event type"] == "Hypopnea":
|
||
self.bcg_event_label[SP:EP] = 1
|
||
elif one_data["Event type"] == "Central apnea":
|
||
self.bcg_event_label[SP:EP] = 2
|
||
elif one_data["Event type"] == "Obstructive apnea":
|
||
self.bcg_event_label[SP:EP] = 3
|
||
elif one_data["Event type"] == "Mixed apnea":
|
||
self.bcg_event_label[SP:EP] = 4
|
||
|
||
def read_artifact_label(self):
|
||
all_offset_length = pd.read_excel(self.Artifact_Offset_Path)
|
||
offset_length = all_offset_length[all_offset_length['数据编号'] == self.sampNo]['from_code'].values[0]
|
||
artifact_label_path = self.Artifact_Label_Path / f"Artifact_{self.sampNo}.txt"
|
||
artifact_label = pd.read_csv(artifact_label_path, header=None).to_numpy().reshape(-1, 4)
|
||
|
||
self.artifact_event_label = np.zeros(len(self.bcg_event_label) + self.extend_second * self.frequency)
|
||
|
||
for i, artifact_type, SP, EP in artifact_label:
|
||
SP = (int(SP) + offset_length) // (1000 // self.frequency)
|
||
EP = (int(EP) + offset_length) // (1000 // self.frequency)
|
||
artifact_type = int(artifact_type) + 5
|
||
|
||
SP = 0 if SP < 0 else SP
|
||
if EP < 0:
|
||
continue
|
||
|
||
if SP > len(self.bcg_event_label):
|
||
continue
|
||
|
||
self.artifact_event_label[SP:EP] = artifact_type
|
||
|
||
# assert len(self.ecg_event_label_filtered_df) == len(self.bcg_event_label_filtered_df), \
|
||
# f"PSG与心晓事件数量不一致, PSG事件数量{len(self.ecg_event_label_filtered_df)},
|
||
# 心晓事件数量{len(self.bcg_event_label_filtered_df)}"
|
||
|
||
def show_one_event(self, bcg_index: int, ecg_index: int, front_add_second: int = 60,
|
||
back_add_second: int = 60):
|
||
"""
|
||
:param bcg_index: 心晓事件在csv中行号
|
||
:param ecg_index: PSG事件在csv中序号
|
||
:param front_add_second: 向前延伸时间
|
||
:param back_add_second: 向后延伸时间
|
||
:return:
|
||
"""
|
||
# 获取事件实际在csv文件中的序号
|
||
bcg_real_index = self.bcg_event_label_filtered_df.index[bcg_index],
|
||
ecg_real_index = self.ecg_event_label_filtered_df.index[ecg_index],
|
||
one_bcg_data = self.bcg_event_label_df.loc[bcg_real_index]
|
||
one_ecg_data = self.ecg_event_label_df.loc[ecg_real_index]
|
||
|
||
# 获取ECG事件开始与结束时间
|
||
event_start_time = datetime.strptime(one_ecg_data["Date"] + " " + one_ecg_data["Time"], '%Y/%m/%d %H:%M:%S')
|
||
ecg_SP = (event_start_time - self.ecg_start_time).seconds
|
||
ecg_duration = int(float(str(one_ecg_data["Duration"]).split("(")[0]) + 0.5)
|
||
ecg_EP = ecg_SP + ecg_duration
|
||
|
||
# 获取BCG事件开始与结束时间
|
||
bcg_SP = one_bcg_data["new_start"]
|
||
bcg_EP = one_bcg_data["new_end"]
|
||
bcg_duration = bcg_EP - bcg_SP
|
||
|
||
logging.info(f"sampNo:{self.sampNo} "
|
||
f"bcg[index:{bcg_index} epoch:{one_bcg_data['Epoch']} event:{one_bcg_data['Event type']}] "
|
||
f"ecg:[index:{ecg_index} epoch:{one_ecg_data['Epoch']} event:{one_ecg_data['Event type']}]")
|
||
|
||
if one_bcg_data['Event type'] != one_ecg_data['Event type']:
|
||
logging.error(f"sampNo:{self.sampNo} PSG事件与心晓时间不一致,请排查"
|
||
f"bcg[index:{bcg_index} epoch:{one_bcg_data['Epoch']} event:{one_bcg_data['Event type']}] "
|
||
f"ecg:[index:{ecg_index} epoch:{one_ecg_data['Epoch']} event:{one_ecg_data['Event type']}]")
|
||
raise ValueError()
|
||
|
||
# 进行向两边延展
|
||
ecg_SP = ecg_SP - front_add_second
|
||
ecg_EP = ecg_EP + back_add_second
|
||
bcg_SP = bcg_SP - front_add_second - (ecg_duration - bcg_duration) // 2
|
||
bcg_EP = bcg_EP + back_add_second + (ecg_duration - bcg_duration) // 2
|
||
|
||
# 绘图
|
||
plt.figure(figsize=(12, 6), dpi=150)
|
||
# 各个子图之间的比例
|
||
gs = gridspec.GridSpec(7, 1, height_ratios=[1, 1, 1, 1, 1, 3, 1])
|
||
plt.subplots_adjust(top=1, bottom=0, right=1, left=0, hspace=0, wspace=0)
|
||
plt.margins(0, 0)
|
||
plt.tight_layout()
|
||
|
||
# 绘制 Flow1
|
||
plt.subplot(gs[0])
|
||
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Flow T")
|
||
|
||
# 绘制 Flow2
|
||
plt.subplot(gs[1])
|
||
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Flow P",
|
||
title=f"PSG sampNo:{self.sampNo} Index:{ecg_index + 1}/{len(self.ecg_event_label_filtered_df)} "
|
||
f"Epoch:{one_ecg_data['Epoch']} Duration:{ecg_duration}")
|
||
|
||
plt.subplot(gs[2])
|
||
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Effort Tho")
|
||
|
||
plt.subplot(gs[3])
|
||
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Effort Abd")
|
||
|
||
plt.subplot(gs[4])
|
||
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="SpO2", event_code=[5])
|
||
|
||
plt.subplot(gs[5])
|
||
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="orgdata",
|
||
event_code=[6, 7, 8, 9, 10, 1, 2, 3, 4],
|
||
event_show_under=False)
|
||
|
||
plt.subplot(gs[6])
|
||
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="0.7lowpass_resp",
|
||
event_code=[6, 7, 8, 9, 10, 1, 2, 3, 4],
|
||
event_show_under=False,
|
||
ax_bottom=True,
|
||
title=f"心晓 sampNo:{self.sampNo} Index:{bcg_index + 1}/{len(self.bcg_event_label_filtered_df)} "
|
||
f"Epoch:{one_bcg_data['Epoch']} Duration:{bcg_duration}",
|
||
)
|
||
|
||
figManager = plt.get_current_fig_manager()
|
||
figManager.window.showMaximized()
|
||
plt.show()
|
||
|
||
def plt_channel(self, plt_, SP, EP, channel, event_code=[1, 2, 3, 4], event_show_under=False,
|
||
ax_top=False, ax_bottom=False, ax_left=True, ax_right=False, title=None):
|
||
"""
|
||
|
||
:param plt_:
|
||
:param SP: 显示开始秒数
|
||
:param EP: 显示结束秒数
|
||
:param channel: 通道名称
|
||
:param event_code: 要上色的事件编号
|
||
:param event_show_under: 事件颜色显示在信号下面
|
||
:param ax_top: 显示上框线
|
||
:param ax_bottom: 显示下框线
|
||
:param ax_left: 显示左框线
|
||
:param ax_right: 显示右框线
|
||
:param title: 显示标题
|
||
:return:
|
||
"""
|
||
linestyle = "-"
|
||
SP = 0 if SP < 0 else SP
|
||
plt_.plot(np.linspace(SP, EP, (EP - SP) * self.frequency),
|
||
self.signal_select[channel][SP * self.frequency:EP * self.frequency], label=channel,
|
||
color=self.color_cycle[0])
|
||
|
||
for j in event_code:
|
||
if channel == "SpO2":
|
||
mask = self.spo2_event_label[SP * self.frequency:EP * self.frequency] == j
|
||
elif channel == "orgdata" or channel == "0.7lowpass_resp":
|
||
if j <= 5:
|
||
mask = self.bcg_event_label[SP * self.frequency:EP * self.frequency] == j
|
||
else:
|
||
mask = self.artifact_event_label[SP * self.frequency:EP * self.frequency] == j
|
||
linestyle = "-"
|
||
else:
|
||
mask = self.ecg_event_label[SP * self.frequency:EP * self.frequency] == j
|
||
|
||
if event_show_under:
|
||
min_point = self.signal_select[channel][SP * self.frequency:EP * self.frequency].min()
|
||
len_segment = EP * self.frequency - SP * self.frequency
|
||
y = (min_point.repeat(len_segment) * mask).astype('float')
|
||
np.place(y, y == 0, np.nan)
|
||
else:
|
||
y = (self.signal_select[channel][SP * self.frequency:EP * self.frequency] * mask).astype('float')
|
||
|
||
np.place(y, y == 0, np.nan)
|
||
plt_.plot(np.linspace(SP, EP, (EP - SP) * self.frequency), y, color=self.color_cycle[j],
|
||
linestyle=linestyle)
|
||
plt_.legend(loc=1)
|
||
|
||
if title is not None:
|
||
plt_.title(title)
|
||
ax = plt.gca()
|
||
ax.spines["top"].set_visible(ax_top)
|
||
ax.spines["right"].set_visible(ax_right)
|
||
ax.spines["bottom"].set_visible(ax_bottom)
|
||
ax.spines["left"].set_visible(ax_left)
|
||
# xticks = [[]] if xticks else [range(SP, EP, 5), [str(i) for i in range(0, (EP - SP), 5)]]
|
||
# print(xticks)
|
||
# plt_.xticks(*xticks) # 去掉x轴
|
||
|
||
def show_all_event(self, start_bcg_index: int = 0, front_add_second: int = 60,
|
||
back_add_second: int = 60):
|
||
|
||
for index in range(start_bcg_index, len(self.bcg_event_label_filtered_df)):
|
||
self.show_one_event(index,
|
||
index,
|
||
front_add_second=front_add_second,
|
||
back_add_second=back_add_second
|
||
)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
PSG_Data_Path = "../Data/PSG/"
|
||
PSG_Label_Path = "../Data/PSG_label/"
|
||
BCG_Data_Path = "../Data/BCG/"
|
||
BCG_Label_Path = "../Data/BCG_label/"
|
||
all_paths = [PSG_Data_Path, PSG_Label_Path, BCG_Data_Path, BCG_Label_Path]
|
||
prepareData = Quality_Relabel(all_paths, 670)
|
||
prepareData.show_all_event()
|