I. 前言
在上一篇文章深入理解PyTorch中LSTM的输入和输出(从input输入到Linear输出)中,我详细地解释了如何利用PyTorch来搭建一个LSTM模型,本篇文章的主要目的是搭建一个LSTM模型用于时间序列预测。
系列文章:
- 深入理解PyTorch中LSTM的输入和输出(从input输入到Linear输出)
- PyTorch搭建LSTM实现时间序列预测(负荷预测)
- PyTorch搭建LSTM实现多变量时间序列预测(负荷预测)
- PyTorch搭建双向LSTM实现时间序列预测(负荷预测)
- PyTorch搭建LSTM实现多变量多步长时间序列预测(一):直接多输出
- PyTorch搭建LSTM实现多变量多步长时间序列预测(二):单步滚动预测
- PyTorch搭建LSTM实现多变量多步长时间序列预测(三):多模型单步预测
- PyTorch搭建LSTM实现多变量多步长时间序列预测(四):多模型滚动预测
- PyTorch搭建LSTM实现多变量多步长时间序列预测(五):seq2seq
- PyTorch中实现LSTM多步长时间序列预测的几种方法总结(负荷预测)
- PyTorch-LSTM时间序列预测中如何预测真正的未来值
- PyTorch搭建LSTM实现多变量输入多变量输出时间序列预测(多任务学习)
- PyTorch搭建ANN实现时间序列预测(风速预测)
- PyTorch搭建CNN实现时间序列预测(风速预测)
- PyTorch搭建CNN-LSTM混合模型实现多变量多步长时间序列预测(负荷预测)
- PyTorch搭建Transformer实现多变量多步长时间序列预测(负荷预测)
- PyTorch时间序列预测系列文章总结(代码使用方法)
- TensorFlow搭建LSTM实现时间序列预测(负荷预测)
- TensorFlow搭建LSTM实现多变量时间序列预测(负荷预测)
- TensorFlow搭建双向LSTM实现时间序列预测(负荷预测)
- TensorFlow搭建LSTM实现多变量多步长时间序列预测(一):直接多输出
- TensorFlow搭建LSTM实现多变量多步长时间序列预测(二):单步滚动预测
- TensorFlow搭建LSTM实现多变量多步长时间序列预测(三):多模型单步预测
- TensorFlow搭建LSTM实现多变量多步长时间序列预测(四):多模型滚动预测
- TensorFlow搭建LSTM实现多变量多步长时间序列预测(五):seq2seq
- TensorFlow搭建LSTM实现多变量输入多变量输出时间序列预测(多任务学习)
- TensorFlow搭建ANN实现时间序列预测(风速预测)
- TensorFlow搭建CNN实现时间序列预测(风速预测)
- TensorFlow搭建CNN-LSTM混合模型实现多变量多步长时间序列预测(负荷预测)
II. 数据处理
数据集为某个地区某段时间内的电力负荷数据,除了负荷以外,还包括温度、湿度等信息。
本篇文章暂时不考虑其它变量,只考虑用历史负荷来预测未来负荷。本文中,我们根据前24个时刻的负荷下一时刻的负荷。有关多变量预测请参考:PyTorch搭建LSTM实现多变量时间序列预测(负荷预测)。
def load_data(file_name): df = pd.read_csv('data/new_data/' + file_name, encoding='gbk') columns = df.columns df.fillna(df.mean(), inplace=True) return df class MyDataset(Dataset): def __init__(self, data): self.data = data def __getitem__(self, item): return self.data[item] def __len__(self): return len(self.data) def nn_seq_us(B): print('data processing...') dataset = load_data() # split train = dataset[:int(len(dataset) * 0.6)] val = dataset[int(len(dataset) * 0.6):int(len(dataset) * 0.8)] test = dataset[int(len(dataset) * 0.8):len(dataset)] m, n = np.max(train[train.columns[1]]), np.min(train[train.columns[1]]) def process(data, batch_size, shuffle): load = data[data.columns[1]] load = load.tolist() data = data.values.tolist() load = (load - n) / (m - n) seq = [] for i in range(len(data) - 24): train_seq = [] train_label = [] for j in range(i, i + 24): x = [load[j]] train_seq.append(x) # for c in range(2, 8): # train_seq.append(data[i + 24][c]) train_label.append(load[i + 24]) train_seq = torch.FloatTensor(train_seq) train_label = torch.FloatTensor(train_label).view(-1) seq.append((train_seq, train_label)) # print(seq[-1]) seq = MyDataset(seq) seq = DataLoader(dataset=seq, batch_size=batch_size, shuffle=shuffle, num_workers=0, drop_last=True) return seq Dtr = process(train, B, True) Val = process(val, B, True) Dte = process(test, B, False) return Dtr, Val, Dte, m, n
上面代码用了DataLoader来对原始数据进行处理,最终得到了batch_size=B的数据集Dtr、Val以及Dte,Dtr为训练集,Val为验证集,Dte为测试集。
III. LSTM模型
这里采用了深入理解PyTorch中LSTM的输入和输出(从input输入到Linear输出)中的模型:
class LSTM(nn.Module): def __init__(self, input_size, hidden_size, num_layers, output_size, batch_size): super().__init__() self.input_size = input_size self.hidden_size = hidden_size self.num_layers = num_layers self.output_size = output_size self.num_directions = 1 # 单向LSTM self.batch_size = batch_size self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True) self.linear = nn.Linear(self.hidden_size, self.output_size) def forward(self, input_seq): batch_size, seq_len = input_seq.shape[0], input_seq.shape[1] h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device) c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device) # output(batch_size, seq_len, num_directions * hidden_size) output, _ = self.lstm(input_seq, (h_0, c_0)) # output(5, 30, 64) pred = self.linear(output) # (5, 30, 1) pred = pred[:, -1, :] # (5, 1) return pred
IV. 训练
def train(args, Dtr, Val, path): input_size, hidden_size, num_layers = args.input_size, args.hidden_size, args.num_layers output_size = args.output_size if args.bidirectional: model = BiLSTM(input_size, hidden_size, num_layers, output_size, batch_size=args.batch_size).to(device) else: model = LSTM(input_size, hidden_size, num_layers, output_size, batch_size=args.batch_size).to(device) loss_function = nn.MSELoss().to(device) if args.optimizer == 'adam': optimizer = torch.optim.Adam(model.parameters(), lr=args.lr, weight_decay=args.weight_decay) else: optimizer = torch.optim.SGD(model.parameters(), lr=args.lr, momentum=0.9, weight_decay=args.weight_decay) scheduler = StepLR(optimizer, step_size=args.step_size, gamma=args.gamma) # training min_epochs = 10 best_model = None min_val_loss = 5 for epoch in tqdm(range(args.epochs)): train_loss = [] for (seq, label) in Dtr: seq = seq.to(device) label = label.to(device) y_pred = model(seq) loss = loss_function(y_pred, label) train_loss.append(loss.item()) optimizer.zero_grad() loss.backward() optimizer.step() scheduler.step() # validation val_loss = get_val_loss(args, model, Val) if epoch > min_epochs and val_loss < min_val_loss: min_val_loss = val_loss best_model = copy.deepcopy(model) print('epoch {:03d} train_loss {:.8f} val_loss {:.8f}'.format(epoch, np.mean(train_loss), val_loss)) model.train() state = {
'models': best_model.state_dict()} torch.save(state, path)
保存训练过程中在验证集上表现最好的模型。
V. 测试
def test(args, Dte, path, m, n): pred = [] y = [] print('loading models...') input_size, hidden_size, num_layers = args.input_size, args.hidden_size, args.num_layers output_size = args.output_size if args.bidirectional: model = BiLSTM(input_size, hidden_size, num_layers, output_size, batch_size=args.batch_size).to(device) else: model = LSTM(input_size, hidden_size, num_layers, output_size, batch_size=args.batch_size).to(device) # models = LSTM(input_size, hidden_size, num_layers, output_size, batch_size=args.batch_size).to(device) model.load_state_dict(torch.load(path)['models']) model.eval() print('predicting...') for (seq, target) in tqdm(Dte): target = list(chain.from_iterable(target.data.tolist())) y.extend(target) seq = seq.to(device) with torch.no_grad(): y_pred = model(seq) y_pred = list(chain.from_iterable(y_pred.data.tolist())) pred.extend(y_pred) y, pred = np.array(y), np.array(pred) y = (m - n) * y + n pred = (m - n) * pred + n print('mape:', get_mape(y, pred)) # plot x = [i for i in range(1, 151)] x_smooth = np.linspace(np.min(x), np.max(x), 900) y_smooth = make_interp_spline(x, y[150:300])(x_smooth) plt.plot(x_smooth, y_smooth, c='green', marker='*', ms=1, alpha=0.75, label='true') y_smooth = make_interp_spline(x, pred[150:300])(x_smooth) plt.plot(x_smooth, y_smooth, c='red', marker='o', ms=1, alpha=0.75, label='pred') plt.grid(axis='y') plt.legend() plt.show()
VI. 源码及数据
暂无。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/203244.html原文链接:https://javaforall.net
