Heartbeat_Annotation/BCGDataset/Dataset_operation.py

272 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# encoding:utf-8
import os
import numpy as np
import pandas as pd
import warnings
import matplotlib.pyplot as plt
import matplotlib.colors as colors
from scipy import signal
from glob import glob
from torch.utils.data import Dataset, DataLoader, TensorDataset
warnings.filterwarnings("ignore")
class BCGDataset(Dataset):
    """Map-style dataset backed by ``./in_data/train.txt`` or ``./in_data/test.txt``.

    Each CSV row is one sample: the first 1000 columns (positions 0-999) are
    exposed as the input signal and the next 1000 columns (positions
    1000-1999) as the label sequence. Any further columns are ignored.

    :param train: load ``train.txt`` when True, ``test.txt`` otherwise
    """

    def __init__(self, train=True):
        path = "./in_data/train.txt" if train else "./in_data/test.txt"
        # Parse the file once; the original parsed the same CSV twice, once
        # for the data columns and once for the label columns.
        frame = pd.read_csv(path)
        # Convert each column range separately so that each array keeps the
        # dtype of its own columns (e.g. float signal, integer labels).
        self.data = np.array(frame.iloc[:, np.arange(1000)])
        self.label = np.array(frame.iloc[:, np.arange(1000, 2000)])

    def __getitem__(self, index):
        """Return the ``(signal, label)`` pair stored at ``index``."""
        return self.data[index], self.label[index]

    def __len__(self):
        """Return the number of samples (rows) in the loaded file."""
        return len(self.label)
class BCG_Operation():
    """Signal-processing helpers for BCG recordings.

    The instance keeps track of the current sampling rate (``sample_rate``,
    in Hz); window lengths and filter cut-offs are interpreted relative to it.
    """

    def __init__(self, sample_rate=1000):
        self.sample_rate = sample_rate

    def down_sample(self, data=None, down_radio=10):
        """Keep the first sample of every ``down_radio`` consecutive samples.

        No anti-aliasing filter is applied. Trailing samples that do not fill
        a complete group are dropped. As a side effect ``self.sample_rate`` is
        divided by ``down_radio`` on every call (and therefore becomes a
        float), so several calls on the same instance compound.

        :param data: 1-D signal (array or plain sequence)
        :param down_radio: decimation factor
        :return: decimated signal as a NumPy array
        :raises ValueError: if ``data`` is None
        """
        if data is None:
            raise ValueError("data is None, please given an real value!")
        # Accept plain sequences too; the original required an ndarray
        # because it called ``.reshape`` directly on the input.
        data = np.asarray(data)
        length_after = len(data) // down_radio
        data = data[:length_after * down_radio].reshape(-1, down_radio)[:, 0]
        self.sample_rate = self.sample_rate / down_radio
        return data

    def Splitwin(self, data=None, len_win=None, coverage=1.0, calculate_to_end=False):
        """Split a signal into fixed-length windows.

        :param data: input signal
        :param len_win: window length in seconds (multiplied by
            ``self.sample_rate`` to get the number of points)
        :param coverage: step between windows as a fraction of the window
            length (1.0 means back-to-back windows)
        :param calculate_to_end: when True, additionally try to cover the
            tail of the signal with one extra full-length window and return a
            tuple ``(windows, n)``; ``n`` is the number of whole
            2000-sample chunks in the tail (0 when no extra window was added)
        :return: array of windows, or ``(windows, n)`` when
            ``calculate_to_end`` is True
        :raises ValueError: if ``data`` or ``len_win`` is None, or if the
            resulting step is not positive
        """
        if (len_win is None) or (data is None):
            raise ValueError("length of window or data is None, please given an real value!")
        length = len_win * self.sample_rate  # number of points in a window
        step = length * coverage  # distance between window starts
        if step <= 0:
            # A non-positive step never advances ``start`` and the loop below
            # would spin forever.
            raise ValueError("len_win * sample_rate * coverage must be positive")
        start = 0
        Splitdata = []
        while len(data) - start >= length:
            Splitdata.append(data[int(start):int(start + length)])
            start += step
        # NOTE(review): 2000 is hard-coded here; it equals 2 s only at
        # 1000 Hz - confirm whether it should follow self.sample_rate.
        if calculate_to_end and (len(data) - start > 2000):
            remain = len(data) - start
            start = start - step  # back to the start of the last full window
            step = int(remain / 2000)  # whole 2000-sample chunks in the tail
            start = start + step * 2000
            Splitdata.append(data[int(start):int(start + length)])
            return np.array(Splitdata), step
        elif calculate_to_end:
            return np.array(Splitdata), 0
        else:
            return np.array(Splitdata)

    def Butterworth(self, data, type, low_cut=0.0, high_cut=0.0, order=10):
        """Apply a zero-phase Butterworth filter (``butter`` + ``filtfilt``).

        :param data: input signal
        :param type: "lowpass", "bandpass" or "highpass"
        :param low_cut: cutoff in Hz for "lowpass"; lower band edge for
            "bandpass"
        :param high_cut: cutoff in Hz for "highpass"; upper band edge for
            "bandpass"
        :param order: filter order
        :return: filtered signal
        :raises ValueError: if ``type`` is not one of the supported names
        """
        nyquist = self.sample_rate * 0.5
        if type == "lowpass":  # low-pass filtering
            b, a = signal.butter(order, low_cut / nyquist, btype='lowpass')
        elif type == "bandpass":  # band-pass filtering
            b, a = signal.butter(order, [low_cut / nyquist, high_cut / nyquist], btype='bandpass')
        elif type == "highpass":  # high-pass filtering
            b, a = signal.butter(order, high_cut / nyquist, btype='highpass')
        else:  # a filter type is mandatory
            raise ValueError("Please choose a type of fliter")
        return signal.filtfilt(b, a, np.array(data))

    def AmpMovement(self, data, win_size, threshold=20, get_judge_line=False):
        """Label every 2-second window of a signal by its amplitude.

        Steps performed by the code:
        1. Split the input into windows of ``win_size`` seconds (the tail is
           covered by one extra window, see ``Splitwin``).
        2. Split each of those into back-to-back 2-second windows.
        3. Compute max - min for every 2-second window and take the 20th
           percentile of these values as the reference amplitude.
        4. A 2-second window is "Movement" if its amplitude exceeds 2.1 x the
           reference, otherwise "Nobody" if it is below ``threshold``,
           otherwise "Sleep".

        :param data: input signal
        :param win_size: analysis window length in seconds
        :param threshold: amplitude below which a window is "Nobody"
        :param get_judge_line: also return, per analysis window,
            ``win_size * sample_rate`` samples filled with 2.3 x the reference
        :return: array of state strings, or ``(states, judge_line)`` when
            ``get_judge_line`` is True
        """
        Dataframe, cover_num = self.Splitwin(data, len_win=win_size, coverage=1.0, calculate_to_end=True)
        # sample_rate turns into a float after down_sample(); np.full needs an
        # integer size, so convert explicitly.
        samples_per_win = int(win_size * self.sample_rate)
        n_win = Dataframe.shape[0]
        # Collect per-window pieces and join once at the end instead of
        # growing arrays with np.append in the loop.
        state_parts = []
        line_parts = []
        for win in range(n_win):
            # two-second windows
            data_win = self.Splitwin(Dataframe[win], len_win=2, coverage=1.0)
            Amp = np.array([np.max(seg) - np.min(seg) for seg in data_win], dtype=float)  # max - min
            # reference amplitude: 20th percentile
            Median_Amp = np.percentile(Amp, 20)
            if get_judge_line:
                # NOTE(review): the plotted line uses 2.3 while the decision
                # below uses 2.1 - confirm which factor is intended.
                line_parts.append(np.full(samples_per_win, 2.3 * Median_Amp))
            state = np.where(
                Amp > 2.1 * Median_Amp,
                "Movement",
                np.where(Amp < threshold, "Nobody", "Sleep"),
            )
            if win == n_win - 1 and cover_num > 0:
                # The extra tail window overlaps the previous one; keep only
                # the states of the chunks not covered before.
                state = state[-int(cover_num):]
            state_parts.append(state)
        state_all = np.concatenate(state_parts) if state_parts else np.array([])
        if get_judge_line:
            Amp_list = np.concatenate(line_parts) if line_parts else np.array([])
            return state_all, Amp_list
        else:
            return state_all

    def preprocess1(self):
        """Build one ``dataN.txt`` table per recording under ``../in_data/``.

        For every sub-directory the method reads ``orgData.txt`` and
        ``label.txt``, runs ``AmpMovement`` on the raw signal with 60-second
        windows, expands the states to a per-sample 0/1 movement mask (2000
        samples per state), down-samples signal, label and mask by 10 and
        cuts them into rows of 1000 samples. Each row is written as 1000
        signal columns, 1000 label columns and a ``state`` column holding the
        maximum of the mask over that row.

        Differences from the earlier version: entries of ``../in_data/`` that
        are not directories (such as the ``dataN.txt`` files this method
        writes) are skipped, the unused band-pass filtered copy of the signal
        and the discarded ``to_csv()`` call are gone, and the per-state
        progress print was dropped together with the slow append loop.
        """
        # ----------------------------------------------------------
        data_dir = "../in_data/"
        # Output files are written into data_dir itself, so a second run must
        # not treat them as recording folders.
        dir_list = [name for name in os.listdir(data_dir)
                    if os.path.isdir(os.path.join(data_dir, name))]
        data_list = [data_dir + name + "/orgData.txt" for name in dir_list]
        label_list = [data_dir + name + "/label.txt" for name in dir_list]
        print(data_list)
        print(label_list)
        for i, (data_path, label_path) in enumerate(zip(data_list, label_list)):
            orgBCG = np.array(pd.read_csv(data_path, header=None)).reshape(-1)
            # NOTE(review): label.txt is read with its first line as header,
            # unlike orgData.txt - confirm the file really has a header row.
            orgLabel = np.array(pd.read_csv(label_path)).reshape(-1)
            # ---------------------Movement Detection-------------------------
            operation = BCG_Operation()
            state_win60 = operation.AmpMovement(orgBCG, win_size=60)
            # 1.0 for "Movement", 0.0 otherwise, repeated for the 2000 samples
            # of each state window.
            movement_mask = np.array([s == "Movement" for s in state_win60], dtype=float)
            visual_state = np.repeat(movement_mask, 2000)
            # ------------------------------------------------------------------
            downBCG = operation.down_sample(data=orgBCG, down_radio=10)
            downLabel = operation.down_sample(data=orgLabel, down_radio=10)
            downState = operation.down_sample(data=visual_state, down_radio=10)
            # Number of complete 1000-sample rows covered by the mask.
            length_after = len(downState) // 1000
            downBCG = downBCG[:length_after * 1000].reshape(-1, 1000)
            downLabel = downLabel[:length_after * 1000].reshape(-1, 1000)
            downState = downState[:length_after * 1000].reshape(-1, 1000)
            downState = np.max(downState, axis=1)
            df_BCG = pd.DataFrame(downBCG)
            df_label = pd.DataFrame(downLabel)
            df_state = pd.DataFrame(downState, columns=["state"])
            df_all = pd.concat([df_BCG, df_label, df_state], axis=1)
            df_all.to_csv(data_dir + "/data" + str(i + 1) + ".txt", index=False)
def read_all_data(data_dir):
    """Load a combined data table and split it by the ``state`` column.

    Rows whose ``state`` equals 0.0 are returned as "clean" and rows whose
    ``state`` equals 1.0 as "artifact". For each group the first 1000 columns
    are the signal and columns 1000-1999 are the labels.

    :param data_dir: path of the CSV file to read
    :return: (clean data, clean labels, artifact data, artifact labels),
        each as a NumPy array
    """
    table = pd.read_csv(data_dir)
    signal_cols = np.arange(1000)
    label_cols = np.arange(1000, 2000)

    def _select(state_value):
        # Pick the rows of one state and cut them into signal / label parts.
        rows = table[table["state"] == state_value]
        return np.array(rows.iloc[:, signal_cols]), np.array(rows.iloc[:, label_cols])

    clean_signal, clean_label = _select(0.0)
    artifact_signal, artifact_label = _select(1.0)
    return clean_signal, clean_label, artifact_signal, artifact_label
#orgBCG = np.array(pd.read_csv("../in_data/data1zuo/orgData.txt", header=None)).reshape(-1)
#orgLabel = np.array(pd.read_csv("../in_data/data1zuo/label.txt")).reshape(-1)
## ---------------------Movement Detection-------------------------
#operation = BCG_Operation()
#BCG = operation.Butterworth(data=orgBCG, type="bandpass", low_cut=2.5, high_cut=10, order=2)
#state_win60 = operation.AmpMovement(orgBCG, win_size=60)
#visual_state = np.array([])
#for num in range(state_win60.shape[0]):
# print("state_num/all_state: ", num, '/', state_win60.shape[0])
# if state_win60[num] == "Movement":
# visual_state = np.append(visual_state, np.full(2000, 1))
# else:
# visual_state = np.append(visual_state, np.full(2000, 0))
## ------------------------------------------------------------------
#downBCG = operation.down_sample(data=orgBCG, down_radio=10)
#downLabel = operation.down_sample(data=orgLabel, down_radio=10)
#downState = operation.down_sample(data=visual_state, down_radio=10)
#length_before = len(downState)
#length_after = length_before // 1000
#downBCG = downBCG[:length_after * 1000]
#downLabel = downLabel[:length_after * 1000]
#downState = downState[:length_after * 1000]
#downBCG = downBCG.reshape(-1, 1000)
#downLabel = downLabel.reshape(-1, 1000)
#downState = downState.reshape(-1, 1000)
#downState = np.max(downState, axis=1)
#df_BCG = pd.DataFrame(downBCG)
#df_label = pd.DataFrame(downLabel)
#df_state = pd.DataFrame(downState, columns=["state"])
#df_BCG.to_csv()
#df_all = pd.concat([df_BCG, df_label, df_state], axis=1)
#df_all.to_csv("../in_data/data1zuo.txt", index=False)
#data_dir = glob("../in_data/*.txt")
#print(data_dir)
#for num in range(len(data_dir)):
# if num==0 :
# all_data = pd.read_csv(data_dir[num])
# else:
# all_data = pd.concat([all_data,pd.read_csv(data_dir[num])],ignore_index=True,axis=0)
#
#all_data.to_csv("../in_data/all_data.txt",index=False)
#data = pd.read_csv("../in_data/all_data.txt")
#clean_data = data[data["state"]==0]
#Movement_data = data[data["state"]==1]
#print(data.shape)
#print(clean_data.shape)
#
## -------------------- Split into training and test sets (7:3) ----------------------------
#sample = clean_data.sample(int(0.3*len(clean_data)))
#sample_index = sample.index
#print(sample.shape)
#print(sample_index)
## Remaining data
#all_index = clean_data.index
## Data left after removing the sample
#residue_index = all_index.difference(sample_index)
#print(residue_index.shape)
#print(residue_index)
#residue = clean_data.loc[residue_index]
## Save
#test = pd.concat([sample,Movement_data],ignore_index=True)
#test.to_csv("../in_data/test.txt",index=False)
#residue.to_csv("../in_data/train.txt",index=False)