添加主程序接口

添加血氧信号与标签的显示
调整了信号顺序
添加日志模块
添加程序注释
This commit is contained in:
mmmistgun 2022-03-29 22:18:46 +08:00
parent 85de1815b2
commit d7725857a9
2 changed files with 183 additions and 159 deletions

41
Main_Quality_Relabel.py Normal file
View File

@ -0,0 +1,41 @@
#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""
@author:Marques
@file:Main_Quality_Relabel.py
@email:admin@marques22.com
@email:2021022362@m.scnu.edu.cn
@time:2022/03/28
"""
from utils.Quality_Relabel import Quality_Relabel
# start-----一般不用修改------
# 绘图时的采样率
frequency = 100
# 心晓数据采样率
bcg_frequency = 1000
# end-----一般不用修改------
# 要遍历的事件
# 可选一个或多个 "Hypopnea" "Central apnea" "Obstructive apnea" "Mixed apnea"
focus_event_list = ["Obstructive apnea"]
# 信号显示事件前多少秒
front_add_second = 60
# 信号显示事件后多少秒
back_add_second = 60
# 样本编号
sampNo = 670
# 从第几个心晓事件数量开始
start_bcg_index = 0
# 用于心晓信号前面有一部分信号不可用PSG事件第几个事件对应于心晓第一个事件
shifting = 0
if __name__ == '__main__':
qualityRelabel = Quality_Relabel(sampNo=sampNo, frequency=frequency, bcg_frequency=bcg_frequency,
focus_event_list=focus_event_list)
qualityRelabel.show_all_event(start_bcg_index=start_bcg_index,
shifting=shifting,
front_add_second=front_add_second,
back_add_second=back_add_second)

View File

@ -1,5 +1,5 @@
# -*- coding: cp936 -*- # -*- coding: cp936 -*-
# 使用gbk编码才能显示 # 使用gbk编码才能读标签
""" """
@author:Marques @author:Marques
@file:Prepare_Data.py @file:Prepare_Data.py
@ -7,9 +7,9 @@
@email:2021022362@m.scnu.edu.cn @email:2021022362@m.scnu.edu.cn
@time:2022/03/26 @time:2022/03/26
""" """
from datetime import datetime import time
from typing import Union, List from typing import List
import logging
import pyedflib import pyedflib
from pathlib import Path from pathlib import Path
import numpy as np import numpy as np
@ -17,32 +17,53 @@ import pandas as pd
from matplotlib import pyplot as plt, gridspec from matplotlib import pyplot as plt, gridspec
from Preprocessing import BCG_Operation from Preprocessing import BCG_Operation
from tqdm import tqdm from tqdm import tqdm
from datetime import datetime
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签 plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
# ['EEG F3-A2', 'EEG F4-A1', 'EEG C3-A2', 'EEG C4-A1', 'EEG O1-A2', # ['EEG F3-A2', 'EEG F4-A1', 'EEG C3-A2', 'EEG C4-A1', 'EEG O1-A2',
# 'EEG O2-A1', 'EOG Right', 'EOG Left', 'EMG Chin', 'ECG I', 'RR', # 'EEG O2-A1', 'EOG Right', 'EOG Left', 'EMG Chin', 'ECG I', 'RR',
# 'ECG II', 'Effort Tho', 'Flow Patient', 'Flow Patient', 'Effort Abd', # 'ECG II', 'Effort Tho', 'Flow Patient', 'Flow Patient', 'Effort Abd',
# 'SpO2', 'Pleth', 'Snore', 'Body', 'Pulse', 'Leg LEG1', 'Leg LEG2', # 'SpO2', 'Pleth', 'Snore', 'Body', 'Pulse', 'Leg LEG1', 'Leg LEG2',
# 'EEG A1-A2', 'Imp'] # 'EEG A1-A2', 'Imp']
class Prepare_Data:
# 设置日志
logger = logging.getLogger()
logger.setLevel(logging.NOTSET)
realtime = time.strftime('%Y%m%d', time.localtime(time.time()))
fh = logging.FileHandler(Path("../history") / (realtime + ".log"), mode='a')
fh.setLevel(logging.NOTSET)
fh.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
logger.addHandler(fh)
ch = logging.StreamHandler()
ch.setLevel(logging.NOTSET)
ch.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
logger.addHandler(ch)
logging.getLogger('matplotlib.font_manager').disabled = True
logging.info("------------------------------------")
class Quality_Relabel:
# 可选择的通道 # 可选择的通道
base_channel = ['EEG F3-A2', 'EEG F4-A1', 'EEG C3-A2', 'EEG C4-A1', 'EEG O1-A2', 'EEG O2-A1', 'EOG Right', base_channel = ['EEG F3-A2', 'EEG F4-A1', 'EEG C3-A2', 'EEG C4-A1', 'EEG O1-A2', 'EEG O2-A1', 'EOG Right',
'EOG Left', 'EMG Chin', 'ECG I', 'RR', 'ECG II', 'Effort Tho', 'Flow Patient', 'Flow Patient', 'HR', 'EOG Left', 'EMG Chin', 'ECG I', 'RR', 'ECG II', 'Effort Tho', 'Flow Patient', 'Flow Patient', 'HR',
'Effort Abd', 'SpO2', 'Pleth', 'Snore', 'Body', 'Pulse', 'Leg LEG1', 'Leg LEG2', 'EEG A1-A2', 'Imp'] 'Effort Abd', 'SpO2', 'Pleth', 'Snore', 'Body', 'Pulse', 'Leg LEG1', 'Leg LEG2', 'EEG A1-A2', 'Imp']
# 显示事件 # 显示事件
base_event = ["Hypopnea", "Central apnea", "Obstructive apnea", "Mixed apnea"] base_event = ["Hypopnea", "Central apnea", "Obstructive apnea", "Mixed apnea", "Desaturation"]
# 设定事件和其对应颜色 # 设定事件和其对应颜色
# 蓝色 背景 # event_code color event
# 粉色 低通气 # 0 蓝色 背景
# 橙色 中枢性 # 1 粉色 低通气
# 红色 阻塞型 与 混合型 # 2 橙色 中枢性
color_cycle = ["blue", "pink", "orange", "red", "red"] # 3 红色 阻塞型
# 4 红色 混合型
# 5 绿色 血氧饱和度下降
color_cycle = ["blue", "pink", "orange", "red", "red", "green"]
assert len(color_cycle) == len(base_event) + 1, "基础事件数量与颜色数量不一致" assert len(color_cycle) == len(base_event) + 1, "基础事件数量与颜色数量不一致"
def __init__(self, sampNo: int, frequency: int = 100, bcg_frequency: int = 1000, def __init__(self, sampNo: int, frequency: int = 100, bcg_frequency: int = 1000,
@ -67,6 +88,7 @@ class Prepare_Data:
# 用来显示颜色时按点匹配事件 # 用来显示颜色时按点匹配事件
self.ecg_event_label = None self.ecg_event_label = None
self.bcg_event_label = None self.bcg_event_label = None
self.spo2_event_label = None
# 仅包含关注暂停事件的列表 # 仅包含关注暂停事件的列表
self.ecg_event_label_filtered_df = None self.ecg_event_label_filtered_df = None
@ -86,7 +108,7 @@ class Prepare_Data:
def check_channel(self): def check_channel(self):
for i in self.channel_list: for i in self.channel_list:
if i not in self.base_channel: if i not in self.base_channel:
print(f"{i} 不存在于常见通道名中") logging.debug(f"{i} 不存在于常见通道名中")
print(f"常见通道名:{self.channel_list}") print(f"常见通道名:{self.channel_list}")
def read_data(self, frequency: int = 100, bcg_frequency: int = 1000): def read_data(self, frequency: int = 100, bcg_frequency: int = 1000):
@ -94,29 +116,32 @@ class Prepare_Data:
ecg_path = Path(f"../Data/ECG/A{str(self.sampNo).rjust(7, '0')}.edf") ecg_path = Path(f"../Data/ECG/A{str(self.sampNo).rjust(7, '0')}.edf")
if not bcg_path.exists(): if not bcg_path.exists():
logging.error(f"{bcg_path} 不存在!")
raise FileNotFoundError(f"{bcg_path} 不存在!") raise FileNotFoundError(f"{bcg_path} 不存在!")
if not ecg_path.exists(): if not ecg_path.exists():
logging.error(f"{ecg_path} 不存在!")
raise FileNotFoundError(f"{ecg_path} 不存在!") raise FileNotFoundError(f"{ecg_path} 不存在!")
with pyedflib.EdfReader(str(ecg_path.resolve())) as file: with pyedflib.EdfReader(str(ecg_path.resolve())) as file:
signal_num = file.signals_in_file signal_num = file.signals_in_file
print(f"{self.sampNo} EDF file signal number is {signal_num}") logging.debug(f"{self.sampNo} EDF file signal number is {signal_num}")
signal_label = file.getSignalLabels() signal_label = file.getSignalLabels()
print(f"{self.sampNo} EDF file signal label : {signal_label}") logging.debug(f"{self.sampNo} EDF file signal label : {signal_label}")
self.ecg_start_time = file.getStartdatetime() self.ecg_start_time = file.getStartdatetime()
# 根据PSG记录长度生成事件表 # 根据PSG记录长度生成事件表
self.ecg_event_label = np.zeros(file.getFileDuration() * self.frequency) self.ecg_event_label = np.zeros(file.getFileDuration() * self.frequency)
self.spo2_event_label = np.zeros(file.getFileDuration() * self.frequency)
# 打印PSG信息 # 打印PSG信息
file.file_info_long() file.file_info_long()
# sub_index 用于区分两个flow patient # sub_index 用于区分两个flow patient
sub_index = 1 sub_index = 1
logging.info("读取PSG信号····")
for i, index in enumerate(signal_label): for i, index in enumerate(signal_label):
# 仅加载选中的通道 # 仅加载选中的通道
if index in self.channel_list: if index in self.channel_list:
@ -124,7 +149,6 @@ class Prepare_Data:
if index == 'Flow Patient': if index == 'Flow Patient':
index = index + str(sub_index) index = index + str(sub_index)
sub_index += 1 sub_index += 1
signal = file.readSignal(i) signal = file.readSignal(i)
sample_frequency = file.getSampleFrequency(i) sample_frequency = file.getSampleFrequency(i)
# 读取采样率 进行重采样 # 读取采样率 进行重采样
@ -133,8 +157,10 @@ class Prepare_Data:
elif sample_frequency > frequency: elif sample_frequency > frequency:
signal = signal[::int(sample_frequency / frequency)] signal = signal[::int(sample_frequency / frequency)]
self.signal_select[index] = signal self.signal_select[index] = signal
logging.info(f"完成读取PSG: {index}")
# 加载心晓信号 # 加载心晓信号
logging.info("读取心晓信号····")
signal = np.load(bcg_path) signal = np.load(bcg_path)
preprocessing = BCG_Operation(sample_rate=bcg_frequency) preprocessing = BCG_Operation(sample_rate=bcg_frequency)
# 20Hz低通去噪 # 20Hz低通去噪
@ -156,21 +182,24 @@ class Prepare_Data:
ecg_label_path = Path(f"../Data/ECG_label/export{self.sampNo}.csv") ecg_label_path = Path(f"../Data/ECG_label/export{self.sampNo}.csv")
if not bcg_label_path.exists(): if not bcg_label_path.exists():
logging.error(f"{bcg_label_path} 不存在!")
raise FileNotFoundError(f"{bcg_label_path} 不存在!") raise FileNotFoundError(f"{bcg_label_path} 不存在!")
if not ecg_label_path.exists(): if not ecg_label_path.exists():
logging.error(f"{ecg_label_path} 不存在!")
raise FileNotFoundError(f"{ecg_label_path} 不存在!") raise FileNotFoundError(f"{ecg_label_path} 不存在!")
df = pd.read_csv(ecg_label_path, encoding='gbk') df = pd.read_csv(ecg_label_path, encoding='gbk')
self.ecg_event_label_df = df self.ecg_event_label_df = df
# 过滤不关注的事件 # 过滤不关注的事件
df = df[df["Event type"].isin(self.focus_event_list)] df2 = df[df["Event type"].isin(self.focus_event_list)]
# 根据epoch进行排列方便索引 # 根据epoch进行排列方便索引
df = df.sort_values(by='Epoch') df2 = df2.sort_values(by='Epoch')
self.ecg_event_label_filtered_df = df self.ecg_event_label_filtered_df = df2
for one_data in df.index: logging.info("遍历PSG事件···")
for one_data in tqdm(df.index, ncols=80):
one_data = df.loc[one_data] one_data = df.loc[one_data]
# 通过开始事件推算事件起始点与结束点 # 通过开始事件推算事件起始点与结束点
@ -190,6 +219,8 @@ class Prepare_Data:
self.ecg_event_label[SP:EP] = 3 self.ecg_event_label[SP:EP] = 3
elif one_data["Event type"] == "Mixed apnea": elif one_data["Event type"] == "Mixed apnea":
self.ecg_event_label[SP:EP] = 4 self.ecg_event_label[SP:EP] = 4
elif one_data["Event type"] == "Desaturation":
self.spo2_event_label[SP:EP] = 5
# 读取心晓事件 # 读取心晓事件
df = pd.read_csv(bcg_label_path, encoding='gbk') df = pd.read_csv(bcg_label_path, encoding='gbk')
@ -198,11 +229,11 @@ class Prepare_Data:
self.bcg_event_label_df = df self.bcg_event_label_df = df
# 过滤不关注事件 # 过滤不关注事件
df = df[df["Event type"].isin(self.focus_event_list)] df2 = df[df["Event type"].isin(self.focus_event_list)]
df = df.sort_values(by='Epoch') df2 = df2.sort_values(by='Epoch')
self.bcg_event_label_filtered_df = df self.bcg_event_label_filtered_df = df2
logging.info("遍历心晓事件···")
for one_data in df.index: for one_data in tqdm(df.index):
one_data = df.loc[one_data] one_data = df.loc[one_data]
SP = one_data["new_start"] * self.frequency SP = one_data["new_start"] * self.frequency
EP = one_data["new_end"] * self.frequency EP = one_data["new_end"] * self.frequency
@ -221,17 +252,19 @@ class Prepare_Data:
# 心晓事件数量{len(self.bcg_event_label_filtered_df)}" # 心晓事件数量{len(self.bcg_event_label_filtered_df)}"
def show_one_event(self, bcg_index: int, ecg_index: int, front_add_second: int = 60, def show_one_event(self, bcg_index: int, ecg_index: int, front_add_second: int = 60,
back_add_second: int = 60, main_SA_visual: int = 1): back_add_second: int = 60):
""" """
:param bcg_index: 心晓事件在csv中行号 :param bcg_index: 心晓事件在csv中行号
:param ecg_index: PSG事件在csv中序号 :param ecg_index: PSG事件在csv中序号
:param front_add_second: 向前延伸时间 :param front_add_second: 向前延伸时间
:param back_add_second: 向后延伸时间 :param back_add_second: 向后延伸时间
:param main_SA_visual: 1仅当前事件上色 0不上色 2所有事件上色
:return: :return:
""" """
one_bcg_data = self.bcg_event_label_df.loc[bcg_index] # 获取事件实际在csv文件中的序号
one_ecg_data = self.ecg_event_label_df.loc[ecg_index] bcg_real_index = self.bcg_event_label_filtered_df.index[bcg_index],
ecg_real_index = self.ecg_event_label_filtered_df.index[ecg_index],
one_bcg_data = self.bcg_event_label_df.loc[bcg_real_index]
one_ecg_data = self.ecg_event_label_df.loc[ecg_real_index]
# 获取ECG事件开始与结束时间 # 获取ECG事件开始与结束时间
event_start_time = datetime.strptime(one_ecg_data["Date"] + " " + one_ecg_data["Time"], '%Y/%m/%d %H:%M:%S') event_start_time = datetime.strptime(one_ecg_data["Date"] + " " + one_ecg_data["Time"], '%Y/%m/%d %H:%M:%S')
@ -243,7 +276,16 @@ class Prepare_Data:
bcg_SP = one_bcg_data["new_start"] bcg_SP = one_bcg_data["new_start"]
bcg_EP = one_bcg_data["new_end"] bcg_EP = one_bcg_data["new_end"]
bcg_duration = int(float(str(one_bcg_data["Duration"]).split("(")[0])) bcg_duration = int(float(str(one_bcg_data["Duration"]).split("(")[0]))
print(ecg_SP, ecg_EP, bcg_SP, bcg_EP)
logging.info(f"sampNo:{self.sampNo} "
f"bcg[index:{bcg_index} epoch:{one_bcg_data['Epoch']} event:{one_bcg_data['Event type']}] "
f"ecg:[index:{ecg_index} epoch:{one_ecg_data['Epoch']} event:{one_ecg_data['Event type']}]")
if one_bcg_data['Event type'] != one_ecg_data['Event type']:
logging.error(f"sampNo:{self.sampNo} PSG事件与心晓时间不一致请排查"
f"bcg[index:{bcg_index} epoch:{one_bcg_data['Epoch']} event:{one_bcg_data['Event type']}] "
f"ecg:[index:{ecg_index} epoch:{one_ecg_data['Epoch']} event:{one_ecg_data['Event type']}]")
raise ValueError()
# 进行向两边延展 # 进行向两边延展
ecg_SP = ecg_SP - front_add_second ecg_SP = ecg_SP - front_add_second
@ -253,158 +295,99 @@ class Prepare_Data:
# 绘图 # 绘图
plt.figure(figsize=(12, 6), dpi=150) plt.figure(figsize=(12, 6), dpi=150)
gs = gridspec.GridSpec(7, 1, height_ratios=[1, 1, 1, 3, 1, 1, 1]) # 各个子图之间的比例
gs = gridspec.GridSpec(7, 1, height_ratios=[1, 1, 1, 1, 1, 3, 1])
plt.subplots_adjust(top=1, bottom=0, right=1, left=0, hspace=0, wspace=0) plt.subplots_adjust(top=1, bottom=0, right=1, left=0, hspace=0, wspace=0)
plt.margins(0, 0) plt.margins(0, 0)
plt.tight_layout() plt.tight_layout()
# 绘制 Flow1
plt.subplot(gs[0]) plt.subplot(gs[0])
# ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'] self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Flow Patient1")
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency),
self.signal_select["Effort Tho"][ecg_SP * self.frequency:ecg_EP * self.frequency], label="Effort Tho")
# 进行事件颜色标注
for j in range(1, 5):
mask = self.ecg_event_label[ecg_SP * self.frequency:ecg_EP * self.frequency] == j
y = (self.signal_select["Effort Tho"][ecg_SP * self.frequency:ecg_EP * self.frequency] * mask).astype(
'float')
np.place(y, y == 0, np.nan)
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), y, color=self.color_cycle[j])
# 显示图注
plt.legend(loc=1)
# 隐藏部分边框
ax = plt.gca()
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
# 去掉x轴
plt.xticks([])
# 绘制 Flow2
plt.subplot(gs[1]) plt.subplot(gs[1])
# ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'] self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Flow Patient2",
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), title=f"PSG sampNo:{self.sampNo} Epoch:{one_ecg_data['Epoch']} Duration:{one_ecg_data['Duration']}")
self.signal_select["Effort Abd"][ecg_SP * self.frequency:ecg_EP * self.frequency], label="Effort Abd")
for j in range(1, 5):
mask = self.ecg_event_label[ecg_SP * self.frequency:ecg_EP * self.frequency] == j
y = (self.signal_select["Effort Abd"][ecg_SP * self.frequency:ecg_EP * self.frequency] * mask).astype(
'float')
np.place(y, y == 0, np.nan)
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), y, color=self.color_cycle[j])
plt.title(f"sampNo:{self.sampNo} Epoch:{one_ecg_data['Epoch']} Duration:{one_ecg_data['Duration']}")
plt.legend(loc=1)
ax = plt.gca()
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
plt.xticks([])
plt.subplot(gs[2]) plt.subplot(gs[2])
# ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'] self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Effort Tho")
plt.plot(np.linspace(bcg_SP, bcg_EP, (bcg_EP - bcg_SP) * self.frequency),
self.signal_select["xin_xiao_respire"][bcg_SP * self.frequency:bcg_EP * self.frequency], label="心晓 呼吸")
min_bcg = self.signal_select["xin_xiao_respire"][bcg_SP * self.frequency:bcg_EP * self.frequency].min()
len_bcg = bcg_EP * self.frequency - bcg_SP * self.frequency
for j in range(1, 5):
mask = self.bcg_event_label[bcg_SP * self.frequency:bcg_EP * self.frequency] == j
y = (min_bcg.repeat(len_bcg) * mask).astype('float')
np.place(y, y == 0, np.nan)
plt.plot(np.linspace(bcg_SP, bcg_EP, (bcg_EP - bcg_SP) * self.frequency), y, color=self.color_cycle[j])
# plt.title(f"sampNo:{self.sampNo} Epoch:{one_bcg_data['Epoch']} Duration:{one_bcg_data['Duration']}")
plt.legend(loc=1)
ax = plt.gca()
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
plt.xticks([])
plt.subplot(gs[3]) plt.subplot(gs[3])
# ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'] self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="Effort Abd")
plt.plot(np.linspace(bcg_SP, bcg_EP, (bcg_EP - bcg_SP) * self.frequency),
self.signal_select["xin_xiao"][bcg_SP * self.frequency:bcg_EP * self.frequency], label="心晓")
min_bcg = self.signal_select["xin_xiao"][bcg_SP * self.frequency:bcg_EP * self.frequency].min()
len_bcg = bcg_EP * self.frequency - bcg_SP * self.frequency
for j in range(1, 5):
mask = self.bcg_event_label[bcg_SP * self.frequency:bcg_EP * self.frequency] == j
y = (min_bcg.repeat(len_bcg) * mask).astype('float')
np.place(y, y == 0, np.nan)
plt.plot(np.linspace(bcg_SP, bcg_EP, (bcg_EP - bcg_SP) * self.frequency), y, color=self.color_cycle[j])
plt.title(f"sampNo:{self.sampNo} Epoch:{one_bcg_data['Epoch']} Duration:{one_bcg_data['Duration']}")
plt.legend(loc=1)
ax = plt.gca()
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
plt.xticks([]) # 去掉x轴
plt.subplot(gs[4]) plt.subplot(gs[4])
# ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'] self.plt_channel(plt_=plt, SP=ecg_SP, EP=ecg_EP, channel="SpO2", event_code=[5])
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency),
self.signal_select["Flow Patient1"][ecg_SP * self.frequency:ecg_EP * self.frequency],
label="Flow Patient1")
for j in range(1, 5):
mask = self.ecg_event_label[ecg_SP * self.frequency:ecg_EP * self.frequency] == j
y = (self.signal_select["Flow Patient1"][ecg_SP * self.frequency:ecg_EP * self.frequency] * mask).astype(
'float')
np.place(y, y == 0, np.nan)
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), y, color=self.color_cycle[j])
plt.legend(loc=1)
ax = plt.gca()
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
plt.xticks([]) # 去掉x轴
plt.subplot(gs[5]) plt.subplot(gs[5])
# ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'] self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="xin_xiao", event_show_under=True)
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency),
self.signal_select["Flow Patient2"][ecg_SP * self.frequency:ecg_EP * self.frequency],
label="Flow Patient2")
for j in range(1, 5):
mask = self.ecg_event_label[ecg_SP * self.frequency:ecg_EP * self.frequency] == j
y = (self.signal_select["Flow Patient2"][ecg_SP * self.frequency:ecg_EP * self.frequency] * mask).astype(
'float')
np.place(y, y == 0, np.nan)
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), y, color=self.color_cycle[j])
plt.legend(loc=1)
ax = plt.gca()
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
plt.xticks([])
plt.subplot(gs[6]) plt.subplot(gs[6])
# ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'] self.plt_channel(plt_=plt, SP=bcg_SP, EP=bcg_EP, channel="xin_xiao_respire", event_show_under=True,
plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), ax_bottom=True,
self.signal_select["SpO2"][ecg_SP * self.frequency:ecg_EP * self.frequency], label="SpO2") title=f"心晓 sampNo:{self.sampNo} Epoch:{one_bcg_data['Epoch']} Duration:{one_bcg_data['Duration']}",
plt.legend(loc=1) )
ax = plt.gca()
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.spines["bottom"].set_visible(False)
plt.xticks([])
plt.show() plt.show()
def show_all_event(self, start_index: int = 0, shifting: int = 0, front_add_second: int = 60, def plt_channel(self, plt_, SP, EP, channel, event_code=[1, 2, 3, 4], event_show_under=False,
back_add_second: int = 60, main_SA_visual: int = 1): ax_top=False, ax_bottom=False, ax_left=True, ax_right=False, title=None):
"""
for index in range(start_index, len(self.bcg_event_label_filtered_df)): :param plt_:
self.show_one_event(self.bcg_event_label_filtered_df.index[index], :param SP: 显示开始秒数
self.ecg_event_label_filtered_df.index[index + shifting], :param EP: 显示结束秒数
:param channel: 通道名称
:param event_code: 要上色的事件编号
:param event_show_under: 事件颜色显示在信号下面
:param ax_top: 显示上框线
:param ax_bottom: 显示下框线
:param ax_left: 显示左框线
:param ax_right: 显示右框线
:param title: 显示标题
:return:
"""
plt_.plot(np.linspace(SP, EP, (EP - SP) * self.frequency),
self.signal_select[channel][SP * self.frequency:EP * self.frequency], label=channel)
for j in event_code:
if channel == "SpO2":
mask = self.spo2_event_label[SP * self.frequency:EP * self.frequency] == j
else:
mask = self.ecg_event_label[SP * self.frequency:EP * self.frequency] == j
if event_show_under:
min_point = self.signal_select[channel][SP * self.frequency:EP * self.frequency].min()
len_segment = EP * self.frequency - SP * self.frequency
y = (min_point.repeat(len_segment) * mask).astype('float')
np.place(y, y == 0, np.nan)
else:
y = (self.signal_select[channel][SP * self.frequency:EP * self.frequency] * mask).astype('float')
np.place(y, y == 0, np.nan)
plt_.plot(np.linspace(SP, EP, (EP - SP) * self.frequency), y, color=self.color_cycle[j])
plt_.legend(loc=1)
if title is not None:
plt_.title(title)
ax = plt.gca()
ax.spines["top"].set_visible(ax_top)
ax.spines["right"].set_visible(ax_right)
ax.spines["bottom"].set_visible(ax_bottom)
ax.spines["left"].set_visible(ax_left)
# xticks = [[]] if xticks else [range(SP, EP, 5), [str(i) for i in range(0, (EP - SP), 5)]]
# print(xticks)
# plt_.xticks(*xticks) # 去掉x轴
def show_all_event(self, start_bcg_index: int = 0, shifting: int = 0, front_add_second: int = 60,
back_add_second: int = 60):
for index in range(start_bcg_index, len(self.bcg_event_label_filtered_df)):
self.show_one_event(index,
index + shifting,
front_add_second=front_add_second, front_add_second=front_add_second,
back_add_second=back_add_second, back_add_second=back_add_second
main_SA_visual=main_SA_visual
) )
def get_fft(self):
pass
if __name__ == '__main__': if __name__ == '__main__':
prepareData = Prepare_Data(670) prepareData = Quality_Relabel(670)
prepareData.show_all_event() prepareData.show_all_event()