From d384d38e5a8b5af3aa3dee0f1cd734370feb532e Mon Sep 17 00:00:00 2001 From: mmmistgun <20172333133@m.scnu.edu.cn> Date: Sun, 27 Mar 2022 22:10:32 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8D=8A=E6=88=90=E5=93=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 + utils/Prepare_Data.py | 410 +++++++++++++++++++++++++++++++++++++++++ utils/Preprocessing.py | 179 ++++++++++++++++++ 3 files changed, 591 insertions(+) create mode 100644 README.md create mode 100644 utils/Prepare_Data.py create mode 100644 utils/Preprocessing.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..198ef99 --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +# Quality_Relabel + diff --git a/utils/Prepare_Data.py b/utils/Prepare_Data.py new file mode 100644 index 0000000..cfec3b8 --- /dev/null +++ b/utils/Prepare_Data.py @@ -0,0 +1,410 @@ +# -*- coding: cp936 -*- +# 使用gbk编码才能显示 +""" +@author:Marques +@file:Prepare_Data.py +@email:admin@marques22.com +@email:2021022362@m.scnu.edu.cn +@time:2022/03/26 +""" +from datetime import datetime +from typing import Union, List + +import pyedflib +from pathlib import Path +import numpy as np +import pandas as pd +from matplotlib import pyplot as plt, gridspec +from Preprocessing import BCG_Operation +from tqdm import tqdm + +plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签 +plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 + + +# ['EEG F3-A2', 'EEG F4-A1', 'EEG C3-A2', 'EEG C4-A1', 'EEG O1-A2', +# 'EEG O2-A1', 'EOG Right', 'EOG Left', 'EMG Chin', 'ECG I', 'RR', +# 'ECG II', 'Effort Tho', 'Flow Patient', 'Flow Patient', 'Effort Abd', +# 'SpO2', 'Pleth', 'Snore', 'Body', 'Pulse', 'Leg LEG1', 'Leg LEG2', +# 'EEG A1-A2', 'Imp'] + +class Prepare_Data: + # 可选择的通道 + base_channel = ['EEG F3-A2', 'EEG F4-A1', 'EEG C3-A2', 'EEG C4-A1', 'EEG O1-A2', 'EEG O2-A1', 'EOG Right', + 'EOG Left', 'EMG Chin', 'ECG I', 'RR', 'ECG II', 'Effort Tho', 'Flow Patient', 'Flow Patient', 'HR', + 'Effort Abd', 'SpO2', 'Pleth', 'Snore', 'Body', 'Pulse', 'Leg LEG1', 'Leg LEG2', 'EEG A1-A2', 'Imp'] + + # 显示事件 + base_event = ["Hypopnea", "Central apnea", "Obstructive apnea", "Mixed apnea"] + + # 设定事件和其对应颜色 + # 蓝色 背景 + # 粉色 低通气 + # 橙色 中枢性 + # 红色 阻塞型 与 混合型 + color_cycle = ["blue", "pink", "orange", "red", "red"] + assert len(color_cycle) == len(base_event) + 1, "基础事件数量与颜色数量不一致" + + def __init__(self, sampNo: int, frequency: int = 100, bcg_frequency: int = 1000, + channel_list: List[str] = ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'], + focus_event_list: List[str] = ["Obstructive apnea"]): + """ + + :param sampNo: 编号选择 + :param frequency: 显示采样率 + :param bcg_frequency: BCG信号采样率 + :param channel_list: 显示的通道 + :param focus_event_list: 关注暂停事件 + """ + self.sampNo = sampNo + self.channel_list = channel_list + self.focus_event_list = focus_event_list + self.frequency = frequency + self.bcg_frequency = bcg_frequency + + self.ecg_start_time = None + + # 用来显示颜色时按点匹配事件 + self.ecg_event_label = None + self.bcg_event_label = None + + # 仅包含关注暂停事件的列表 + self.ecg_event_label_filtered_df = None + self.bcg_event_label_filtered_df = None + + # 所有事件列表 + self.ecg_event_label_df = None + self.bcg_event_label_df = None + + # 各通道信号 + self.signal_select = {} + + self.check_channel() + self.read_data(frequency, bcg_frequency) + self.read_event() + + def check_channel(self): + for i in self.channel_list: + if i not in self.base_channel: + print(f"{i} 不存在于常见通道名中") + print(f"常见通道名:{self.channel_list}") + + def read_data(self, frequency: int = 100, bcg_frequency: int = 1000): + bcg_path = Path(f"../Data/BCG/{self.sampNo}samp.npy") + ecg_path = Path(f"../Data/ECG/A{str(self.sampNo).rjust(7, '0')}.edf") + + if not bcg_path.exists(): + raise FileNotFoundError(f"{bcg_path} 不存在!") + + if not ecg_path.exists(): + raise FileNotFoundError(f"{ecg_path} 不存在!") + + with pyedflib.EdfReader(str(ecg_path.resolve())) as file: + signal_num = file.signals_in_file + print(f"{self.sampNo} EDF file signal number is {signal_num}") + + signal_label = file.getSignalLabels() + print(f"{self.sampNo} EDF file signal label : {signal_label}") + + self.ecg_start_time = file.getStartdatetime() + + # 根据PSG记录长度生成事件表 + self.ecg_event_label = np.zeros(file.getFileDuration() * self.frequency) + + # 打印PSG信息 + file.file_info_long() + + # sub_index 用于区分两个flow patient + sub_index = 1 + + for i, index in enumerate(signal_label): + # 仅加载选中的通道 + if index in self.channel_list: + # 重命名flow patient通道 + if index == 'Flow Patient': + index = index + str(sub_index) + sub_index += 1 + + signal = file.readSignal(i) + sample_frequency = file.getSampleFrequency(i) + # 读取采样率 进行重采样 + if sample_frequency < frequency: + signal = signal.repeat(frequency / sample_frequency) + elif sample_frequency > frequency: + signal = signal[::int(sample_frequency / frequency)] + self.signal_select[index] = signal + + # 加载心晓信号 + signal = np.load(bcg_path) + preprocessing = BCG_Operation(sample_rate=bcg_frequency) + # 20Hz低通去噪 + signal1 = preprocessing.Butterworth(signal, 'lowpass', low_cut=20, order=3) + # 0.7Hz 低通提取呼吸 + signal2 = preprocessing.Butterworth(signal, 'lowpass', low_cut=0.7, order=3) + + # 进行降采样 + signal1 = signal1[::int(bcg_frequency / frequency)] + signal2 = signal2[::int(bcg_frequency / frequency)] + + # 根据心晓事件长度生成事件记录 + self.bcg_event_label = np.zeros(len(signal)) + self.signal_select['xin_xiao'] = signal1 + self.signal_select['xin_xiao_respire'] = signal2 + + def read_event(self): + bcg_label_path = Path(f"../Data/BCG_label/{self.sampNo}_label_all.csv") + ecg_label_path = Path(f"../Data/ECG_label/export{self.sampNo}.csv") + + if not bcg_label_path.exists(): + raise FileNotFoundError(f"{bcg_label_path} 不存在!") + + if not ecg_label_path.exists(): + raise FileNotFoundError(f"{ecg_label_path} 不存在!") + + df = pd.read_csv(ecg_label_path, encoding='gbk') + self.ecg_event_label_df = df + + # 过滤不关注的事件 + df = df[df["Event type"].isin(self.focus_event_list)] + # 根据epoch进行排列方便索引 + df = df.sort_values(by='Epoch') + self.ecg_event_label_filtered_df = df + + for one_data in df.index: + one_data = df.loc[one_data] + + # 通过开始事件推算事件起始点与结束点 + event_start_time = datetime.strptime(one_data["Date"] + " " + one_data["Time"], '%Y/%m/%d %H:%M:%S') + SP = (event_start_time - self.ecg_start_time).seconds + # 对括号进行切分,避免Duration 20 (20) 这种带括号的问题 + EP = int(SP + float(one_data["Duration"].split("(")[0])) + SP *= self.frequency + EP *= self.frequency + + # 对事件重新编码并存到事件记录表中 + if one_data["Event type"] == "Hypopnea": + self.ecg_event_label[SP:EP] = 1 + elif one_data["Event type"] == "Central apnea": + self.ecg_event_label[SP:EP] = 2 + elif one_data["Event type"] == "Obstructive apnea": + self.ecg_event_label[SP:EP] = 3 + elif one_data["Event type"] == "Mixed apnea": + self.ecg_event_label[SP:EP] = 4 + + # 读取心晓事件 + df = pd.read_csv(bcg_label_path, encoding='gbk') + df["new_start"] = df["new_start"].astype("int") + df["new_end"] = df["new_end"].astype("int") + self.bcg_event_label_df = df + + # 过滤不关注事件 + df = df[df["Event type"].isin(self.focus_event_list)] + df = df.sort_values(by='Epoch') + self.bcg_event_label_filtered_df = df + + for one_data in df.index: + one_data = df.loc[one_data] + SP = one_data["new_start"] * self.frequency + EP = one_data["new_end"] * self.frequency + + if one_data["Event type"] == "Hypopnea": + self.bcg_event_label[SP:EP] = 1 + elif one_data["Event type"] == "Central apnea": + self.bcg_event_label[SP:EP] = 2 + elif one_data["Event type"] == "Obstructive apnea": + self.bcg_event_label[SP:EP] = 3 + elif one_data["Event type"] == "Mixed apnea": + self.bcg_event_label[SP:EP] = 4 + + # assert len(self.ecg_event_label_filtered_df) == len(self.bcg_event_label_filtered_df), \ + # f"PSG与心晓事件数量不一致, PSG事件数量{len(self.ecg_event_label_filtered_df)}, + # 心晓事件数量{len(self.bcg_event_label_filtered_df)}" + + def show_one_event(self, bcg_index: int, ecg_index: int, front_add_second: int = 60, + back_add_second: int = 60, main_SA_visual: int = 1): + """ + :param bcg_index: 心晓事件在csv中行号 + :param ecg_index: PSG事件在csv中序号 + :param front_add_second: 向前延伸时间 + :param back_add_second: 向后延伸时间 + :param main_SA_visual: 1:仅当前事件上色 0:不上色 2:所有事件上色 + :return: + """ + one_bcg_data = self.bcg_event_label_df.loc[bcg_index] + one_ecg_data = self.ecg_event_label_df.loc[ecg_index] + + # 获取ECG事件开始与结束时间 + event_start_time = datetime.strptime(one_ecg_data["Date"] + " " + one_ecg_data["Time"], '%Y/%m/%d %H:%M:%S') + ecg_SP = (event_start_time - self.ecg_start_time).seconds + ecg_duration = int(float(str(one_ecg_data["Duration"]).split("(")[0]) + 0.5) + ecg_EP = ecg_SP + ecg_duration + + # 获取BCG事件开始与结束时间 + bcg_SP = one_bcg_data["new_start"] + bcg_EP = one_bcg_data["new_end"] + bcg_duration = int(float(str(one_bcg_data["Duration"]).split("(")[0])) + print(ecg_SP, ecg_EP, bcg_SP, bcg_EP) + + # 进行向两边延展 + ecg_SP = ecg_SP - front_add_second + ecg_EP = ecg_EP + back_add_second + bcg_SP = bcg_SP - front_add_second - (ecg_duration - bcg_duration) // 2 + bcg_EP = bcg_EP + back_add_second + (ecg_duration - bcg_duration) // 2 + + # 绘图 + plt.figure(figsize=(12, 6), dpi=150) + gs = gridspec.GridSpec(7, 1, height_ratios=[1, 1, 1, 3, 1, 1, 1]) + plt.subplots_adjust(top=1, bottom=0, right=1, left=0, hspace=0, wspace=0) + plt.margins(0, 0) + plt.tight_layout() + + plt.subplot(gs[0]) + # ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'] + plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), + self.signal_select["Effort Tho"][ecg_SP * self.frequency:ecg_EP * self.frequency], label="Effort Tho") + # 进行事件颜色标注 + for j in range(1, 5): + mask = self.ecg_event_label[ecg_SP * self.frequency:ecg_EP * self.frequency] == j + y = (self.signal_select["Effort Tho"][ecg_SP * self.frequency:ecg_EP * self.frequency] * mask).astype( + 'float') + np.place(y, y == 0, np.nan) + plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), y, color=self.color_cycle[j]) + # 显示图注 + plt.legend(loc=1) + + # 隐藏部分边框 + ax = plt.gca() + ax.spines["top"].set_visible(False) + ax.spines["right"].set_visible(False) + ax.spines["bottom"].set_visible(False) + # 去掉x轴 + plt.xticks([]) + + plt.subplot(gs[1]) + # ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'] + plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), + self.signal_select["Effort Abd"][ecg_SP * self.frequency:ecg_EP * self.frequency], label="Effort Abd") + for j in range(1, 5): + mask = self.ecg_event_label[ecg_SP * self.frequency:ecg_EP * self.frequency] == j + y = (self.signal_select["Effort Abd"][ecg_SP * self.frequency:ecg_EP * self.frequency] * mask).astype( + 'float') + np.place(y, y == 0, np.nan) + plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), y, color=self.color_cycle[j]) + + plt.title(f"sampNo:{self.sampNo} Epoch:{one_ecg_data['Epoch']} Duration:{one_ecg_data['Duration']}") + plt.legend(loc=1) + ax = plt.gca() + ax.spines["top"].set_visible(False) + ax.spines["right"].set_visible(False) + ax.spines["bottom"].set_visible(False) + plt.xticks([]) + + plt.subplot(gs[2]) + # ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'] + plt.plot(np.linspace(bcg_SP, bcg_EP, (bcg_EP - bcg_SP) * self.frequency), + self.signal_select["xin_xiao_respire"][bcg_SP * self.frequency:bcg_EP * self.frequency], label="心晓 呼吸") + + min_bcg = self.signal_select["xin_xiao_respire"][bcg_SP * self.frequency:bcg_EP * self.frequency].min() + len_bcg = bcg_EP * self.frequency - bcg_SP * self.frequency + for j in range(1, 5): + mask = self.bcg_event_label[bcg_SP * self.frequency:bcg_EP * self.frequency] == j + y = (min_bcg.repeat(len_bcg) * mask).astype('float') + np.place(y, y == 0, np.nan) + plt.plot(np.linspace(bcg_SP, bcg_EP, (bcg_EP - bcg_SP) * self.frequency), y, color=self.color_cycle[j]) + # plt.title(f"sampNo:{self.sampNo} Epoch:{one_bcg_data['Epoch']} Duration:{one_bcg_data['Duration']}") + plt.legend(loc=1) + ax = plt.gca() + ax.spines["top"].set_visible(False) + ax.spines["right"].set_visible(False) + ax.spines["bottom"].set_visible(False) + plt.xticks([]) + + plt.subplot(gs[3]) + # ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'] + plt.plot(np.linspace(bcg_SP, bcg_EP, (bcg_EP - bcg_SP) * self.frequency), + self.signal_select["xin_xiao"][bcg_SP * self.frequency:bcg_EP * self.frequency], label="心晓") + + min_bcg = self.signal_select["xin_xiao"][bcg_SP * self.frequency:bcg_EP * self.frequency].min() + len_bcg = bcg_EP * self.frequency - bcg_SP * self.frequency + for j in range(1, 5): + mask = self.bcg_event_label[bcg_SP * self.frequency:bcg_EP * self.frequency] == j + y = (min_bcg.repeat(len_bcg) * mask).astype('float') + np.place(y, y == 0, np.nan) + plt.plot(np.linspace(bcg_SP, bcg_EP, (bcg_EP - bcg_SP) * self.frequency), y, color=self.color_cycle[j]) + plt.title(f"sampNo:{self.sampNo} Epoch:{one_bcg_data['Epoch']} Duration:{one_bcg_data['Duration']}") + plt.legend(loc=1) + ax = plt.gca() + ax.spines["top"].set_visible(False) + ax.spines["right"].set_visible(False) + ax.spines["bottom"].set_visible(False) + plt.xticks([]) # 去掉x轴 + + plt.subplot(gs[4]) + # ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'] + plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), + self.signal_select["Flow Patient1"][ecg_SP * self.frequency:ecg_EP * self.frequency], + label="Flow Patient1") + + for j in range(1, 5): + mask = self.ecg_event_label[ecg_SP * self.frequency:ecg_EP * self.frequency] == j + y = (self.signal_select["Flow Patient1"][ecg_SP * self.frequency:ecg_EP * self.frequency] * mask).astype( + 'float') + np.place(y, y == 0, np.nan) + plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), y, color=self.color_cycle[j]) + plt.legend(loc=1) + ax = plt.gca() + ax.spines["top"].set_visible(False) + ax.spines["right"].set_visible(False) + ax.spines["bottom"].set_visible(False) + plt.xticks([]) # 去掉x轴 + + plt.subplot(gs[5]) + # ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'] + plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), + self.signal_select["Flow Patient2"][ecg_SP * self.frequency:ecg_EP * self.frequency], + label="Flow Patient2") + + for j in range(1, 5): + mask = self.ecg_event_label[ecg_SP * self.frequency:ecg_EP * self.frequency] == j + y = (self.signal_select["Flow Patient2"][ecg_SP * self.frequency:ecg_EP * self.frequency] * mask).astype( + 'float') + np.place(y, y == 0, np.nan) + plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), y, color=self.color_cycle[j]) + plt.legend(loc=1) + ax = plt.gca() + ax.spines["top"].set_visible(False) + ax.spines["right"].set_visible(False) + ax.spines["bottom"].set_visible(False) + plt.xticks([]) + + plt.subplot(gs[6]) + # ['Effort Tho', 'Effort Abd', 'SpO2', 'Flow Patient', 'Flow Patient'] + plt.plot(np.linspace(ecg_SP, ecg_EP, (ecg_EP - ecg_SP) * self.frequency), + self.signal_select["SpO2"][ecg_SP * self.frequency:ecg_EP * self.frequency], label="SpO2") + plt.legend(loc=1) + ax = plt.gca() + ax.spines["top"].set_visible(False) + ax.spines["right"].set_visible(False) + ax.spines["bottom"].set_visible(False) + plt.xticks([]) + + plt.show() + + def show_all_event(self, start_index: int = 0, shifting: int = 0, front_add_second: int = 60, + back_add_second: int = 60, main_SA_visual: int = 1): + + for index in range(start_index, len(self.bcg_event_label_filtered_df)): + self.show_one_event(self.bcg_event_label_filtered_df.index[index], + self.ecg_event_label_filtered_df.index[index + shifting], + front_add_second=front_add_second, + back_add_second=back_add_second, + main_SA_visual=main_SA_visual + ) + + def get_fft(self): + pass + + +if __name__ == '__main__': + prepareData = Prepare_Data(670) + prepareData.show_all_event() diff --git a/utils/Preprocessing.py b/utils/Preprocessing.py new file mode 100644 index 0000000..2085357 --- /dev/null +++ b/utils/Preprocessing.py @@ -0,0 +1,179 @@ +# encoding:utf-8 + +""" +@ date: 2020-09-16 +@ author: jingxian +@ illustration: Pre-processing +""" + +import sys +import numpy as np +import pandas as pd +import matplotlib.pyplot as plt + +import pywt +from scipy import signal +from scipy import fftpack + + +def Dilate(x, N, g, M): + returndata = np.array([]) + for num in range(N - M + 1): + returndata = np.append(returndata, np.min(np.array(x[num:num + M]) - np.array(g))) + return returndata + + +def Eorde(x, N, g, M): + returndata = np.array([]) + for num in range(N - M + 1): + returndata = np.append(returndata, np.max(np.array(x[num:num + M]) - np.array(g))) + return returndata + + +def fin_turn(data, peak): + if len(data) == 0 or len(peak) == 0: return peak + return_peak = [] + for p in peak: + minx, maxx = max(0, p - 100), min(len(data), p + 100) + return_peak.append(minx + np.argmax(data[minx: maxx])) + return return_peak + + +class BCG_Operation(): + def __init__(self, sample_rate=1000): + self.sample_rate = sample_rate + + def down_sample(self, data=None, down_radio=10): + if data is None: + raise ValueError("data is None, please given an real value!") + data = data[:len(data) // down_radio * down_radio].reshape(-1, down_radio)[:, 0] + self.sample_rate = self.sample_rate / down_radio + return data + + def Splitwin(self, data=None, len_win=None, coverage=1.0, calculate_to_end=False): + """ + 鍒嗙獥 + :param len_win: length of window + :return: signal windows + """ + if (len_win is None) or (data is None): + raise ValueError("length of window or data is None, please given an real value!") + else: + length = len_win * self.sample_rate # number point of a window + # step of split windows + step = length * coverage + start = 0 + Splitdata = [] + while (len(data) - start >= length): + Splitdata.append(data[int(start):int(start + length)]) + start += step + if calculate_to_end and (len(data) - start > 2000): + remain = len(data) - start + start = start - step + step = int(remain / 2000) + start = start + step * 2000 + Splitdata.append(data[int(start):int(start + length)]) + return np.array(Splitdata), step + elif calculate_to_end: + return np.array(Splitdata), 0 + else: + return np.array(Splitdata) + + def Butterworth(self, data, type, low_cut=0.0, high_cut=0.0, order=10): + """ + :param type: Type of Butter. filter, lowpass, bandpass, ... + :param lowcut: Low cutoff frequency + :param highcut: High cutoff frequency + :param order: Order of filter + :return: Signal after filtering + """ + if type == "lowpass": # 浣庨氭护娉㈠鐞 + b, a = signal.butter(order, low_cut / (self.sample_rate * 0.5), btype='lowpass') + return signal.filtfilt(b, a, np.array(data)) + elif type == "bandpass": # 甯﹂氭护娉㈠鐞 + low = low_cut / (self.sample_rate * 0.5) + high = high_cut / (self.sample_rate * 0.5) + b, a = signal.butter(order, [low, high], btype='bandpass') + return signal.filtfilt(b, a, np.array(data)) + elif type == "highpass": # 楂橀氭护娉㈠鐞 + b, a = signal.butter(order, high_cut / (self.sample_rate * 0.5), btype='highpass') + return signal.filtfilt(b, a, np.array(data)) + else: # 璀﹀憡,婊ゆ尝鍣ㄧ被鍨嬪繀椤绘湁 + raise ValueError("Please choose a type of fliter") + + def MorphologicalFilter(self, data=None, M=200, get_bre=False): + """ + :param data: Input signal + :param M: Length of structural element + :return: Signal after filter + """ + if not data.any(): + raise ValueError("The input data is None, please given real value data") + g = np.ones(M) + Data_pre = np.insert(data, 0, np.zeros(M)) + Data_pre = np.insert(Data_pre, -1, np.zeros(M)) + # Opening: 鑵愯殌 + 鑶ㄨ儉 + out1 = Eorde(Data_pre, len(Data_pre), g, M) + out2 = Dilate(out1, len(out1), g, M) + out2 = np.insert(out2, 0, np.zeros(M - 2)) + # Closing: 鑶ㄨ儉 + 鑵愯殌 + out5 = Dilate(Data_pre, len(Data_pre), g, M) + out6 = Eorde(out5, len(out5), g, M) + out6 = np.insert(out6, 0, np.zeros(M - 2)) + + baseline = (out2 + out6) / 2 + # -------------------------淇濈暀鍓╀綑浠峰------------------------ + data_filtered = Data_pre[:len(baseline)] - baseline + data_filtered = data_filtered[M: M + len(data)] + baseline = baseline[M:] + data_filtered[-1] = data_filtered[-2] = data_filtered[-3] + baseline[-1] = baseline[-2] = baseline[-3] + if get_bre: + return data_filtered, baseline + else: + return data_filtered + + def Iirnotch(self, data=None, cut_fre=50, quality=3): + """闄锋尝鍣""" + b, a = signal.iirnotch(cut_fre / (self.sample_rate * 0.5), quality) + return signal.filtfilt(b, a, np.array(data)) + + def ChebyFilter(self, data, rp=1, type=None, low_cut=0, high_cut=0, order=10): + """ + 鍒囨瘮闆か婊ゆ尝鍣 + :param data: Input signal + :param rp: The maximum ripple allowed + :param type: 'lowpass', 'bandpass, 'highpass' + :param low_cut: Low cut-off fre + :param high_cut: High cut-off fre + :param order: The order of filter + :return: Signal after filter + """ + if type == 'lowpass': + b, a = signal.cheby1(order, rp, low_cut, btype='lowpass', fs=self.sample_rate) + return signal.filtfilt(b, a, np.array(data)) + elif type == 'bandpass': + b, a = signal.cheby1(order, rp, [low_cut, high_cut], btype='bandpass', fs=self.sample_rate) + return signal.filtfilt(b, a, np.array(data)) + elif type == 'highpass': + b, a = signal.cheby1(order, rp, high_cut, btype='highpass', fs=self.sample_rate) + return signal.filtfilt(b, a, np.array(data)) + else: + raise ValueError("The type of filter is None, please given the real value!") + + def Envelope(self, data): + """鍙栦俊鍙峰寘缁""" + if len(data) <= 1: raise ValueError("Wrong input data") + hx = fftpack.hilbert(data) + return np.sqrt(hx ** 2, data ** 2) + + def wavelet_trans(self, data, c_level=['aaa', 'aad'], wavelet='db4', mode='symmetric', maxlevel=10): + wp = pywt.WaveletPacket(data=data, wavelet=wavelet, mode=mode, maxlevel=maxlevel) + new_wp = pywt.WaveletPacket(data=None, wavelet=wavelet, mode=mode) + for c in c_level: + new_wp[c] = wp[c] + return new_wp.reconstruct() + + # def em_decomposition(self, data): + # from pyhht.emd import EMD + # return EMD(data).decompose()