import copy
import pathlib
import pickle

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from matplotlib.ticker import MaxNLocator
from scipy.ndimage import zoom
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.metrics import RocCurveDisplay
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from torch import nn
from torch.nn.functional import relu
from torch.utils.data import DataLoader
from torch.utils.data import TensorDataset

pd.set_option("display.max_columns", 10)
pd.set_option("display.precision", 3)
plt.style.use("default")

random_state = 42
batch_size = 256
max_epochs = 30

!command -v nvidia-smi &> /dev/null && nvidia-smi

Data

Initially, we load the data and appropriately split it into datasets for model training, validation and evaluation (accuracy estimation).

workdir = pathlib.Path(".")

train_file = workdir / "train.csv"
eval_file = workdir / "evaluate.csv"
res_file = workdir / "results.csv"

artifacts_path = workdir / "artifacts"
models_path = artifacts_path / "models"
logs_path = artifacts_path / "logs"
outputs_path = artifacts_path / "outputs"

artifacts_path.mkdir(parents=True, exist_ok=True)
models_path.mkdir(parents=True, exist_ok=True)
logs_path.mkdir(parents=True, exist_ok=True)
outputs_path.mkdir(parents=True, exist_ok=True)

df = pd.read_csv(train_file)
display(df.head(5))

cats = [
    "T-shirt/top",
    "Trouser",
    "Pullover",
    "Dress",
    "Coat",
    "Sandal",
    "Shirt",
    "Sneaker",
    "Bag",
    "Ankle boot",
]
MAP_IL = {i: c for i, c in enumerate(cats)}
VEC_IL = np.vectorize(MAP_IL.get)

d = df.drop("label", axis=1)
fig, ax = plt.subplots(4, 8, figsize=(10, 6))
fig.suptitle("Sample from training data (inverted brightness)")
for j in range(4):
    for i in range(8):
        ax[j, i].imshow(d.iloc[5 * j + i].to_numpy().reshape((32, 32)), cmap="Greys")
        ax[j, i].set_title(MAP_IL[df.loc[5 * j + i, "label"]], fontsize=11)
        ax[j, i].set_axis_off()
plt.show()
pix1 pix2 pix3 pix4 pix5 ... pix1021 pix1022 pix1023 pix1024 label
0 0 0 0 0 0 ... 0 0 0 0 3
1 1 1 1 1 1 ... 1 1 1 1 3
2 1 1 1 1 1 ... 1 1 1 1 7
3 0 0 0 0 0 ... 0 0 0 0 9
4 1 1 1 1 1 ... 1 1 1 1 5

5 rows × 1025 columns

These are 32x32 pixel grayscale images. Each image belongs to one of 10 classes:

  • T-shirt/top
  • Trouser
  • Pullover
  • Dress
  • Coat
  • Sandal
  • Shirt
  • Sneaker
  • Bag
  • Ankle boot

Train-validation-test split

X_all = df.drop("label", axis=1).to_numpy(np.uint8)
y_all = df.loc[:, "label"].to_numpy(np.uint8)

X_train_val, X_test, y_train_val, y_test = train_test_split(
    X_all, y_all, test_size=0.2, random_state=random_state
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train_val, y_train_val, test_size=0.25, random_state=random_state
)

d = pd.DataFrame(
    [
        ["train", X_train.shape[0], X_train.shape[0] / X_all.shape[0]],
        ["validation", X_val.shape[0], X_val.shape[0] / X_all.shape[0]],
        ["test", X_test.shape[0], X_test.shape[0] / X_all.shape[0]],
    ],
    columns=["dataset", "count", "relative count"],
)
display(d)

fig, ax = plt.subplots(figsize=(4, 4))
ax.pie(d["count"], labels=d["dataset"], autopct="%1.f%%", wedgeprops={"alpha": 0.95})
ax.set_title("Train-validation-test split")
plt.show()
dataset count relative count
0 train 31500 0.6
1 validation 10500 0.2
2 test 10500 0.2

Exploratory data analysis

Now we take a closer look at the features and the target variable. We also show the random sample of each class from the training set.

Features (pixels)

d = pd.DataFrame(
    [
        ["count", X_train.shape[0]],
        ["size", X_train.shape[1]],
        ["min", X_train.min()],
        ["mean", X_train.mean().astype(int)],
        ["max", X_train.max()],
    ],
    columns=["stat", "value"],
)
display(d.set_index("stat").T)

d = X_train.flatten()
d2 = pd.DataFrame(
    [
        ["black pixel (lte 10)", d[d <= 10].shape[0]],
        ["non-black pixel (gt 10)", d[d > 10].shape[0]],
    ],
    columns=["color", "count"],
)

fig, ax = plt.subplots(figsize=(6, 3))
ax.bar(d2["color"], d2["count"])
ax.set_title("Pixel count: black v. non-black")
ax.set_ylabel("count")
plt.show()

d = X_train.flatten()

fig, ax = plt.subplots(figsize=(6, 4))
ax.hist(d[d > 10], bins=20)
ax.set_title("Histogram of gray level distribution (gt 10)")
ax.set_xlabel("grey level")
ax.set_ylabel("frequency")
plt.show()
stat count size min mean max
value 31500 1024 0 44 255

We observe that the images are largely composed of black pixels. For the other pixels, those with brightness around 200 are represented the most.

Target variable (class)

d = []
for c in range(10):
    d.append(
        [
            f"{MAP_IL[c]}",
            sum(y_train == c),
            round(sum(y_train == c) / y_train.shape[0], 3),
        ]
    )
d = pd.DataFrame(d, columns=["label", "count", "relative count"])
display(d)

fig, ax = plt.subplots(figsize=(4, 4))
ax.pie(
    d.loc[:, "count"],
    labels=d.loc[:, "label"],
    autopct="%.0f%%",
    wedgeprops={"alpha": 0.8},
)
ax.set_title("Class distribution")
plt.show()
label count relative count
0 T-shirt/top 3103 0.099
1 Trouser 3052 0.097
2 Pullover 3170 0.101
3 Dress 3179 0.101
4 Coat 3067 0.097
5 Sandal 3199 0.102
6 Shirt 3137 0.100
7 Sneaker 3183 0.101
8 Bag 3225 0.102
9 Ankle boot 3185 0.101

The individual classes are evenly represented (balanced) and do not need to be rebalanced (resampling).

for c in range(10):
    fig, ax = plt.subplots(1, 10, figsize=(10, 1.5))
    fig.suptitle(f"{MAP_IL[c]}")

    d = X_train[y_train == c]
    for i in range(10):
        ax[i].imshow(d[i].reshape((32, 32)), cmap="Greys")
        ax[i].set_axis_off()
    plt.show()

From the visualization of a small selection of training data, we can assume that it will most likely be problematic to distinguish between the T-shirt/top and Shirt classes, or Pullover and Coat, or Sneaker and Ankle boot.

Data preprocessing

In this section, we focus on data preprocessing and data augmentation.

Image augmentation

def augment(x, k):
    x1 = np.fliplr(x)
    x2 = zoom(x[k : 32 - k, k : 32 - k], 32 / (32 - 2 * k), order=0)
    x3 = np.fliplr(x2)
    return x1, x2, x3


def data_augment(X, y):
    X_aug = np.empty((4 * X.shape[0], X.shape[1]), dtype=np.uint8)
    y_aug = np.repeat(y, 4)
    for i in range(X.shape[0]):
        x0 = X[i].reshape((32, 32))
        x1, x2, x3 = augment(x0, 2)
        X_aug[4 * i] = x0.flatten()
        X_aug[4 * i + 1] = x1.flatten()
        X_aug[4 * i + 2] = x2.flatten()
        X_aug[4 * i + 3] = x3.flatten()
    return X_aug, y_aug


new_X_train, new_y_train = data_augment(X_train, y_train)
new_X_val, new_y_val = data_augment(X_val, y_val)


fig, axes = plt.subplots(5, 4, figsize=(8, 8))
for i in range(5):
    x1 = X_train[i].reshape((32, 32))
    x2, x3, x4 = augment(x1, k=5)

    ax = axes[i]
    ax[0].imshow(x1, cmap="Greys")
    ax[1].imshow(x2, cmap="Greys")
    ax[2].imshow(x3, cmap="Greys")
    ax[3].imshow(x4, cmap="Greys")

    if i == 0:
        ax[0].set_title("original")
        ax[1].set_title("flipped")
        ax[2].set_title("cropped")

    for j in range(4):
        ax[j].set_axis_off()

plt.show()

It is desirable that the model is flip and zoom invariant. A change in orientation or size has no effect on the class.

Set up data and device

In this section we prepare datasets so that we can access them in batches.

train_dataloader = DataLoader(
    TensorDataset(
        torch.Tensor(new_X_train).reshape((-1, 1, 32, 32)),
        torch.Tensor(new_y_train).type(torch.uint8),
    ),
    batch_size=batch_size,
)

val_dataloader = DataLoader(
    TensorDataset(
        torch.Tensor(new_X_val).reshape((-1, 1, 32, 32)),
        torch.Tensor(new_y_val).type(torch.uint8),
    ),
    batch_size=batch_size,
)

test_dataloader = DataLoader(
    TensorDataset(
        torch.Tensor(X_test).reshape((-1, 1, 32, 32)),
        torch.Tensor(y_test).type(torch.uint8),
    ),
    batch_size=batch_size,
)

for X, y in train_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps" if torch.backends.mps.is_available() else "cpu"
)
print(f"Using {device} device")
Shape of X [N, C, H, W]: torch.Size([256, 1, 32, 32])
Shape of y: torch.Size([256]) torch.uint8
Using cpu device

Feed forward neural network

Here we define functions for training neural networks. All models are regularized by the early stopping method.

class EarlyStopping:
    def __init__(self, tolerance, model):
        self.tolerance = tolerance
        self.counter = 0
        self.early_stop = False
        self.init = False
        self.max_acc = 0
        self.best_model = model

    def __call__(self, model, val_acc):
        if not self.init:
            self.init = True
            self.max_acc = val_acc
            self.best_model = copy.deepcopy(model)
            return

        if val_acc > self.max_acc:
            self.counter = 0
            self.max_acc = val_acc
            self.best_model = copy.deepcopy(model)
            return
        else:
            self.counter += 1
            if self.counter >= self.tolerance:
                self.early_stop = True
            return


def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.train()

    total_loss, total_correct = 0, 0
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        pred = model(X)
        loss = loss_fn(pred, y)

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        with torch.no_grad():
            total_loss += loss.item()
            total_correct += (pred.argmax(1) == y).type(torch.float).sum().item()

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)

    total_loss /= num_batches
    total_correct /= size
    return total_correct, total_loss


def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()

    loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    loss /= num_batches
    correct /= size

    print(
        f"Validation Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {loss:>8f} \n"
    )
    return correct, loss


def predict_proba(dataloader, model, out_path):
    if out_path.is_file():
        return pickle.load(open(out_path, "rb"))

    size = len(dataloader.dataset)
    y_hat = np.empty((0, 10), dtype=np.uint8)
    model.eval()
    with torch.no_grad():
        for X in dataloader:
            X = X[0].to(device)
            logits = model(X)
            y_hat = np.concatenate((y_hat, logits.cpu()))

    pickle.dump(y_hat, open(out_path, "wb"))
    return y_hat


def predict(dataloader, model, out_path):
    return np.argmax(predict_proba(dataloader, model, out_path), axis=1)


def load_model(file, m):
    model_path = models_path / (file + ".pt")
    m.load_state_dict(torch.load(model_path, map_location=torch.device(device)))
    return m


def train_model(file, m, optimizer, early_stopping=3, force_train=False):
    model_path = models_path / (file + ".pt")
    log_path = logs_path / (file + ".pickle")

    if model_path.is_file() and log_path.is_file() and not force_train:
        m = m.load_state_dict(torch.load(model_path, map_location=torch.device(device)))
        logs = pickle.load(open(log_path, "rb"))
    else:
        m = m.to(device)
        loss_fn = nn.CrossEntropyLoss()
        stopping = EarlyStopping(early_stopping, model)

        logs = []
        for t in range(max_epochs):
            print(f"Epoch {t+1}\n-------------------------------")
            tacc, tloss = train(train_dataloader, m, loss_fn, optimizer)
            vacc, vloss = test(val_dataloader, m, loss_fn)
            logs.append((tacc, tloss, vacc, vloss))

            stopping(m, vacc)
            if stopping.early_stop:
                break

        m = stopping.best_model
        torch.save(m.state_dict(), model_path)
        pickle.dump(logs, open(log_path, "wb"))

    stats = ["train_accuracy", "train_loss", "val_accuracy", "val_loss"]
    d = pd.DataFrame(logs, columns=stats)
    d.index += 1
    return d


def display_result(d):
    display(d.tail(5))

    fig, axes = plt.subplots(1, 2, figsize=(10, 4), layout="constrained")
    fig.suptitle("Learning curve")

    ax = axes[0]
    ax.set_title("Loss")
    ax.plot(d.index, d.loc[:, "train_loss"], label="train", linestyle="dashed")
    ax.plot(d.index, d.loc[:, "val_loss"], label="validation")
    ax.set_xlabel("epoch")
    ax.set_ylabel("loss")
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    ax.legend()

    ax = axes[1]
    ax.set_title("Accuracy")
    ax.plot(d.index, d.loc[:, "train_accuracy"], label="train", linestyle="dashed")
    ax.plot(d.index, d.loc[:, "val_accuracy"], label="validation")
    ax.set_xlabel("epoch")
    ax.set_ylabel("accuracy")
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    ax.legend()

    plt.show()

Model suitability

Fully connected (dense) neural network models are very flexible and should perform relatively well on image data. However, we expect convolutional networks to perform significantly better. Compared to traditional models, neural networks are capable of modeling complicated functions, which on the one hand is desirable because it is certainly a complex function, however, it can also lead to overfitting or difficult training.

Base network

class BaseNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(32 * 32, 512),
            nn.ReLU(),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


model = BaseNetwork()
optimizer = torch.optim.Adam(model.parameters())
d_base = train_model("base", model, optimizer)
display_result(d_base)
train_accuracy train_loss val_accuracy val_loss
10 0.851 0.404 0.833 0.467
11 0.852 0.394 0.827 0.488
12 0.856 0.388 0.823 0.504
13 0.856 0.383 0.828 0.472
14 0.859 0.377 0.821 0.505

Base network is the model we use as a baseline. In the following cells we modify this model, each time starting from this model. At the end, we compare all models en masse and evaluate the respective modification.

Wide layers

class WideNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(32 * 32, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


model = WideNetwork()
optimizer = torch.optim.Adam(model.parameters())
d_wide = train_model("wide", model, optimizer)
display_result(d_wide)
train_accuracy train_loss val_accuracy val_loss
12 0.853 0.396 0.839 0.478
13 0.855 0.386 0.833 0.470
14 0.858 0.378 0.836 0.475
15 0.861 0.371 0.835 0.495
16 0.864 0.365 0.837 0.495

Wide network is a model with wider hidden layers. We observe a subtle improvement over the baseline model.

Deep network

class DeepNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(32 * 32, 512),
            nn.ReLU(),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


model = DeepNetwork()
optimizer = torch.optim.Adam(model.parameters())
d_deep = train_model("deep", model, optimizer)
display_result(d_deep)
train_accuracy train_loss val_accuracy val_loss
26 0.883 0.313 0.849 0.467
27 0.884 0.311 0.851 0.466
28 0.883 0.314 0.853 0.468
29 0.888 0.303 0.849 0.476
30 0.887 0.302 0.846 0.474

Deep network is a model with multiple hidden layers. We observe a significant improvement over the original model. However, the model also took roughly twice as long to train.

SGD optimizer

model = BaseNetwork()
optimizer = torch.optim.SGD(model.parameters())
d_sgd = train_model("sgd", model, optimizer)
display_result(d_sgd)
train_accuracy train_loss val_accuracy val_loss
26 0.874 0.346 0.839 0.442
27 0.876 0.341 0.840 0.438
28 0.878 0.337 0.841 0.438
29 0.879 0.332 0.842 0.437
30 0.881 0.328 0.843 0.436

The training with the SGD optimizer is significantly more stable compared to Adam (the accuracy of the model is not so fluctuating between epochs). The training was also prematurely stopped by limit, not early stopping. By training longer, we would probably get a better model.

L2 regularization

model = BaseNetwork()
optimizer = torch.optim.Adam(model.parameters(), weight_decay=1e-2)
d_l2 = train_model("l2", model, optimizer)
display_result(d_l2)
train_accuracy train_loss val_accuracy val_loss
19 0.801 0.540 0.810 0.527
20 0.800 0.539 0.797 0.558
21 0.801 0.541 0.809 0.523
22 0.802 0.539 0.804 0.538
23 0.801 0.536 0.808 0.523

L2 regularization doesn't help much in this case. The training progress is more volatile on the validation set. However, we can assume that a larger model would generalize better (we quadratically penalize the weights, so the model does not put much emphasis on any specific neuron).

Batch normalization

class BatchNormNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(32 * 32, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Linear(512, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Linear(128, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


model = BatchNormNetwork()
optimizer = torch.optim.Adam(model.parameters())
d_batch_norm = train_model("batch_norm", model, optimizer)
display_result(d_batch_norm)
train_accuracy train_loss val_accuracy val_loss
12 0.911 0.236 0.843 0.495
13 0.919 0.217 0.841 0.524
14 0.924 0.202 0.840 0.545
15 0.930 0.187 0.836 0.575
16 0.934 0.174 0.836 0.585

Batch normalization performs better than the baseline model, however, we observe overfitting with longer training. The difference between the training curve and the validation curve is larger than for the other models.

Dropout

class DropoutNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(32 * 32, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


model = DropoutNetwork()
optimizer = torch.optim.Adam(model.parameters())
d_dropout = train_model("dropout", model, optimizer)
display_result(d_dropout)
train_accuracy train_loss val_accuracy val_loss
16 0.748 0.687 0.788 0.596
17 0.748 0.688 0.751 0.658
18 0.750 0.686 0.783 0.612
19 0.749 0.684 0.780 0.599
20 0.749 0.689 0.754 0.643

We observe a deterioration from the baseline, but it can be assumed that increasing the number of parameters would improve the model. The model should generalise relatively well. We also see for the first time that the performance on the validation data is significantly better than on the training data. This is expected given that the dropout mechanics are turned off during evaluation.

Summary

def display_summary(logs, labels):
    accs = [l.loc[:, "val_accuracy"].max() for l in logs]

    fig, ax = plt.subplots(figsize=(10, 6))
    fig.suptitle("Comparison of networks")
    for d, label in zip(logs, labels):
        ax.plot(d.index, d.loc[:, "val_loss"], label=label, alpha=0.95)
    ax.set_title("Learning curve")
    ax.set_xlabel("epoch")
    ax.set_ylabel("validation loss")
    ax.legend()
    plt.show()

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 4))
    fig.subplots_adjust(hspace=0.05)
    for i, l in enumerate(labels):
        ax1.bar(l, accs[i], alpha=0.7)
        ax2.bar(l, accs[i], alpha=0.7)
    ax1.set_ylim(0.75, 0.9)
    ax2.set_ylim(0, 0.2)
    ax1.spines.bottom.set_visible(False)
    ax2.spines.top.set_visible(False)
    ax1.set_xticks([])
    ax2.set_yticks([0, 0.04, 0.10, 0.15])
    ax1.xaxis.tick_top()

    ax1.axhline(accs[0], alpha=0.4, c="black", linestyle="--", linewidth=0.51)

    d = 0.5
    kwargs = dict(
        marker=[(-1, -d), (1, d)],
        markersize=12,
        linestyle="none",
        color="k",
        mec="k",
        mew=1,
        clip_on=False,
    )
    ax1.plot([0, 1], [0, 0], transform=ax1.transAxes, **kwargs)
    ax2.plot([0, 1], [1, 1], transform=ax2.transAxes, **kwargs)

    ax1.set_title("Best accuracy")
    ax1.set_ylabel("validation accuracy")
    plt.show()


display_summary(
    [d_base, d_wide, d_deep, d_sgd, d_l2, d_batch_norm, d_dropout],
    ["base", "wide", "deep", "SGD", "L2", "batch norm", "dropout"],
)

The performance of the baseline model is outperformed by a deep and wide neural network. This is expected as they have more parameters and are better able to capture the relevant classification rules. SGD has the smoothest training curve of all the training curves and is the only one that was not terminated by early stopping. A model trained this way could be improved by further training. Furthermore, we see that very good results were achieved by the batch normalized model. On the other hand, models with L2 and dropout regularizations do not work on this data. Probably these are too strong regularizations on a relatively limited neural network. In larger models with more parameters, they would probably help with overfitting.

Convolutional neural network

Model suitability

CNN is generally better suited for image data. We expect convolutional neural networks to achieve higher accuracy on this data than fully connected models. This is only a special case of the previous variant, however, by restricting to the location of neurons, the model can focus on more relevant information. We can also afford to increase the depth and width of the model without significantly increasing the total number of parameters.

Base network

class CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3)
        self.conv2 = nn.Conv2d(32, 64, 3)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(2304, 64)
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):
        x = self.pool(relu(self.conv1(x)))
        x = self.pool(relu(self.conv2(x)))
        x = torch.flatten(x, 1)
        x = relu(self.fc1(x))
        x = self.fc2(x)
        return x


model = CNN()
optimizer = torch.optim.Adam(model.parameters())
d_base = train_model("base_cnn", model, optimizer)
display_result(d_base)
train_accuracy train_loss val_accuracy val_loss
8 0.911 0.234 0.875 0.381
9 0.917 0.222 0.868 0.414
10 0.921 0.210 0.867 0.432
11 0.924 0.199 0.870 0.435
12 0.926 0.196 0.872 0.436

Base CNN will now serve as the baseline for the convolutional network. We are already seeing a significant improvement over all variants of fully connected networks. Without modification, it achieves better validation accuracy than the best of the previous set of models. We also see that it converges very quickly.

Wide network

class WideCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3)
        self.conv2 = nn.Conv2d(32, 64, 3)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(2304, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.pool(relu(self.conv1(x)))
        x = self.pool(relu(self.conv2(x)))
        x = torch.flatten(x, 1)
        x = relu(self.fc1(x))
        x = self.fc2(x)
        return x


model = WideCNN()
optimizer = torch.optim.Adam(model.parameters())
d_wide = train_model("wide_cnn", model, optimizer)
display_result(d_wide)
train_accuracy train_loss val_accuracy val_loss
4 0.880 0.321 0.862 0.380
5 0.889 0.295 0.853 0.410
6 0.898 0.272 0.849 0.448
7 0.904 0.256 0.854 0.444
8 0.911 0.238 0.857 0.444

The model with a wider hidden layer offers no improvement (even a minor deterioration). Similar to the baseline model, we observe that training converges very quickly and that further training leads to overfitting.

Deep network

class DeepCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 64, 3)
        self.conv2 = nn.Conv2d(64, 128, 3)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(4608, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, 32)
        self.fc4 = nn.Linear(32, 10)

    def forward(self, x):
        x = self.pool(relu(self.conv1(x)))
        x = self.pool(relu(self.conv2(x)))
        x = torch.flatten(x, 1)
        x = relu(self.fc1(x))
        x = relu(self.fc2(x))
        x = relu(self.fc3(x))
        x = self.fc4(x)
        return x


model = DeepCNN()
optimizer = torch.optim.Adam(model.parameters())
d_deep = train_model("deep_cnn", model, optimizer)
display_result(d_deep)
train_accuracy train_loss val_accuracy val_loss
7 0.912 0.236 0.878 0.359
8 0.917 0.223 0.868 0.390
9 0.923 0.204 0.862 0.423
10 0.926 0.195 0.865 0.418
11 0.931 0.186 0.872 0.434

The model with multiple hidden layers leads to only a slight improvement. The training is highly fluctuating and we observe that the model would overfit with longer training.

SGD optimizer

model = CNN()
optimizer = torch.optim.SGD(model.parameters())
d_sgd = train_model("sgd_cnn", model, optimizer)
display_result(d_sgd)
train_accuracy train_loss val_accuracy val_loss
26 0.908 0.256 0.864 0.369
27 0.909 0.253 0.865 0.367
28 0.911 0.249 0.866 0.363
29 0.912 0.246 0.868 0.359
30 0.913 0.242 0.869 0.357

Similar to the fully-connected network, training the SGD optimizer is very stable (the model performance smoothly degrades). The training could continue further at rest (it converges more slowly with this optimizer).

L2 regularization

model = CNN()
optimizer = torch.optim.Adam(model.parameters(), weight_decay=1e-2)
d_l2 = train_model("l2_cnn", model, optimizer)
display_result(d_l2)
train_accuracy train_loss val_accuracy val_loss
12 0.874 0.343 0.869 0.346
13 0.875 0.343 0.860 0.370
14 0.876 0.340 0.868 0.353
15 0.876 0.339 0.869 0.350
16 0.877 0.336 0.868 0.349

L2 regularization does not lead to improvement, but it is very helpful for more stable training. The training curve is less volatile and for larger models it could help to generalize better. The difference between the training and validation curves is very small compared to previous models.

Batch normalization

class BatchNormCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3)
        self.conv2 = nn.Conv2d(32, 64, 3)
        self.conv1_bn = nn.BatchNorm2d(32)
        self.conv2_bn = nn.BatchNorm2d(64)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(2304, 64)
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):
        x = self.pool(relu(self.conv1_bn(self.conv1(x))))
        x = self.pool(relu(self.conv2_bn(self.conv2(x))))
        x = torch.flatten(x, 1)
        x = relu(self.fc1(x))
        x = self.fc2(x)
        return x


model = BatchNormCNN()
optimizer = torch.optim.Adam(model.parameters())
d_batch_norm = train_model("batch_norm_cnn", model, optimizer)
display_result(d_batch_norm)
train_accuracy train_loss val_accuracy val_loss
9 0.925 0.203 0.881 0.337
10 0.931 0.188 0.877 0.368
11 0.935 0.176 0.880 0.376
12 0.940 0.163 0.881 0.386
13 0.945 0.151 0.879 0.399

For the convolutional network with batch normalization, we observe a finer training curve and slightly better accuracy compared to the baseline model.

Dropout

class DropoutCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3)
        self.conv2 = nn.Conv2d(32, 64, 3)
        self.dropout = nn.Dropout2d(0.5)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(2304, 64)
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):
        x = self.pool(self.dropout(relu(self.conv1(x))))
        x = self.pool(self.dropout(relu(self.conv2(x))))
        x = torch.flatten(x, 1)
        x = relu(self.fc1(x))
        x = self.fc2(x)
        return x


model = DropoutCNN()
optimizer = torch.optim.Adam(model.parameters())
d_dropout = train_model("dropout_cnn", model, optimizer)
display_result(d_dropout)
train_accuracy train_loss val_accuracy val_loss
21 0.858 0.379 0.874 0.332
22 0.861 0.375 0.865 0.350
23 0.860 0.376 0.872 0.338
24 0.861 0.373 0.871 0.332
25 0.862 0.373 0.870 0.341

Dropout works better here than with fully connected. Again, we can assume that the model will generalize very well. It performs better on validation data than on training data for the same reason as fully connected (dropout is turned off during evaluation). This regularization helps a lot with the overfitting that we observe with the other models.

Summary

display_summary(
    [d_base, d_wide, d_deep, d_sgd, d_l2, d_batch_norm, d_dropout],
    ["base", "wide", "deep", "SGD", "L2", "batch norm", "dropout"],
)

It is clear that CNNs are generally more powerful on image data than fully connected networks. They are, on the other hand, more prone to overffiting. Batch norm regularization and deeper network again showed improvement over the baseline model. The other variants perform similarly. Most of them help with more stable training and better generalization.

Final model

In this final section we train the model for the final evaluation. This model combines all the elements used in the experiments. Compared to the previous CNN baseline model, we now use a very deep and wide network, which we regularise accordingly. Batch normalization worked very well in both neural network variants, so we use it here as well. This is a larger model with a lot of parameters, so we also implement dropout and L2 regularization, which should help avoid overfitting and soften the training curve. We also observed that the Adam optimizer converges very fast, but SGD offers more stable training. For this reason, we initially train the model with the Adam optimizer and then smoothly tune SGD for best accuracy.

class FinalCNNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.pool = nn.MaxPool2d(2, 2)

        self.conv1 = nn.Conv2d(1, 64, 2)
        self.conv1_bn = nn.BatchNorm2d(64)
        self.conv1_do = nn.Dropout2d(0.1)

        self.conv2 = nn.Conv2d(64, 128, 2)
        self.conv2_bn = nn.BatchNorm2d(128)
        self.conv2_do = nn.Dropout2d(0.3)

        self.conv3 = nn.Conv2d(128, 256, 2)
        self.conv3_bn = nn.BatchNorm2d(256)
        self.conv3_do = nn.Dropout2d(0.5)

        self.conv4 = nn.Conv2d(256, 512, 4)
        self.conv4_bn = nn.BatchNorm2d(512)
        self.conv4_do = nn.Dropout2d(0.5)

        self.fc1 = nn.Linear(4608, 512)
        self.fc1_bn = nn.BatchNorm1d(512)
        self.fc1_do = nn.Dropout(0.1)

        self.fc2 = nn.Linear(512, 256)
        self.fc2_bn = nn.BatchNorm1d(256)
        self.fc2_do = nn.Dropout(0.3)

        self.fc3 = nn.Linear(256, 128)
        self.fc3_bn = nn.BatchNorm1d(128)
        self.fc3_do = nn.Dropout(0.5)

        self.fc4 = nn.Linear(128, 64)
        self.fc4_bn = nn.BatchNorm1d(64)
        self.fc4_do = nn.Dropout(0.5)

        self.fc5 = nn.Linear(64, 10)

    def forward(self, x):
        x = self.pool(self.conv1_do(relu(self.conv1_bn(self.conv1(x)))))
        x = self.pool(self.conv2_do(relu(self.conv2_bn(self.conv2(x)))))
        x = self.conv3_do(relu(self.conv3_bn(self.conv3(x))))
        x = self.conv4_do(relu(self.conv4_bn(self.conv4(x))))

        x = torch.flatten(x, 1)
        x = self.fc1_do(relu(self.fc1_bn(self.fc1(x))))
        x = self.fc2_do(relu(self.fc2_bn(self.fc2(x))))
        x = self.fc3_do(relu(self.fc3_bn(self.fc3(x))))
        x = self.fc4_do(relu(self.fc4_bn(self.fc4(x))))
        x = self.fc5(x)
        return x


RETRAIN = False
max_epochs = 200
model = FinalCNNet()

optimizer0 = torch.optim.Adam(model.parameters(), weight_decay=1e-3)
optimizer1 = torch.optim.SGD(model.parameters(), weight_decay=1e-3)
optimizer2 = torch.optim.SGD(model.parameters())

d_final0 = train_model("final0", model, optimizer0, 4, RETRAIN)
d_final1 = train_model("final1", model, optimizer1, 4, RETRAIN)
d_final2 = train_model("final2", model, optimizer2, 4, RETRAIN)

display_result(pd.concat((d_final0, d_final1, d_final2)).reset_index())
index train_accuracy train_loss val_accuracy val_loss
149 39 0.909 0.282 0.914 0.246
150 40 0.908 0.283 0.914 0.246
151 41 0.909 0.281 0.914 0.246
152 42 0.908 0.284 0.914 0.246
153 43 0.910 0.281 0.914 0.246

Evaluation

Now that the final model is trained, we estimate its accuracy on the test data.

final = load_model("final2", FinalCNNet()).to(device)

y_test_proba_pred_path = outputs_path / "y_test_proba_pred.pickle"
y_test_proba_hat = predict_proba(test_dataloader, final, y_test_proba_pred_path)
y_test_hat = np.argmax(y_test_proba_hat, axis=1)

acc = np.round(accuracy_score(y_test, y_test_hat), 3)
print(f"Test accuracy: {acc}")
Test accuracy: 0.916

On the new data we can assume an accuracy of 91%.

report = pd.DataFrame(
    classification_report(y_test, y_test_hat, target_names=cats, output_dict=True)
).T
display(report)
precision recall f1-score support
T-shirt/top 0.858 0.887 0.872 1115.000
Trouser 0.995 0.994 0.995 1101.000
Pullover 0.882 0.871 0.877 1047.000
Dress 0.892 0.911 0.901 1028.000
Coat 0.861 0.867 0.864 1061.000
Sandal 0.989 0.967 0.978 1024.000
Shirt 0.771 0.743 0.756 1058.000
Sneaker 0.951 0.977 0.964 998.000
Bag 0.991 0.977 0.984 1044.000
Ankle boot 0.978 0.974 0.976 1024.000
accuracy 0.916 0.916 0.916 0.916
macro avg 0.917 0.917 0.917 10500.000
weighted avg 0.916 0.916 0.916 10500.000
def display_confusion_matrix(ax, norm=None):
    ConfusionMatrixDisplay.from_predictions(
        VEC_IL(y_test),
        VEC_IL(y_test_hat),
        labels=cats,
        normalize=norm,
        xticks_rotation="vertical",
        colorbar=False,
        cmap="Blues",
        ax=ax,
    )
    ax.set_title("Confusion matrix (final model, test data)")
    ax.set_xlabel("Predicted class")
    ax.set_ylabel("True class", size=16)
    ax.tick_params(axis="both", which="major")
    ax.grid(False)


fig, ax = plt.subplots(1, 1, figsize=(8, 8), layout="constrained")
display_confusion_matrix(ax)
plt.show()

We see that the model most often confuses Shirt with T-shirt/top. Also Coat/Shirt, Pullover/Shirt or Coat/Pullover combinations are problematic.

fig, axes = plt.subplots(4, 3, figsize=(10, 10), layout="constrained")
fig.suptitle("ROC and AUC")
fig.supxlabel("True positive rate")
fig.supylabel("False positive rate")
for i, (cat, ax) in enumerate(zip(cats, fig.axes[:10])):
    disp = RocCurveDisplay.from_predictions(
        y_test == i, y_test_proba_hat[:, i], ax=ax, name=f"{cat} v rest"
    )
    ax.set(xlabel=None, ylabel=None)
for ax in fig.axes[10:]:
    ax.remove()
plt.show()

The final model copes well with the Trouser, Ankle boot, Bag, Sneaker and Sandal classes (AUC = 1). However, it poorly distinguishes Shirt, Coat, Pullover and T-shirt/top.

Sample

edf = pd.read_csv(eval_file)
id = edf.loc[:, "ID"]
X_eval = edf.drop("ID", axis=1).to_numpy()

eval_dataloader = DataLoader(
    TensorDataset(torch.Tensor(X_eval).reshape((-1, 1, 32, 32))),
    batch_size=batch_size,
)

eval_pred_path = outputs_path / "eval_pred.pickle"
final = load_model("final2", FinalCNNet()).to(device)
y_eval = predict(eval_dataloader, final, eval_pred_path)

d = pd.DataFrame()
d["ID"] = id
d["label"] = y_eval
d.to_csv(res_file, index=False)

fig, ax = plt.subplots(4, 8, figsize=(10, 6))
fig.suptitle("Sample from evaluation.csv (inverted brightness)")
for j in range(4):
    for i in range(8):
        ax[j, i].imshow(X_eval[5 * j + i].reshape((32, 32)), cmap="Greys")
        ax[j, i].set_title(MAP_IL[d.loc[5 * j + i, "label"]], fontsize=9)
        ax[j, i].set_axis_off()
plt.show()

Predikce prvních 32 obrázků vypadají v pořádku. Zkusíme predikce zobrazit podle kategorií.

for c in range(10):
    fig, axes = plt.subplots(2, 10, figsize=(10, 2))
    fig.suptitle(f"{MAP_IL[c]}")

    d = X_eval[y_eval == c]
    for i in range(10):
        axes[0][i].imshow(d[i].reshape((32, 32)), cmap="Greys")
        axes[0][i].set_axis_off()
        axes[1][i].imshow(d[i + 10].reshape((32, 32)), cmap="Greys")
        axes[1][i].set_axis_off()
    plt.show()