2025-02-21 20:40:04 +08:00

286 lines
10 KiB
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
import torch
import torch.nn.functional as F
from BCGDataset import BCG_Operation
from Deep_Model import Unet,Fivelayer_Lstm_Unet,Fivelayer_Unet,Sixlayer_Unet
modle_dic = {'Fivelayer_Unet':Fivelayer_Unet(),
def evaluate(test_data, model,fs,useCPU):
orgBCG = test_data
operation = BCG_Operation()
# 降采样
orgBCG = operation.down_sample(orgBCG, down_radio=int(fs//100)).copy() #一开始没加.copy()会报错,后来加了就没事了,结果没影响
# plt.figure()
# plt.plot(orgBCG)
# plt.show()
orgBCG = orgBCG.reshape(-1, 1000)
# test dataset
orgData = torch.FloatTensor(orgBCG).unsqueeze(1)
# predict
if useCPU == True:
gpu = False
gpu = torch.cuda.is_available()
if gpu:
orgData = orgData.cuda()
with torch.no_grad():
y_hat = model(orgData)
y_prob = F.sigmoid(y_hat)
beat = (y_prob>0.5).float().view(-1).cpu().data.numpy()
beat_diff = np.diff(beat)
up_index = np.argwhere(beat_diff==1)
down_index = np.argwhere(beat_diff==-1)
return beat,up_index,down_index,y_prob
def find_TPeak(data,peaks,th=50):
:param data: BCG或ECG数据
:param peaks: 初步峰值从label中导出的location_R
:param th: 范围阈值
:return: 真实峰值
return_peak = []
for peak in peaks:
if peak>len(data):continue
min_win,max_win = max(0,int(peak-th)),min(len(data),int(peak+th))
return return_peak
def new_calculate_beat(y,predict,th=0.5,up=10,th1=100,th2=45): #通过预测计算回原来J峰的坐标 输入y_prob,predict=ture,up*10,降采样多少就乘多少
:param y: 预测输出值或者标签值label
:param predict: ture or false
:param up: 降采样为多少就多少
:return: 预测的J峰位置
if predict:
beat = np.where(y>th,1,0)
beat = y
beat_diff = np.diff(beat) #一阶差分
up_index = np.argwhere(beat_diff == 1).reshape(-1)
down_index = np.argwhere(beat_diff == -1).reshape(-1)
# print(up_index,down_index)
# print(y)
# print(y[up_index[4]+1:down_index[4]+1])
if len(up_index)==0:
return [0]
if up_index[0] > down_index[0]:
down_index = np.delete(down_index, 0)
if up_index[-1] > down_index[-1]:
up_index = np.delete(up_index, -1)
加上若大于130点都没有一个心跳时降低阈值重新判决一次一般降到0.3就可以了;; 但是对于体动片段降低阈值可能又会造成误判,而且出现体动的话会被丢弃,间隔时间也长
# print("初始:",up_index.shape,down_index.shape)
i = 0
lenth1 = len(up_index)
while i < len(up_index)-1:
if abs(up_index[i+1]-up_index[i]) > th1:
re_prob = y[down_index[i]+15:up_index[i+1]-15] #原本按正常应该是两个都+1的但是由于Unet输出低于0.6时把阈值调小后会在附近一两个点也变为1会影响判断
# print(re_prob.shape)
beat1 = np.where(re_prob > 0.1, 1, 0)
# print(beat1)
if sum(beat1) != 0 and beat1[0] != 1 and beat1[-1] != 1:
insert_up_index,insert_down_index = add_beat(re_prob,th=0.1)
# print(insert_up_index,insert_down_index,i)
if len(insert_up_index) > 1:
l = i+1
for u,d in zip(insert_up_index,insert_down_index):
up_index = np.insert(up_index,l,u+down_index[i]+1+15) #np.insert(arr, obj, values, axis) arr原始数组可一可多obj插入元素位置values是插入内容axis是按行按列插入。
down_index = np.insert(down_index,l,d+down_index[i]+1+15)
l = l+1
# print('l=', l)
elif len(insert_up_index) == 1:
# print(i)
up_index = np.insert(up_index,i+1,down_index[i]+insert_up_index+1+15)
down_index = np.insert(down_index,i+1,down_index[i]+insert_down_index+1+15)
i = i + len(insert_up_index) + 1
i = i+1
i = i+1
# print("最终:",up_index.shape,down_index.shape)
new_up_index = up_index
new_down_index = down_index
flag = 0
i = 0
lenth = len(up_index)
while i < lenth:
if abs(up_index[i+1]-up_index[i]) < th2:
prob_forward = y[up_index[i]+1:down_index[i]+1]
prob_backward = y[up_index[i+1]+1:down_index[i+1]+1]
forward_score = 0
back_score = 0
forward_count = down_index[i] - up_index[i]
back_count = down_index[i+1] - up_index[i+1]
forward_max = np.max(prob_forward)
back_max = np.max(prob_backward)
forward_min = np.min(prob_forward)
back_min = np.min(prob_backward)
forward_average = np.mean(prob_forward)
back_average = np.mean(prob_backward)
if forward_count > back_count:
forward_score = forward_score + 1
else:back_score = back_score + 1
if forward_max > back_max:
forward_score = forward_score + 1
else:back_score = back_score + 1
if forward_min < back_min:
forward_score = forward_score + 1
else:back_score = back_score + 1
if forward_average > back_average:
forward_score = forward_score + 1
else:back_score = back_score + 1
if forward_score >=3:
up_index = np.delete(up_index, i+1)
down_index = np.delete(down_index, i+1)
flag = 1
elif back_score >=3:
up_index = np.delete(up_index, i)
down_index = np.delete(down_index, i)
flag = 1
elif forward_score == back_score:
if forward_average > back_average:
up_index = np.delete(up_index, i + 1)
down_index = np.delete(down_index, i + 1)
flag = 1
up_index = np.delete(up_index, i)
down_index = np.delete(down_index, i)
flag = 1
if flag == 1:
i = i
flag = 0
else: i = i+1
else:i = i + 1
if i > len(up_index)-2:
# elif abs(up_index[i+1]-up_index[i]) > 120:
# print("全部处理之后",up_index.shape,down_index.shape)
predict_J = (up_index.reshape(-1) + down_index.reshape(-1)) // 2*up
# predict_J = predict_J.astype(int)
return predict_J
def add_beat(y,th=0.2): #通过预测计算回原来J峰的坐标 输入y_prob,predict=ture,up*10,降采样多少就乘多少
:param y: 预测输出值或者标签值label
:param predict: ture or false
:param up: 降采样为多少就多少
:return: 预测的J峰位置
beat1 = np.where(y>th,1,0)
beat_diff1 = np.diff(beat1) #一阶差分
add_up_index = np.argwhere(beat_diff1 == 1).reshape(-1)
add_down_index = np.argwhere(beat_diff1 == -1).reshape(-1)
# print(beat1)
# print(add_up_index,add_down_index)
if len(add_up_index) > 0:
if add_up_index[0] > add_down_index[0]:
add_down_index = np.delete(add_down_index, 0)
if add_up_index[-1] > add_down_index[-1]:
add_up_index = np.delete(add_up_index, -1)
return add_up_index, add_down_index
return 0
def calculate_beat(y,predict,th=0.5,up=10): #通过预测计算回原来J峰的坐标 输入y_prob,predict=ture,up*10,降采样多少就乘多少
:param y: 预测输出值或者标签值label
:param predict: ture or false
:param up: 降采样为多少就多少
:return: 预测的J峰位置
if predict:
beat = np.where(y>th,1,0)
beat = y
beat_diff = np.diff(beat) #一阶差分
up_index = np.argwhere(beat_diff == 1).reshape(-1)
down_index = np.argwhere(beat_diff == -1).reshape(-1)
if len(up_index)==0:
return [0]
if up_index[0] > down_index[0]:
down_index = np.delete(down_index, 0)
if up_index[-1] > down_index[-1]:
up_index = np.delete(up_index, -1)
predict_J = (up_index.reshape(-1) + down_index.reshape(-1)) // 2*up
# predict_J = predict_J.astype(int)
return predict_J
def Jpeak_Detection(bcg_data, detector_method, fs, low_cut, high_cut, th1, th2, th3, th4, useCPU):
if detector_method == "Fivelayer_Unet_1":
molde_name = "Fivelayer_Unet"
filename = 1
elif detector_method == "Fivelayer_Unet_2":
molde_name = "Fivelayer_Unet"
filename = 2
elif detector_method == "Fivelayer_Lstm_Unet_1":
molde_name = "Fivelayer_Lstm_Unet"
filename = 1
elif detector_method == "Fivelayer_Lstm_Unet_2":
molde_name = "Fivelayer_Lstm_Unet"
filename = 2
model_dir = "./result/" + molde_name + '/' + str(filename) + ".pkl" # 模型路径
# preprocessing = BCG_Operation(sample_rate=1000)
# BCG = preprocessing.down_sample(BCG, down_radio=int(1000 / fs))
bcg_data = bcg_data[:len(bcg_data) // (fs * 10) * fs * 10]
model = modle_dic[molde_name]
model.load_state_dict(torch.load(model_dir, map_location=torch.device('cpu')))
preprocessing = BCG_Operation(sample_rate=fs)
bcg = preprocessing.Butterworth(bcg_data, "bandpass", low_cut=low_cut, high_cut=high_cut, order=3) * th4
# J峰预测
beat, up_index, down_index, y_prob = evaluate(bcg, model=model, fs=fs, useCPU=useCPU)
y_prob = y_prob.cpu().reshape(-1).data.numpy()
predict_J = new_calculate_beat(y_prob, 1, th=0.6, up=fs // 100, th1=th1, th2=th2)
predict_J = find_TPeak(bcg, predict_J, th=int(th3 * fs / 1000))
predict_J = np.array(predict_J)
Interval = np.full(len(bcg), np.nan)
for i in range(len(predict_J) - 1):
Interval[predict_J[i]: predict_J[i + 1]] = predict_J[i + 1] - predict_J[i]
return bcg, predict_J, Interval