How to do it...

Execute the following steps to train a 1D CNN in PyTorch.

  1. Import the libraries:
import yfinance as yf
import numpy as np
import os
import random

import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import (Dataset, TensorDataset,
DataLoader, Subset)
from collections import OrderedDict
from chapter_10_utils import create_input_data, custom_set_seed

from sklearn.metrics import mean_squared_error

device = 'cuda' if torch.cuda.is_available() else 'cpu'
  2. Define the parameters:
# data: which series to download and how to split it
TICKER = 'INTL'  # symbol passed to yf.download
START_DATE = '2015-01-02'  # start of the download window
END_DATE = '2019-12-31'  # end of the download window
VALID_START = '2019-07-01'  # rows from this date up to END_DATE form the validation set
N_LAGS = 12  # passed to create_input_data; also the length of each model input sequence

# neural network: training hyperparameters
BATCH_SIZE = 5  # batch size used by both the training and validation DataLoader
N_EPOCHS = 2000  # number of iterations of the training loop
  3. Download and prepare the data:
# Download the quotes for the requested window (progress bar disabled) and
# keep only the last observation of each weekly bin anchored on Mondays.
df = (
    yf.download(TICKER, start=START_DATE, end=END_DATE, progress=False)
    .resample('W-MON')
    .last()
)

# Number of weekly rows that fall inside the validation period.
valid_size = len(df.loc[VALID_START:END_DATE])

# Adjusted close prices as a plain NumPy array.
prices = df['Adj Close'].to_numpy()
  4. Transform the time series into input for the CNN:
# Turn the weekly price series into model inputs X and targets y.
# NOTE(review): create_input_data lives in chapter_10_utils and is not shown here;
# presumably each row of X holds N_LAGS consecutive prices and y the following
# price. Confirm against the helper.
X, y = create_input_data(prices, N_LAGS)
  5. Obtain the naïve forecast:
# Naive benchmark: the forecast for each validation observation is simply the
# observation that precedes it, i.e. the validation slice shifted back by one.
naive_pred = prices[len(prices) - valid_size - 1:-1]
y_valid = prices[len(prices) - valid_size:]

# Score the benchmark with MSE and its square root (RMSE).
naive_mse = mean_squared_error(y_valid, naive_pred)
naive_rmse = np.sqrt(naive_mse)
print(f"Naive forecast – MSE: {naive_mse:.2f}, RMSE: {naive_rmse:.2f}")

Running the code produces the following output:

Naive forecast – MSE: 4.16, RMSE: 2.04
  6. Prepare the DataLoader objects:
# Set the seed for reproducibility.
# NOTE(review): custom_set_seed comes from chapter_10_utils and is not shown here.
custom_set_seed(42)

# Index of the first validation sample: the last valid_size samples are held out.
valid_ind = len(X) - valid_size

# torch.from_numpy requires NumPy arrays; cast to float32 and give the targets
# an explicit trailing dimension so each target has shape (1,).
X_tensor = torch.from_numpy(X).float()
y_tensor = torch.from_numpy(y).float().unsqueeze(dim=1)

dataset = TensorDataset(X_tensor, y_tensor)

# Split by position rather than at random, so the validation samples are
# exactly the final valid_size entries of the dataset.
train_dataset = Subset(dataset, list(range(valid_ind)))
valid_dataset = Subset(dataset, list(range(valid_ind, len(X))))

# No shuffle argument is passed, so both loaders iterate in dataset order.
train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE)
valid_loader = DataLoader(dataset=valid_dataset, batch_size=BATCH_SIZE)
  7. Define the CNN's architecture:
class Flatten(nn.Module):
    """Collapse every dimension after the first into a single one."""

    def forward(self, x):
        """Return ``x`` reshaped to ``(x.size(0), -1)``."""
        return x.view(x.size()[0], -1)


# 1D CNN: one convolution + max-pooling stage followed by a small
# fully connected head that outputs a single value per sample.
model = nn.Sequential(OrderedDict([
    # 1 input channel -> 32 channels; kernel 3 with padding 1 keeps the length.
    ('conv_1', nn.Conv1d(1, 32, 3, padding=1)),
    # Halves the sequence length.
    ('max_pool_1', nn.MaxPool1d(2)),
    ('relu_1', nn.ReLU()),
    ('flatten', Flatten()),
    # 192 = 32 channels * 6 positions, i.e. inputs of length 12 after pooling by 2.
    ('fc_1', nn.Linear(192, 50)),
    ('relu_2', nn.ReLU()),
    ('dropout_1', nn.Dropout(0.4)),
    ('fc_2', nn.Linear(50, 1)),
]))

print(model)

Running the code produces the following output:

Sequential(
(conv_1): Conv1d(1, 32, kernel_size=(3,), stride=(1,), padding=(1,))
(max_pool_1): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
(relu_1): ReLU()
(flatten): Flatten()
(fc_1): Linear(in_features=192, out_features=50, bias=True)
(relu_2): ReLU()
(dropout_1): Dropout(p=0.4, inplace=False)
(fc_2): Linear(in_features=50, out_features=1, bias=True)
)
  8. Instantiate the model, the loss function, and the optimizer:
# Move the network to the selected device ('cuda' when available, otherwise 'cpu').
model = model.to(device)
# Mean squared error; the training loop applies torch.sqrt to it, so the
# quantity actually minimized is the per-batch RMSE.
loss_fn = nn.MSELoss()
# Adam over all of the model's parameters with a learning rate of 0.001.
optimizer = optim.Adam(model.parameters(), lr=0.001)
  9. Train the network:
PRINT_EVERY = 50
train_losses, valid_losses = [], []

# Track the best validation loss explicitly. This makes epoch 0 eligible for
# checkpointing and guarantees that best_epoch is defined when it is reported
# after the loop, even if the validation loss never improves.
best_epoch = 0
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
    running_loss_train = 0
    running_loss_valid = 0

    # --- training pass ---
    model.train()

    for x_batch, y_batch in train_loader:

        optimizer.zero_grad()

        # Reshape inputs to (batch, 1, N_LAGS): one channel per sequence, as
        # expected by the first Conv1d layer.
        x_batch = x_batch.to(device)
        x_batch = x_batch.view(x_batch.shape[0], 1, N_LAGS)
        y_batch = y_batch.to(device)
        y_batch = y_batch.view(y_batch.shape[0], 1, 1)
        y_hat = model(x_batch).view(y_batch.shape[0], 1, 1)
        # RMSE of the batch.
        loss = torch.sqrt(loss_fn(y_batch, y_hat))
        loss.backward()
        optimizer.step()
        # Weight by batch size so the epoch loss is a per-sample average even
        # when the last batch is smaller.
        running_loss_train += loss.item() * x_batch.size(0)

    epoch_loss_train = running_loss_train / len(train_loader.dataset)
    train_losses.append(epoch_loss_train)

    # --- validation pass (no gradients, dropout disabled) ---
    with torch.no_grad():
        model.eval()
        for x_val, y_val in valid_loader:
            x_val = x_val.to(device)
            x_val = x_val.view(x_val.shape[0], 1, N_LAGS)
            y_val = y_val.to(device)
            y_val = y_val.view(y_val.shape[0], 1, 1)
            y_hat = model(x_val).view(y_val.shape[0], 1, 1)
            loss = torch.sqrt(loss_fn(y_val, y_hat))
            running_loss_valid += loss.item() * x_val.size(0)

        epoch_loss_valid = running_loss_valid / len(valid_loader.dataset)

        # Checkpoint whenever the validation loss reaches a new minimum.
        if epoch_loss_valid < best_valid_loss:
            best_valid_loss = epoch_loss_valid
            best_epoch = epoch
            torch.save(model.state_dict(), './cnn_checkpoint.pth')

        valid_losses.append(epoch_loss_valid)

    if epoch % PRINT_EVERY == 0:
        print(f"<{epoch}> – Train. loss: {epoch_loss_train:.6f} Valid. loss: {epoch_loss_valid:.6f}")

print(f'Lowest loss recorded in epoch: {best_epoch}')
  10. Plot the losses over epochs:

As the code is identical to the one in step 11 of the previous recipe, Multilayer perceptrons for time series forecasting, we are only presenting the resulting plot.

  11. Load the best model (with the lowest validation loss):
# Restore the weights written to cnn_checkpoint.pth during training.
# map_location places the stored tensors on the current device, so a checkpoint
# saved from a GPU run can still be loaded on a CPU-only machine.
state_dict = torch.load('cnn_checkpoint.pth', map_location=device)
model.load_state_dict(state_dict)
  12. Obtain the predictions:
# Collect the model's predictions and the matching targets batch by batch.
y_pred, y_valid = [], []

with torch.no_grad():

    # Evaluation mode disables dropout.
    model.eval()

    for x_val, y_val in valid_loader:
        x_val = x_val.to(device)
        x_val = x_val.view(x_val.shape[0], 1, N_LAGS)
        y_pred.append(model(x_val))
        y_valid.append(y_val)

# Tensors living on a CUDA device cannot be converted to NumPy directly, so
# move them to the CPU first (a no-op when they are already there).
y_pred = torch.cat(y_pred).cpu().numpy().flatten()
y_valid = torch.cat(y_valid).cpu().numpy().flatten()
  13. Evaluate the predictions:
# Score the CNN on the validation set with the same metrics as the benchmark.
cnn_mse = mean_squared_error(y_valid, y_pred)
cnn_rmse = np.sqrt(cnn_mse)
print(f"CNN's forecast – MSE: {cnn_mse:.2f}, RMSE: {cnn_rmse:.2f}")

# Plot actual values against the predictions.
# NOTE(review): plt is not among this recipe's imports; presumably
# matplotlib.pyplot is imported earlier in the chapter. Confirm.
fig, ax = plt.subplots()

ax.plot(y_valid, label='Actual', color='blue')
ax.plot(y_pred, label='Prediction', color='red')

ax.set_title("CNN's Forecasts")
ax.set_xlabel('Time')
ax.set_ylabel('Price ($)')
ax.legend()

Running the code generates the following plot:

The performance is summarized as follows:

CNN's forecast – MSE: 3.58, RMSE: 1.89
The simple CNN was able to beat the naïve benchmark on the validation set.
..................Content has been hidden....................

You can't read the whole page of this ebook; please click here to log in and view the full page.
Reset
18.221.140.111