
272 lines
11 KiB
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# encoding:utf-8
import os
import numpy as np
import pandas as pd
import warnings
import matplotlib.pyplot as plt
import matplotlib.colors as colors
from scipy import signal
from glob import glob
from torch.utils.data import Dataset, DataLoader, TensorDataset
class BCGDataset(Dataset):
    """Map-style dataset that serves (data, label) rows from a CSV file.

    Each CSV row is read positionally: columns 0-999 are returned as the
    data vector and columns 1000-1999 as the label vector.

    :param train: when True read ``train.txt``, otherwise read ``test.txt``.
    :param data_dir: directory that holds ``train.txt`` / ``test.txt``.
        Defaults to ``./in_data``, the location the files were read from
        before this parameter existed.
    """

    def __init__(self, train=True, data_dir="./in_data"):
        # Pick exactly one file. Without the else-branch the test file was
        # loaded unconditionally and overwrote the training data.
        if train:
            csv_path = os.path.join(data_dir, "train.txt")
        else:
            csv_path = os.path.join(data_dir, "test.txt")

        # Parse the file once and slice it twice instead of reading it twice.
        frame = pd.read_csv(csv_path)
        self.data = np.array(frame.iloc[:, np.arange(1000)])
        self.label = np.array(frame.iloc[:, np.arange(1000, 2000)])

    def __getitem__(self, index):
        """Return the ``(data, label)`` pair stored at ``index``."""
        return self.data[index], self.label[index]

    def __len__(self):
        """Return the number of rows (samples) in the dataset."""
        return len(self.label)
class BCG_Operation():
    """Signal helpers for BCG recordings: down-sampling, windowing,
    Butterworth filtering and amplitude-based movement detection.

    :param sample_rate: sampling rate of the signal in Hz. It is stored on
        the instance and is *changed* by :meth:`down_sample`.
    """

    def __init__(self, sample_rate=1000):
        self.sample_rate = sample_rate

    def down_sample(self, data=None, down_radio=10):
        """Keep every ``down_radio``-th sample of ``data``.

        Trailing samples that do not fill a complete group of
        ``down_radio`` are dropped, so the result has
        ``len(data) // down_radio`` elements.

        Side effect: ``self.sample_rate`` is divided by ``down_radio`` on
        every call (true division, so it becomes a float).

        :param data: signal to decimate (assumed 1-D).
        :param down_radio: decimation factor.
        :return: the decimated signal.
        :raises ValueError: if ``data`` is None.
        """
        if data is None:
            raise ValueError("data is None, please given an real value!")
        data = np.asarray(data)
        length_after = len(data) // down_radio
        # First element of each complete group of `down_radio` samples.
        data = data[:length_after * down_radio:down_radio]
        self.sample_rate = self.sample_rate / down_radio
        return data

    def Splitwin(self, data=None, len_win=None, coverage=1.0, calculate_to_end=False):
        """Cut ``data`` into fixed-length windows.

        :param data: signal to split.
        :param len_win: length of a window in seconds; a window holds
            ``len_win * self.sample_rate`` samples.
        :param coverage: step between window starts as a fraction of the
            window length (1.0 means back-to-back windows).
        :param calculate_to_end: when True also cover the tail that does
            not fill a whole window and return a tuple (see below). The
            tail handling is written for ``coverage == 1.0``.
        :return: ``np.array`` of windows when ``calculate_to_end`` is
            False. Otherwise ``(windows, cover_num)``: if the tail is
            longer than one 2-second unit, one extra window is appended
            that overlaps the previous window and ends on the last
            complete 2-second unit; ``cover_num`` is the number of
            2-second units in that window that are new. ``cover_num`` is
            0 when no extra window was added.
        :raises ValueError: if ``data`` or ``len_win`` is None, or if the
            resulting step is not positive.
        """
        if (len_win is None) or (data is None):
            raise ValueError("length of window or data is None, please given an real value!")
        length = len_win * self.sample_rate  # number of points in a window
        step = length * coverage             # distance between window starts
        if step <= 0:
            # A non-positive step would never advance the loop below.
            raise ValueError("length of window and coverage must be positive")

        start = 0
        windows = []
        while len(data) - start >= length:
            windows.append(data[int(start):int(start + length)])
            start += step

        if not calculate_to_end:
            return np.array(windows)

        # AmpMovement classifies 2-second pieces, so the tail is measured
        # in 2-second units (2000 samples at the default 1000 Hz).
        tail_unit = 2 * self.sample_rate
        remain = len(data) - start
        if windows and remain > tail_unit:
            cover_num = int(remain / tail_unit)
            # Go back to the previous window start and shift forward by
            # the new units, so the extra window ends on the last complete
            # unit of the signal.
            start = start - step + cover_num * tail_unit
            windows.append(data[int(start):int(start + length)])
            return np.array(windows), cover_num
        return np.array(windows), 0

    def Butterworth(self, data, type, low_cut=0.0, high_cut=0.0, order=10):
        """Zero-phase Butterworth filtering of ``data``.

        :param data: signal to filter.
        :param type: "lowpass", "bandpass" or "highpass".
        :param low_cut: low cutoff frequency in Hz (the cutoff for
            "lowpass", the lower edge for "bandpass").
        :param high_cut: high cutoff frequency in Hz (the cutoff for
            "highpass", the upper edge for "bandpass").
        :param order: order of the filter.
        :return: signal after forward-backward filtering.
        :raises ValueError: if ``type`` is not one of the three names.
        """
        nyquist = self.sample_rate * 0.5
        if type == "lowpass":
            cutoff = low_cut / nyquist
        elif type == "bandpass":
            cutoff = [low_cut / nyquist, high_cut / nyquist]
        elif type == "highpass":
            cutoff = high_cut / nyquist
        else:
            # A filter type is mandatory.
            raise ValueError("Please choose a type of fliter")
        b, a = signal.butter(order, cutoff, btype=type)
        return signal.filtfilt(b, a, np.array(data))

    def AmpMovement(self, data, win_size, threshold=20, get_judge_line=False):
        """Label every 2-second piece of ``data`` by its amplitude.

        The signal is cut into windows of ``win_size`` seconds. Inside a
        window the peak-to-peak amplitude of each 2-second piece is
        compared with the 20th percentile of the window's amplitudes:

        * amplitude > 2.1 * percentile -> "Movement"
        * otherwise amplitude < ``threshold`` -> "Nobody"
        * otherwise -> "Sleep"

        :param data: input signal.
        :param win_size: window size in seconds (must be a multiple of 2).
        :param threshold: amplitude below which a piece counts as "Nobody".
        :param get_judge_line: when True also return a per-sample line
            holding ``2.3 * percentile`` for every window.
        :return: array of state strings, or ``(states, judge_line)`` when
            ``get_judge_line`` is True.
        """
        windows, cover_num = self.Splitwin(data, len_win=win_size, coverage=1.0, calculate_to_end=True)
        last_win = windows.shape[0] - 1
        state_parts = []
        judge_parts = []
        for win, window in enumerate(windows):
            # two-second pieces of the current window
            pieces = self.Splitwin(window, len_win=2, coverage=1.0)
            amp = np.max(pieces, axis=1) - np.min(pieces, axis=1)  # max - min
            # take the 20th percentile as the window's reference amplitude
            reference = np.percentile(amp, 20)
            if get_judge_line:
                # NOTE(review): the line uses 2.3 while the decision below
                # uses 2.1 - confirm which factor is intended.
                # int(): sample_rate becomes a float after down_sample.
                judge_parts.append(np.full(int(win_size * self.sample_rate), 2.3 * reference))
            state = np.where(
                amp > 2.1 * reference,
                "Movement",
                np.where(amp < threshold, "Nobody", "Sleep"),
            )
            if win == last_win and cover_num > 0:
                # The tail window overlaps the previous one; only its last
                # `cover_num` pieces are new.
                state = state[-int(cover_num):]
            state_parts.append(state)

        state_all = np.concatenate(state_parts) if state_parts else np.array([])
        if get_judge_line:
            judge_line = np.concatenate(judge_parts) if judge_parts else np.array([])
            return state_all, judge_line
        return state_all

    def preprocess1(self):
        """Build one training table per recording under ``../in_data/``.

        For every sub-directory the method reads ``orgData.txt`` and
        ``label.txt``, marks movement with :meth:`AmpMovement` (60-second
        windows), down-samples signal, label and movement mask by 10 and
        cuts them into rows of 1000 samples. A row's ``state`` is 1 if any
        sample in it was marked as movement, else 0. The result (1000
        signal columns, 1000 label columns, ``state``) is written to
        ``../in_data/data<N>.txt``.
        """
        data_dir = "../in_data/"
        # Only directories are recordings; the dataN.txt files written
        # below live in the same folder and must be skipped on a rerun.
        # Sorting makes the output numbering reproducible.
        record_names = sorted(
            name for name in os.listdir(data_dir)
            if os.path.isdir(os.path.join(data_dir, name))
        )
        for i, name in enumerate(record_names):
            record_dir = os.path.join(data_dir, name)
            orgBCG = np.array(
                pd.read_csv(os.path.join(record_dir, "orgData.txt"), header=None)
            ).reshape(-1)
            # NOTE(review): read without header=None, so the first line of
            # label.txt is consumed as a header - confirm the file has one.
            orgLabel = np.array(
                pd.read_csv(os.path.join(record_dir, "label.txt"))
            ).reshape(-1)

            # ---------------------Movement Detection-------------------------
            # A fresh instance per recording: down_sample changes sample_rate.
            operation = BCG_Operation()
            samples_per_state = int(2 * operation.sample_rate)  # one state = 2 s
            # NOTE(review): detection runs on the raw signal. A 2.5-10 Hz
            # band-pass result used to be computed here but was never used.
            state_win60 = operation.AmpMovement(orgBCG, win_size=60)
            is_movement = np.array(
                [state == "Movement" for state in state_win60], dtype=float
            )
            # Expand every state to sample resolution in a single step.
            visual_state = np.repeat(is_movement, samples_per_state)

            # ------------------------------------------------------------------
            downBCG = operation.down_sample(data=orgBCG, down_radio=10)
            downLabel = operation.down_sample(data=orgLabel, down_radio=10)
            downState = operation.down_sample(data=visual_state, down_radio=10)

            # Use the shortest series so all three tables get the same
            # number of complete 1000-sample rows.
            rows = min(len(downBCG), len(downLabel), len(downState)) // 1000
            downBCG = downBCG[:rows * 1000].reshape(-1, 1000)
            downLabel = downLabel[:rows * 1000].reshape(-1, 1000)
            downState = downState[:rows * 1000].reshape(-1, 1000)
            downState = np.max(downState, axis=1)  # any movement in the row

            df_BCG = pd.DataFrame(downBCG)
            df_label = pd.DataFrame(downLabel)
            df_state = pd.DataFrame(downState, columns=["state"])
            df_all = pd.concat([df_BCG, df_label, df_state], axis=1)
            df_all.to_csv(os.path.join(data_dir, "data" + str(i + 1) + ".txt"), index=False)
def read_all_data(data_dir):
    """Load a preprocessed table and split it by its ``state`` column.

    Columns are taken by position: 0-999 as data, 1000-1999 as labels.
    Rows with ``state == 0`` are returned as "clean", rows with
    ``state == 1`` as "artifact"; rows with any other state are dropped.

    :param data_dir: path of the CSV file to read (despite the name this
        is passed straight to ``pd.read_csv``, i.e. a file, not a folder).
    :return: tuple ``(data_clean, label_clean, data_artifact,
        label_artifact)`` of NumPy arrays.
    """
    df_all = pd.read_csv(data_dir)

    data_cols = np.arange(1000)
    label_cols = np.arange(1000, 2000)

    df_clean = df_all[df_all["state"] == 0.0]
    df_artifact = df_all[df_all["state"] == 1.0]

    return (
        np.array(df_clean.iloc[:, data_cols]),
        np.array(df_clean.iloc[:, label_cols]),
        np.array(df_artifact.iloc[:, data_cols]),
        np.array(df_artifact.iloc[:, label_cols]),
    )
#orgBCG = np.array(pd.read_csv("../in_data/data1zuo/orgData.txt", header=None)).reshape(-1)
#orgLabel = np.array(pd.read_csv("../in_data/data1zuo/label.txt")).reshape(-1)
## ---------------------Movement Detection-------------------------
#operation = BCG_Operation()
#BCG = operation.Butterworth(data=orgBCG, type="bandpass", low_cut=2.5, high_cut=10, order=2)
#state_win60 = operation.AmpMovement(orgBCG, win_size=60)
#visual_state = np.array([])
#for num in range(state_win60.shape[0]):
# print("state_num/all_state: ", num, '/', state_win60.shape[0])
# if state_win60[num] == "Movement":
# visual_state = np.append(visual_state, np.full(2000, 1))
# else:
# visual_state = np.append(visual_state, np.full(2000, 0))
## ------------------------------------------------------------------
#downBCG = operation.down_sample(data=orgBCG, down_radio=10)
#downLabel = operation.down_sample(data=orgLabel, down_radio=10)
#downState = operation.down_sample(data=visual_state, down_radio=10)
#length_before = len(downState)
#length_after = length_before // 1000
#downBCG = downBCG[:length_after * 1000]
#downLabel = downLabel[:length_after * 1000]
#downState = downState[:length_after * 1000]
#downBCG = downBCG.reshape(-1, 1000)
#downLabel = downLabel.reshape(-1, 1000)
#downState = downState.reshape(-1, 1000)
#downState = np.max(downState, axis=1)
#df_BCG = pd.DataFrame(downBCG)
#df_label = pd.DataFrame(downLabel)
#df_state = pd.DataFrame(downState, columns=["state"])
#df_all = pd.concat([df_BCG, df_label, df_state], axis=1)
#df_all.to_csv("../in_data/data1zuo.txt", index=False)
#data_dir = glob("../in_data/*.txt")
#for num in range(len(data_dir)):
# if num==0 :
# all_data = pd.read_csv(data_dir[num])
# else:
# all_data = pd.concat([all_data,pd.read_csv(data_dir[num])],ignore_index=True,axis=0)
#data = pd.read_csv("../in_data/all_data.txt")
#clean_data = data[data["state"]==0]
#Movement_data = data[data["state"]==1]
## -------------------- Split into training and test sets (7:3) ----------------------------
#sample = clean_data.sample(int(0.3*len(clean_data)))
#sample_index = sample.index
## Remaining data
#all_index = clean_data.index
## Data left after removing the sample
#residue_index = all_index.difference(sample_index)
#residue = clean_data.loc[residue_index]
## Save
#test = pd.concat([sample,Movement_data],ignore_index=True)