Quality_Relabel/utils/Quality_Relabel.py
2022-05-20 11:33:46 +08:00

474 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: cp936 -*-
# 使用gbk编码才能读标签
"""
@author:Marques
@file:Prepare_Data.py
@email:admin@marques22.com
@email:2021022362@m.scnu.edu.cn
@time:2022/03/26
"""
import time
from typing import List
import logging
import pyedflib
from pathlib import Path
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt, gridspec
from utils.Preprocessing import BCG_Operation
from tqdm import tqdm
from datetime import datetime
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
# ['EEG F3-A2', 'EEG F4-A1', 'EEG C3-A2', 'EEG C4-A1', 'EEG O1-A2',
# 'EEG O2-A1', 'EOG Right', 'EOG Left', 'EMG Chin', 'ECG I', 'RR',
# 'ECG II', 'Effort Tho', 'Flow Patient', 'Flow Patient', 'Effort Abd',
# 'SpO2', 'Pleth', 'Snore', 'Body', 'Pulse', 'Leg LEG1', 'Leg LEG2',
# 'EEG A1-A2', 'Imp']
# 设置日志
logger = logging.getLogger()
logger.setLevel(logging.NOTSET)
realtime = time.strftime('%Y%m%d', time.localtime(time.time()))
fh = logging.FileHandler(Path("history") / (realtime + ".log"), mode='a')
fh.setLevel(logging.NOTSET)
fh.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
logger.addHandler(fh)
ch = logging.StreamHandler()
ch.setLevel(logging.NOTSET)
ch.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
logger.addHandler(ch)
logging.getLogger('matplotlib.font_manager').disabled = True
logging.info("------------------------------------")
class Quality_Relabel:
# 可选择的通道
base_channel = ['EEG F3-A2', 'EEG F4-A1', 'EEG C3-A2', 'EEG C4-A1', 'EEG O1-A2', 'EEG O2-A1', 'EOG Right',
'EOG Left', 'EMG Chin', 'ECG I', 'RR', 'ECG II', 'Effort Tho', 'Flow Patient', 'Flow Patient', 'HR',
'Effort Abd', 'SpO2', 'Pleth', 'Snore', 'Body', 'Pulse', 'Leg LEG1', 'Leg LEG2', 'EEG A1-A2', 'Imp']
# 显示事件
base_event = ["Hypopnea", "Central apnea", "Obstructive apnea", "Mixed apnea", "Desaturation"]
# 设定事件和其对应颜色
# event_code color event
# 0 黑色 背景
# 1 粉色 低通气
# 2 蓝色 中枢性
# 3 红色 阻塞型
# 4 灰色 混合型
# 5 绿色 血氧饱和度下降
# 6 橙色 大体动
# 7 橙色 小体动
# 8 橙色 深呼吸
# 9 橙色 脉冲体动
# 10 橙色 无效片段
color_cycle = ["black", "pink", "blue", "red", "silver", "green", "orange", "orange", "orange", "orange", "orange"]
# assert len(color_cycle) == len(base_event) + 1, "基础事件数量与颜色数量不一致"
def __init__(self, all_path: List, sampNo: int, frequency: int = 100, bcg_frequency: int = 1000,
extend_second: int = 0,
channel_list: List[str] = ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'],
focus_event_list: List[str] = ["Obstructive apnea"]):
"""
:param sampNo: 编号选择
:param frequency: 显示采样率
:param bcg_frequency: BCG信号采样率
:param channel_list: 显示的通道
:param focus_event_list: 关注暂停事件
"""
self.PSG_Data_Path = all_path[0]
self.PSG_Label_Path = all_path[1]
self.BCG_Data_Path = all_path[2]
self.BCG_Label_Path = all_path[3]
self.Artifact_Label_Path = all_path[4]
self.Artifact_Offset_Path = all_path[5]
self.sampNo = sampNo
self.channel_list = channel_list
self.focus_event_list = focus_event_list
self.frequency = frequency
self.bcg_frequency = bcg_frequency
self.extend_second = extend_second
self.ecg_start_time = None
# 用来显示颜色时按点匹配事件
self.ecg_event_label = None
self.bcg_event_label = None
self.spo2_event_label = None
self.artifact_event_label = None
# 仅包含关注暂停事件的列表
self.ecg_event_label_filtered_df = None
self.bcg_event_label_filtered_df = None
# 所有事件列表
self.ecg_event_label_df = None
self.bcg_event_label_df = None
# 各通道信号
self.signal_select = {}
self.check_channel()
self.read_data(frequency, bcg_frequency)
self.read_event()
self.read_artifact_label()
def check_channel(self):
for i in self.channel_list:
if i not in self.base_channel:
logging.debug(f"{i} 不存在于常见通道名中")
print(f"常见通道名:{self.channel_list}")
def read_data(self, frequency: int = 100, bcg_frequency: int = 1000):
bcg_path = self.BCG_Data_Path / f"{self.sampNo}samp.npy"
ecg_path = self.PSG_Data_Path / f"A{str(self.sampNo).rjust(7, '0')}.edf"
if not bcg_path.exists():
logging.error(f"{bcg_path} 不存在!")
raise FileNotFoundError(f"{bcg_path} 不存在!")
if not ecg_path.exists():
logging.error(f"{ecg_path} 不存在!")
raise FileNotFoundError(f"{ecg_path} 不存在!")
with pyedflib.EdfReader(str(ecg_path.resolve())) as file:
signal_num = file.signals_in_file
logging.debug(f"{self.sampNo} EDF file signal number is {signal_num}")
signal_label = file.getSignalLabels()
logging.debug(f"{self.sampNo} EDF file signal label : {signal_label}")
self.ecg_start_time = file.getStartdatetime()
# 根据PSG记录长度生成事件表
self.ecg_event_label = np.zeros(
file.getFileDuration() * self.frequency + self.extend_second * self.frequency)
self.spo2_event_label = np.zeros(
file.getFileDuration() * self.frequency + self.extend_second * self.frequency)
# 打印PSG信息
file.file_info_long()
# sub_index 用于区分两个flow patient
sub_index = 1
logging.info("读取PSG信号····")
for i, index in enumerate(signal_label):
# 仅加载选中的通道
if index in self.channel_list:
# 重命名flow patient通道
if index == 'Flow Patient':
if sub_index == 1:
index = "Flow T"
else:
index = "Flow P"
sub_index += 1
signal = file.readSignal(i)
sample_frequency = file.getSampleFrequency(i)
# 读取采样率 进行重采样
if sample_frequency < frequency:
signal = signal.repeat(frequency / sample_frequency)
elif sample_frequency > frequency:
signal = signal[::int(sample_frequency / frequency)]
self.signal_select[index] = signal
logging.info(f"完成读取PSG: {index}")
# 加载心晓信号
logging.info("读取心晓信号····")
signal = np.load(bcg_path)
preprocessing = BCG_Operation(sample_rate=bcg_frequency)
# 20Hz低通去噪
signal1 = preprocessing.Butterworth(signal, 'lowpass', low_cut=20, order=3)
# 0.7Hz 低通提取呼吸
signal2 = preprocessing.Butterworth(signal, 'lowpass', low_cut=0.7, order=3)
# 进行降采样
signal1 = signal1[::int(bcg_frequency / frequency)]
signal2 = signal2[::int(bcg_frequency / frequency)]
# 根据心晓事件长度生成事件记录
self.bcg_event_label = np.zeros(len(signal) + self.extend_second * self.frequency)
self.signal_select['orgdata'] = signal1
self.signal_select['0.7lowpass_resp'] = signal2
for signal_key in self.signal_select.keys():
self.signal_select[signal_key] = np.append(self.signal_select[signal_key],
self.signal_select[signal_key].mean().astype(int).repeat(
self.extend_second * self.frequency))
def read_event(self):
bcg_label_path = self.BCG_Label_Path / f"export{self.sampNo}_all.csv"
ecg_label_path = self.PSG_Label_Path / f"export{self.sampNo}.csv"
if not bcg_label_path.exists():
logging.error(f"{bcg_label_path} 不存在!")
raise FileNotFoundError(f"{bcg_label_path} 不存在!")
if not ecg_label_path.exists():
logging.error(f"{ecg_label_path} 不存在!")
raise FileNotFoundError(f"{ecg_label_path} 不存在!")
df = pd.read_csv(ecg_label_path, encoding='gbk')
self.ecg_event_label_df = df
# 过滤不关注的事件
df2 = df[df["Event type"].isin(self.focus_event_list)]
# 根据epoch进行排列方便索引
df2 = df2.sort_values(by='Epoch')
self.ecg_event_label_filtered_df = df2
logging.info("遍历PSG事件···")
for one_data in tqdm(df.index, ncols=80):
one_data = df.loc[one_data]
# 通过开始时间推算事件起始点与结束点
event_start_time = datetime.strptime(one_data["Date"] + " " + one_data["Time"], '%Y/%m/%d %H:%M:%S')
SP = (event_start_time - self.ecg_start_time).seconds
# 对括号进行切分避免Duration 20 (20) 这种带括号的问题
EP = int(SP + float(one_data["Duration"].split("(")[0]))
SP *= self.frequency
EP *= self.frequency
# 对事件重新编码并存到事件记录表中
if one_data["Event type"] == "Hypopnea":
self.ecg_event_label[SP:EP] = 1
elif one_data["Event type"] == "Central apnea":
self.ecg_event_label[SP:EP] = 2
elif one_data["Event type"] == "Obstructive apnea":
self.ecg_event_label[SP:EP] = 3
elif one_data["Event type"] == "Mixed apnea":
self.ecg_event_label[SP:EP] = 4
elif one_data["Event type"] == "Desaturation":
self.spo2_event_label[SP:EP] = 5
# 读取心晓事件
df = pd.read_csv(bcg_label_path, encoding='gbk')
df["new_start"] = df["new_start"].astype("int")
df["new_end"] = df["new_end"].astype("int")
self.bcg_event_label_df = df
# 过滤不关注事件
df2 = df[df["Event type"].isin(self.focus_event_list)]
df2 = df2.sort_values(by='Epoch')
self.bcg_event_label_filtered_df = df2
logging.info("遍历心晓事件···")
for one_data in tqdm(df.index):
one_data = df.loc[one_data]
SP = one_data["new_start"] * self.frequency
EP = one_data["new_end"] * self.frequency
if one_data["Event type"] == "Hypopnea":
self.bcg_event_label[SP:EP] = 1
elif one_data["Event type"] == "Central apnea":
self.bcg_event_label[SP:EP] = 2
elif one_data["Event type"] == "Obstructive apnea":
self.bcg_event_label[SP:EP] = 3
elif one_data["Event type"] == "Mixed apnea":
self.bcg_event_label[SP:EP] = 4
def read_artifact_label(self):
all_offset_length = pd.read_excel(self.Artifact_Offset_Path)
offset_length = all_offset_length[all_offset_length['数据编号'] == self.sampNo]['from_code'].values[0]
artifact_label_path = self.Artifact_Label_Path / f"Artifact_{self.sampNo}.txt"
artifact_label = pd.read_csv(artifact_label_path, header=None).to_numpy().reshape(-1, 4)
self.artifact_event_label = np.zeros(len(self.bcg_event_label) + self.extend_second * self.frequency)
for i, artifact_type, SP, EP in artifact_label:
SP = (int(SP) + offset_length) // (1000 // self.frequency)
EP = (int(EP) + offset_length) // (1000 // self.frequency)
artifact_type = int(artifact_type) + 5
SP = 0 if SP < 0 else SP
if EP < 0:
continue
if SP > len(self.bcg_event_label):
continue
self.artifact_event_label[SP:EP] = artifact_type
# assert len(self.ecg_event_label_filtered_df) == len(self.bcg_event_label_filtered_df), \
# f"PSG与心晓事件数量不一致, PSG事件数量{len(self.ecg_event_label_filtered_df)},
# 心晓事件数量{len(self.bcg_event_label_filtered_df)}"
def show_one_event(self, bcg_index: int, ecg_index: int, front_add_second: int = 60,
back_add_second: int = 60):
"""
:param bcg_index: 心晓事件在csv中行号
:param ecg_index: PSG事件在csv中序号
:param front_add_second: 向前延伸时间
:param back_add_second: 向后延伸时间
:return:
"""
# 获取事件实际在csv文件中的序号
bcg_real_index = self.bcg_event_label_filtered_df.index[bcg_index],
ecg_real_index = self.ecg_event_label_filtered_df.index[ecg_index],
one_bcg_data = self.bcg_event_label_df.loc[bcg_real_index]
one_ecg_data = self.ecg_event_label_df.loc[ecg_real_index]
# 获取ECG事件开始与结束时间
event_start_time = datetime.strptime(one_ecg_data["Date"] + " " + one_ecg_data["Time"], '%Y/%m/%d %H:%M:%S')
ecg_SP = (event_start_time - self.ecg_start_time).seconds
ecg_duration = int(float(str(one_ecg_data["Duration"]).split("(")[0]) + 0.5)
ecg_EP = ecg_SP + ecg_duration
# 获取BCG事件开始与结束时间
bcg_SP = one_bcg_data["new_start"]
bcg_EP = one_bcg_data["new_end"]
bcg_duration = bcg_EP - bcg_SP
logging.info(f"sampNo:{self.sampNo} "
f"bcg[index:{bcg_index} epoch:{one_bcg_data['Epoch']} event:{one_bcg_data['Event type']}] "
f"ecg:[index:{ecg_index} epoch:{one_ecg_data['Epoch']} event:{one_ecg_data['Event type']}]")
if one_bcg_data['Event type'] != one_ecg_data['Event type']:
logging.error(f"sampNo:{self.sampNo} PSG事件与心晓时间不一致请排查"
f"bcg[index:{bcg_index} epoch:{one_bcg_data['Epoch']} event:{one_bcg_data['Event type']}] "
f"ecg:[index:{ecg_index} epoch:{one_ecg_data['Epoch']} event:{one_ecg_data['Event type']}]")
raise ValueError()
# 进行向两边延展
ecg_SP = ecg_SP - front_add_second
ecg_EP = ecg_EP + back_add_second
bcg_SP = bcg_SP - front_add_second - (ecg_duration - bcg_duration) // 2
bcg_EP = bcg_EP + back_add_second + (ecg_duration - bcg_duration) // 2
# 绘图
plt.figure(figsize=(12, 6), dpi=150)
# 各个子图之间的比例
gs = gridspec.GridSpec(7, 1, height_ratios=[1, 1, 1, 1, 1, 3, 1])
plt.subplots_adjust(top=1, bottom=0, right=1, left=0, hspace=0, wspace=0)
plt.margins(0, 0)
plt.tight_layout()
# 绘制 Flow1
plt.subplot(gs[0])
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Flow T")
# 绘制 Flow2
plt.subplot(gs[1])
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Flow P",
title=f"PSG sampNo:{self.sampNo} Index:{ecg_index + 1}/{len(self.ecg_event_label_filtered_df)} "
f"Epoch:{one_ecg_data['Epoch']} Duration:{ecg_duration}")
plt.subplot(gs[2])
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Effort Tho")
plt.subplot(gs[3])
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Effort Abd")
plt.subplot(gs[4])
self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="SpO2", event_code=[5])
plt.subplot(gs[5])
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="orgdata",
event_code=[6, 7, 8, 9, 10, 1, 2, 3, 4],
event_show_under=False)
plt.subplot(gs[6])
self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="0.7lowpass_resp",
event_code=[6, 7, 8, 9, 10, 1, 2, 3, 4],
event_show_under=False,
ax_bottom=True,
title=f"心晓 sampNo:{self.sampNo} Index:{bcg_index + 1}/{len(self.bcg_event_label_filtered_df)} "
f"Epoch:{one_bcg_data['Epoch']} Duration:{bcg_duration}",
)
figManager = plt.get_current_fig_manager()
figManager.window.showMaximized()
plt.show()
def plt_channel(self, plt_, SP, EP, channel, event_code=[1, 2, 3, 4], event_show_under=False,
ax_top=False, ax_bottom=False, ax_left=True, ax_right=False, title=None):
"""
:param plt_:
:param SP: 显示开始秒数
:param EP: 显示结束秒数
:param channel: 通道名称
:param event_code: 要上色的事件编号
:param event_show_under: 事件颜色显示在信号下面
:param ax_top: 显示上框线
:param ax_bottom: 显示下框线
:param ax_left: 显示左框线
:param ax_right: 显示右框线
:param title: 显示标题
:return:
"""
linestyle = "-"
SP = 0 if SP < 0 else SP
plt_.plot(np.linspace(SP, EP, (EP - SP) * self.frequency),
self.signal_select[channel][SP * self.frequency:EP * self.frequency], label=channel,
color=self.color_cycle[0])
# plt_.arrow((SP + (EP - SP) // 2) * self.frequency,
# self.signal_select[channel][SP * self.frequency:EP * self.frequency].max(),
# (SP + (EP - SP) // 2) * self.frequency,
# self.signal_select[channel][SP * self.frequency:EP * self.frequency].mean(),
# )
for j in event_code:
if channel == "SpO2":
mask = self.spo2_event_label[SP * self.frequency:EP * self.frequency] == j
elif channel == "orgdata" or channel == "0.7lowpass_resp":
if j <= 5:
mask = self.bcg_event_label[SP * self.frequency:EP * self.frequency] == j
else:
mask = self.artifact_event_label[SP * self.frequency:EP * self.frequency] == j
linestyle = "-"
else:
mask = self.ecg_event_label[SP * self.frequency:EP * self.frequency] == j
if event_show_under:
min_point = self.signal_select[channel][SP * self.frequency:EP * self.frequency].min()
len_segment = EP * self.frequency - SP * self.frequency
y = (min_point.repeat(len_segment) * mask).astype('float')
np.place(y, y == 0, np.nan)
else:
y = (self.signal_select[channel][SP * self.frequency:EP * self.frequency] * mask).astype('float')
np.place(y, y == 0, np.nan)
plt_.plot(np.linspace(SP, EP, (EP - SP) * self.frequency), y, color=self.color_cycle[j],
linestyle=linestyle)
plt_.legend(loc=1)
if title is not None:
plt_.title(title)
ax = plt.gca()
ax.spines["top"].set_visible(ax_top)
ax.spines["right"].set_visible(ax_right)
ax.spines["bottom"].set_visible(ax_bottom)
ax.spines["left"].set_visible(ax_left)
# xticks = [[]] if xticks else [range(SP, EP, 5), [str(i) for i in range(0, (EP - SP), 5)]]
# print(xticks)
# plt_.xticks(*xticks) # 去掉x轴
def show_all_event(self, start_bcg_index: int = 0, front_add_second: int = 60,
back_add_second: int = 60):
for index in range(start_bcg_index, len(self.bcg_event_label_filtered_df)):
self.show_one_event(index,
index,
front_add_second=front_add_second,
back_add_second=back_add_second
)
if __name__ == '__main__':
PSG_Data_Path = "../Data/PSG/"
PSG_Label_Path = "../Data/PSG_label/"
BCG_Data_Path = "../Data/BCG/"
BCG_Label_Path = "../Data/BCG_label/"
all_paths = [PSG_Data_Path, PSG_Label_Path, BCG_Data_Path, BCG_Label_Path]
prepareData = Quality_Relabel(all_paths, 670)
prepareData.show_all_event()