# encoding:utf-8
"""Dataset wrapper and preprocessing helpers for BCG signal text files.

The on-disk layout used throughout this module is one segment per CSV row:
1000 signal samples, then 1000 label samples, optionally followed by a
``state`` column (0 = clean, 1 = movement artifact).
"""
import os
import warnings
from glob import glob

import numpy as np
import pandas as pd
from scipy import signal
from torch.utils.data import Dataset, DataLoader, TensorDataset

# The plotting modules are not used by any live code in this file, so a
# missing matplotlib must not prevent the preprocessing utilities from loading.
try:
    import matplotlib.pyplot as plt
    import matplotlib.colors as colors
except ImportError:
    plt = None
    colors = None

warnings.filterwarnings("ignore")

# Number of samples per stored segment (signal columns 0..999, label
# columns 1000..1999).
_SEGMENT_LEN = 1000


class BCGDataset(Dataset):
    """Torch dataset over the segments stored in ``train.txt`` / ``test.txt``.

    :param train: read ``train.txt`` when True, ``test.txt`` otherwise
    :param data_dir: directory holding the two files (default ``./in_data``)
    """

    def __init__(self, train=True, data_dir="./in_data"):
        file_name = "train.txt" if train else "test.txt"
        # Parse the file once; the signal and label halves are sliced from
        # the same frame.
        frame = pd.read_csv(os.path.join(data_dir, file_name))
        self.data = np.array(frame.iloc[:, np.arange(_SEGMENT_LEN)])
        self.label = np.array(
            frame.iloc[:, np.arange(_SEGMENT_LEN, 2 * _SEGMENT_LEN)]
        )

    def __getitem__(self, index):
        return self.data[index], self.label[index]

    def __len__(self):
        return len(self.label)


class BCG_Operation:
    """Signal operations that depend on the current sampling rate.

    :param sample_rate: sampling rate in Hz of the signals passed in
    """

    # Length in seconds of the short windows used for movement detection and
    # for aligning the trailing window in ``Splitwin``.
    _SUBWIN_SECONDS = 2

    def __init__(self, sample_rate=1000):
        self.sample_rate = sample_rate

    def down_sample(self, data=None, down_radio=10):
        """Keep every ``down_radio``-th sample of ``data``.

        Trailing samples that do not fill a complete group of ``down_radio``
        are dropped. As a side effect ``self.sample_rate`` is divided by
        ``down_radio`` on EVERY call, so calling this for several parallel
        arrays divides the stored rate repeatedly.

        :param data: 1-D array-like signal
        :param down_radio: decimation factor
        :return: the decimated signal as a numpy array
        :raises ValueError: if ``data`` is None
        """
        if data is None:
            raise ValueError("data is None, please given an real value!")
        samples = np.asarray(data)
        kept_groups = len(samples) // down_radio
        samples = samples[:kept_groups * down_radio]
        samples = samples.reshape(-1, down_radio)[:, 0]
        self.sample_rate = self.sample_rate / down_radio
        return samples

    def Splitwin(self, data=None, len_win=None, coverage=1.0,
                 calculate_to_end=False):
        """Split ``data`` into windows of ``len_win`` seconds.

        Consecutive windows start ``len_win * sample_rate * coverage``
        samples apart. With ``calculate_to_end`` set, leftover samples longer
        than one 2-second unit are covered by one extra window that is
        shifted back so that it ends on the last complete 2-second unit.

        :param data: signal to split
        :param len_win: window length in seconds
        :param coverage: step between windows as a fraction of the window
        :param calculate_to_end: also cover the leftover tail (see above)
        :return: array of windows when ``calculate_to_end`` is False;
            otherwise ``(windows, n_units)`` where ``n_units`` is the number
            of 2-second units of the tail window that are new data (0 when
            no tail window was added)
        :raises ValueError: if ``data`` or ``len_win`` is None, or if the
            resulting step is not positive
        """
        if (len_win is None) or (data is None):
            raise ValueError("length of window or data is None, please given an real value!")

        length = len_win * self.sample_rate  # samples per window
        step = length * coverage             # samples between window starts
        if step <= 0:
            # A zero or negative step would never advance the loop below.
            raise ValueError("window step must be positive, check len_win and coverage")
        # Samples in one 2-second unit (2000 at the default 1000 Hz).
        tail_unit = self._SUBWIN_SECONDS * self.sample_rate

        start = 0
        windows = []
        while len(data) - start >= length:
            windows.append(data[int(start):int(start + length)])
            start += step

        if not calculate_to_end:
            return np.array(windows)

        remaining = len(data) - start
        if remaining > tail_unit:
            n_units = int(remaining / tail_unit)
            # Step back one stride, then forward by the whole units that
            # remain. Clamp at 0: when no full window fitted, a negative
            # start would wrap around to the end of the signal.
            tail_start = max(start - step + n_units * tail_unit, 0)
            windows.append(data[int(tail_start):int(tail_start + length)])
            return np.array(windows), n_units
        return np.array(windows), 0

    def Butterworth(self, data, type, low_cut=0.0, high_cut=0.0, order=10):
        """Apply a zero-phase Butterworth filter (``butter`` + ``filtfilt``).

        :param data: signal to filter
        :param type: ``"lowpass"``, ``"bandpass"`` or ``"highpass"``
        :param low_cut: cutoff in Hz for ``"lowpass"``; lower edge in Hz for
            ``"bandpass"``
        :param high_cut: cutoff in Hz for ``"highpass"``; upper edge in Hz
            for ``"bandpass"``
        :param order: filter order
        :return: the filtered signal
        :raises ValueError: if ``type`` is not one of the three names above
        """
        nyquist = self.sample_rate * 0.5
        if type == "lowpass":
            critical = low_cut / nyquist
        elif type == "bandpass":
            critical = [low_cut / nyquist, high_cut / nyquist]
        elif type == "highpass":
            # The high-pass cutoff is taken from high_cut, not low_cut.
            critical = high_cut / nyquist
        else:
            raise ValueError("Please choose a type of fliter")
        b, a = signal.butter(order, critical, btype=type)
        return signal.filtfilt(b, a, np.array(data))

    def AmpMovement(self, data, win_size, threshold=20, get_judge_line=False):
        """Label every 2-second window by its peak-to-peak amplitude.

        1. ``data`` is cut into windows of ``win_size`` seconds (including a
           back-shifted tail window, see ``Splitwin``).
        2. Each window is cut into non-overlapping 2-second sub-windows and
           the max - min amplitude of each sub-window is computed.
        3. The 20th percentile of those amplitudes is the window's reference.
        4. A sub-window is ``"Movement"`` if its amplitude exceeds 2.1 times
           the reference, else ``"Nobody"`` if it is below ``threshold``,
           else ``"Sleep"``.

        :param data: input signal
        :param win_size: window size in seconds (a multiple of 2)
        :param threshold: amplitude below which a sub-window is "Nobody"
        :param get_judge_line: also return a per-sample line holding 2.3
            times the reference of each window
        :return: array of state strings, one per 2-second sub-window; with
            ``get_judge_line`` a tuple ``(states, judge_line)``
        """
        windows, cover_num = self.Splitwin(
            data, len_win=win_size, coverage=1.0, calculate_to_end=True
        )
        # sample_rate becomes a float after down_sample(); np.full needs an
        # integer size.
        samples_per_win = int(round(win_size * self.sample_rate))
        last_index = windows.shape[0] - 1

        state_chunks = []
        line_chunks = []
        for index in range(windows.shape[0]):
            sub_windows = self.Splitwin(
                windows[index], len_win=self._SUBWIN_SECONDS, coverage=1.0
            )
            if sub_windows.shape[0] == 0:
                # Window shorter than one sub-window: nothing to classify.
                continue
            amplitude = np.max(sub_windows, axis=1) - np.min(sub_windows, axis=1)
            reference = np.percentile(amplitude, 20)
            if get_judge_line:
                # NOTE(review): the line uses factor 2.3 while the
                # classification below uses 2.1 - confirm this is intended.
                line_chunks.append(np.full(samples_per_win, 2.3 * reference))
            states = np.where(
                amplitude > 2.1 * reference,
                "Movement",
                np.where(amplitude < threshold, "Nobody", "Sleep"),
            )
            if index == last_index and cover_num > 0:
                # The tail window overlaps the previous one; keep only the
                # sub-windows that are new data.
                states = states[-int(cover_num):]
            state_chunks.append(states)

        state_all = np.concatenate(state_chunks) if state_chunks else np.array([])
        if get_judge_line:
            amp_line = np.concatenate(line_chunks) if line_chunks else np.array([])
            return state_all, amp_line
        return state_all

    def preprocess1(self):
        """Convert every recording folder under ``../in_data/`` to a text file.

        For each sub-directory, ``orgData.txt`` and ``label.txt`` are read,
        movement is detected on the raw signal with 60-second windows, and
        signal, label and movement mask are decimated by 10 and cut into
        rows of 1000 samples. Row ``state`` is the maximum of the mask over
        the row. The result is written to ``../in_data/data<k>.txt`` where
        ``k`` counts the sub-directories in sorted order, starting at 1.
        """
        data_dir = "../in_data/"
        # Only directories are recordings; plain files here (for example the
        # data<k>.txt outputs of an earlier run) must be skipped.
        dir_list = sorted(
            name for name in os.listdir(data_dir)
            if os.path.isdir(os.path.join(data_dir, name))
        )
        data_list = [data_dir + name + "/orgData.txt" for name in dir_list]
        label_list = [data_dir + name + "/label.txt" for name in dir_list]
        print(data_list)
        print(label_list)

        for i, (data_path, label_path) in enumerate(zip(data_list, label_list)):
            orgBCG = np.array(pd.read_csv(data_path, header=None)).reshape(-1)
            # NOTE(review): unlike orgData.txt this is read WITH a header
            # row - confirm label.txt really starts with a header line.
            orgLabel = np.array(pd.read_csv(label_path)).reshape(-1)

            # ---------------------Movement Detection-------------------------
            operation = BCG_Operation()
            samples_per_state = int(self._SUBWIN_SECONDS * operation.sample_rate)
            # NOTE(review): detection runs on the raw signal. A 2.5-10 Hz
            # band-pass result used to be computed here but was never used -
            # confirm which input was intended.
            state_win60 = operation.AmpMovement(orgBCG, win_size=60)
            is_movement = np.asarray(state_win60, dtype=str) == "Movement"
            # Expand one flag per 2-second window to one flag per sample.
            visual_state = np.repeat(is_movement.astype(float), samples_per_state)
            # ------------------------------------------------------------------
            downBCG = operation.down_sample(data=orgBCG, down_radio=10)
            downLabel = operation.down_sample(data=orgLabel, down_radio=10)
            downState = operation.down_sample(data=visual_state, down_radio=10)

            # Keep only whole segments, limited by the length of the mask.
            usable = (len(downState) // _SEGMENT_LEN) * _SEGMENT_LEN
            downBCG = downBCG[:usable].reshape(-1, _SEGMENT_LEN)
            downLabel = downLabel[:usable].reshape(-1, _SEGMENT_LEN)
            downState = downState[:usable].reshape(-1, _SEGMENT_LEN)
            # A segment is an artifact if any of its samples is flagged.
            downState = np.max(downState, axis=1)

            df_all = pd.concat(
                [
                    pd.DataFrame(downBCG),
                    pd.DataFrame(downLabel),
                    pd.DataFrame(downState, columns=["state"]),
                ],
                axis=1,
            )
            df_all.to_csv(
                os.path.join(data_dir, "data" + str(i + 1) + ".txt"), index=False
            )


def read_all_data(data_dir):
    """Load one segment file and split it by the ``state`` column.

    :param data_dir: path of the CSV file to read (a file, despite the name)
    :return: ``(data_clean, label_clean, data_artifact, label_artifact)`` as
        numpy arrays; "clean" rows have ``state == 0.0`` and "artifact" rows
        have ``state == 1.0``
    """
    df_all = pd.read_csv(data_dir)

    def _signal_and_label(frame):
        # Columns 0..999 hold the signal, columns 1000..1999 the label.
        signal_part = frame.iloc[:, np.arange(_SEGMENT_LEN)]
        label_part = frame.iloc[:, np.arange(_SEGMENT_LEN, 2 * _SEGMENT_LEN)]
        return np.array(signal_part), np.array(label_part)

    data_clean, label_clean = _signal_and_label(df_all[df_all["state"] == 0.0])
    data_artifact, label_artifact = _signal_and_label(df_all[df_all["state"] == 1.0])
    return data_clean, label_clean, data_artifact, label_artifact


# ----------------------------------------------------------------------
# Disabled one-off script, kept for reference: preprocess one recording,
# merge all per-recording files, then build train.txt / test.txt.
# ----------------------------------------------------------------------
#orgBCG = np.array(pd.read_csv("../in_data/data1zuo/orgData.txt", header=None)).reshape(-1)
#orgLabel = np.array(pd.read_csv("../in_data/data1zuo/label.txt")).reshape(-1)
## ---------------------Movement Detection-------------------------
#operation = BCG_Operation()
#BCG = operation.Butterworth(data=orgBCG, type="bandpass", low_cut=2.5, high_cut=10, order=2)
#state_win60 = operation.AmpMovement(orgBCG, win_size=60)
#visual_state = np.array([])
#for num in range(state_win60.shape[0]):
#    print("state_num/all_state: ", num, '/', state_win60.shape[0])
#    if state_win60[num] == "Movement":
#        visual_state = np.append(visual_state, np.full(2000, 1))
#    else:
#        visual_state = np.append(visual_state, np.full(2000, 0))
## ------------------------------------------------------------------
#downBCG = operation.down_sample(data=orgBCG, down_radio=10)
#downLabel = operation.down_sample(data=orgLabel, down_radio=10)
#downState = operation.down_sample(data=visual_state, down_radio=10)
#length_before = len(downState)
#length_after = length_before // 1000
#downBCG = downBCG[:length_after * 1000]
#downLabel = downLabel[:length_after * 1000]
#downState = downState[:length_after * 1000]
#downBCG = downBCG.reshape(-1, 1000)
#downLabel = downLabel.reshape(-1, 1000)
#downState = downState.reshape(-1, 1000)
#downState = np.max(downState, axis=1)
#df_BCG = pd.DataFrame(downBCG)
#df_label = pd.DataFrame(downLabel)
#df_state = pd.DataFrame(downState, columns=["state"])
#df_BCG.to_csv()
#df_all = pd.concat([df_BCG, df_label, df_state], axis=1)
#df_all.to_csv("../in_data/data1zuo.txt", index=False)

#data_dir = glob("../in_data/*.txt")
#print(data_dir)
#for num in range(len(data_dir)):
#    if num==0 :
#        all_data = pd.read_csv(data_dir[num])
#    else:
#        all_data = pd.concat([all_data,pd.read_csv(data_dir[num])],ignore_index=True,axis=0)
#
#all_data.to_csv("../in_data/all_data.txt",index=False)

#data = pd.read_csv("../in_data/all_data.txt")
#clean_data = data[data["state"]==0]
#Movement_data = data[data["state"]==1]
#print(data.shape)
#print(clean_data.shape)
#
## -------------------- Split into training and test sets, 7:3 ----------------------------
#sample = clean_data.sample(int(0.3*len(clean_data)))
#sample_index = sample.index
#print(sample.shape)
#print(sample_index)
## Remaining data
#all_index = clean_data.index
## Data left after removing the sample
#residue_index = all_index.difference(sample_index)
#print(residue_index.shape)
#print(residue_index)
#residue = clean_data.loc[residue_index]
## Save
#test = pd.concat([sample,Movement_data],ignore_index=True)
#test.to_csv("../in_data/test.txt",index=False)
#residue.to_csv("../in_data/train.txt",index=False)