<h1>Experiment 3</h1>
<h3>Optimised model training</h3>
<p>In experiment 3 the model was trained using the optimised hyperparameters. Examining the results of expt2 showed that trials #1, #10, and #16 all reached low losses while following a clear downward trend resembling a recognisable training curve. Of these, trial #16 was selected as the model to be tested, because the data suggest that <code>in_act=Mish</code> tends to give the lowest losses across most of the models tested. The parameters for trial #16 were as follows:</p>
<ul>
<li><b>in_act</b> = Mish</li>
<li><b>compressor_kernel_size</b> = 128</li>
<li><b>compressor_chunk_size</b> = 128</li>
<li><b>compressor_act</b> = SoftExp</li>
<li><b>conv_kernel_size</b> = 128</li>
<li><b>conv_act</b> = Sigmoid</li>
<li><b>channel_combine_act</b> = GELU</li>
<li><b>ff_width</b> = 512</li>
<li><b>ff_depth</b> = 2</li>
<li><b>ff_act</b> = CELU</li>
<li><b>out_act</b> = Tanhshrink</li>
</ul>
<p>
Because most of the training curves in expt2 appeared unstable, a learning rate scheduler was used to reduce the learning rate by 20% whenever the validation loss failed to improve for 5 epochs. The model was checkpointed during training, and the 10 best checkpoints (ranked by validation loss) were retained for testing.
</p>
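<p>The scheduling rule described above can be sketched in plain Python. This is a simplified illustration, not the PyTorch implementation: the function name <code>plateau_schedule</code> and the starting learning rate of 1e-3 are assumptions made for the example, and any loss that does not fall below the best value so far counts as "no improvement", ignoring the improvement threshold and cooldown options that <code>ReduceLROnPlateau</code> also offers.</p>

```python
def plateau_schedule(val_losses, lr=1e-3, factor=0.8, patience=5):
    """Return the learning rate in effect after each epoch.

    `lr` is a hypothetical starting value; `factor` and `patience`
    match the scheduler_kwargs used in this experiment.
    """
    best = float("inf")
    bad_epochs = 0
    history = []
    for loss in val_losses:
        if loss < best:
            # Validation loss improved: remember it and reset the counter
            best = loss
            bad_epochs = 0
        else:
            bad_epochs += 1
        if bad_epochs > patience:
            # Plateau detected: cut the learning rate by 20%
            lr *= factor
            bad_epochs = 0
        history.append(lr)
    return history


# Two improving epochs followed by a six-epoch plateau
history = plateau_schedule([1.0, 0.9] + [0.95] * 6)
```

<p>In this sketch the reduction fires once the number of consecutive non-improving epochs exceeds <code>patience</code>, so with <code>patience=5</code> the learning rate drops on the sixth such epoch.</p>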
<h3>Modified optimal model training</h3>
<p>
Following the first attempt at training the optimised model (Model 1, Test 1), it was noted that the training curves were clearly discernible but still quite unstable and noisy. To further improve training stability, a modified version of the model was prepared and trained (Model 2, Test 2). The modified model was identical to Model 1, except that a layer normalization layer was added to the convolutional layer of the <code>DaskCompressor</code> submodule. This change was made because highly recurrent submodules such as the compressor are known to be especially prone to instability caused by vanishing or exploding gradients. It was reasoned that normalizing at each iteration would make the gradients less likely to vanish or explode, and so make training more stable.
</p>
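<p>The intuition behind this change can be illustrated with a toy example in plain Python. It is only a sketch under simplifying assumptions: the helper names <code>layer_norm</code> and <code>iterate</code> are hypothetical, the repeated operation is a fixed scalar gain instead of a learned convolution, and it tracks the magnitude of the forward signal, not the gradients themselves. It also omits the learnable scale and shift that a layer normalization layer normally includes.</p>

```python
import math


def layer_norm(values, eps=1e-5):
    """Rescale a vector to zero mean and (approximately) unit variance."""
    mean = sum(values) / len(values)
    var = sum((v - mean) ** 2 for v in values) / len(values)
    return [(v - mean) / math.sqrt(var + eps) for v in values]


def iterate(values, gain, steps, normalize):
    """Apply the same scaling `steps` times, optionally normalizing each time."""
    for _ in range(steps):
        values = [gain * v for v in values]
        if normalize:
            values = layer_norm(values)
    return values


x = [1.0, 2.0, 3.0]
raw = iterate(x, gain=3.0, steps=20, normalize=False)
normed = iterate(x, gain=3.0, steps=20, normalize=True)

print(max(abs(v) for v in raw))     # grows with 3**20, i.e. into the billions
print(max(abs(v) for v in normed))  # stays of order 1
```

<p>Without normalization the repeated gain compounds at every iteration, whereas normalizing after each step keeps the signal at a fixed scale regardless of how many iterations are applied.</p>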

In [1]:
# Data handling imports
from dask.distributed import Client, LocalCluster
import dask
import dask.dataframe as dd
import dask.array as da
import numpy as np
import pickle
import random
from itertools import chain
from tqdm.auto import tqdm

# Deep learning imports
import torch
from torch.utils.data import DataLoader
from torch import nn
from torch.nn import functional as F
from torch import optim
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
from pytorch_lightning.loggers import WandbLogger

# Suppress some warning messages from pytorch_lightning;
# it really doesn't like that I've forced it to handle a dask array!
import warnings

warnings.filterwarnings("ignore", category=UserWarning, module=pl.__name__)

# Also, set up a log to record debug messages for failed trials
import logging

logging.basicConfig(filename="debug.log", encoding="utf-8", level=logging.DEBUG)

In [2]:
from expt1 import (
    Model,
    device,
    X_train,
    y_train,
    X_val,
    y_val,
    create_collate_fn,
)
from custom_activations import SoftExp

In [3]:
cluster = LocalCluster(n_workers=8, threads_per_worker=1)
client = Client(cluster)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 34477 instead


In [4]:
# Monkey patch to allow pytorch lightning to accept a dask array as a model input
from typing import Any, Generator, Iterable, Mapping, Optional, Union

BType = Union[da.Array, torch.Tensor, str, Mapping[Any, "BType"], Iterable["BType"]]

unpatched = pl.utilities.data._extract_batch_size


def patch(batch: BType) -> Generator[Optional[int], None, None]:
    if isinstance(batch, da.core.Array):
        if len(batch.shape) == 0:
            yield 1
        else:
            yield batch.shape[0]
    else:
        yield from unpatched(batch)


pl.utilities.data._extract_batch_size = patch

In [5]:
# Prepare datasets
train = DataLoader(
    list(zip(X_train.values(), y_train.values())),
    collate_fn=create_collate_fn(),
    shuffle=True,
)
valid = DataLoader(
    list(zip(X_val.values(), y_val.values())),
    shuffle=True,
    collate_fn=create_collate_fn(),
)

In [6]:
# Set up the model architecture and other necessary components
model = Model(
    # Training parameters
    optimizer=optim.Adam,
    scheduler=optim.lr_scheduler.ReduceLROnPlateau,
    scheduler_kwargs={"factor": 0.8, "patience": 5},
    # Model parameters
    in_act=(nn.Mish, list(), dict()),
    compressor_kernel_size=128,
    compressor_chunk_size=128,
    compressor_act=(SoftExp, list(), dict()),
    conv_kernel_size=128,
    conv_act=(nn.Sigmoid, list(), dict()),
    channel_combine_act=(nn.GELU, list(), dict()),
    ff_width=512,
    ff_depth=2,
    ff_act=(nn.CELU, list(), dict()),
    out_size=len(list(next(iter(y_train.values())).keys())),
    out_act=(nn.Tanhshrink, list(), dict()),
).to(device)

In [7]:
early_stop_callback = EarlyStopping(
    monitor="val_loss", patience=15, verbose=False, mode="min"
)

checkpoint_callback = ModelCheckpoint(
    monitor="val_loss",
    dirpath="./checkpoints",
    filename="checkpoint-{epoch:02d}-{val_loss:.2f}",
    save_top_k=10,
    mode="min",
)

logger = WandbLogger(project="Aconity_ML_Test_DryRun", name="Test 1")
logger.experiment.watch(model, log="all", log_freq=1)

trainer = Trainer(
    accelerator="gpu",
    max_epochs=-1,
    devices="auto",
    strategy="auto",
    logger=logger,
    callbacks=[checkpoint_callback, early_stop_callback],
    num_sanity_val_steps=0,  # Needs to be disabled or else we get an error because X is dask array
)

[34m[1mwandb[0m: Currently logged in as: [33mchughes000[0m. Use [1m`wandb login --relogin`[0m to force relogin



GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [8]:
# Finally, train the model
trainer.fit(model, train, valid)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

   | Name                     | Type            | Params
--------------------------------------------------------------
0  | loss                     | MSELoss         | 0     
1  | in_act                   | Mish            | 0     
2  | convolutional_compressor | DaskCompression | 3.2 K 
3  | compressor_act           | SoftExp         | 1     
4  | conv                     | Conv1d          | 3.2 K 
5  | conv_act                 | Sigmoid         | 0     
6  | combine_channels         | Conv1d          | 6     
7  | channel_combine_act      | GELU            | 0     
8  | ff                       | Sequential      | 525 K 
9  | out_dense                | Linear          | 11.8 K
10 | out_act                  | Tanhshrink      | 0     
--------------------------------------------------------------
543 K     Trainable params
0         Non-trainable params
543 K     Total params
2.174     Total estimated model params size (MB)

Training: 0it [00:00, ?it/s]