Working with What You Have

Open In Colab

Time. 10:00 - 12:00 (2h).

Goal. Start from a dataset and ask: how much can we squeeze out of it? We will train a classifier on a motor imagery EEG dataset, then try four standard fixes to improve the performance that all stay inside the same dataset:

  1. Better features: Riemannian tangent space on covariance matrices.
  2. Data augmentation (PML 19.1): generate new data by transforming the ones we have.
  3. Semi-supervised pseudo-labeling (PML 19.3): exploit the unlabeled trials from the same dataset.
  4. Weakly supervised label smoothing (PML 19.7): tolerate noise when the labels are imperfect.

0. Setup

Run the install cell once. We use:

  • moabb: Mother of all BCI benchmarks for downloading dataset.
  • braindecode: Deep learning models for EEG. Used in §7 for a small model.
  • pyriemann: Riemannian geometry tools for EEG covariance matrices. Used in §4.
  • seaborn: plotting.
%%capture
!pip install -q moabb==1.1.0 braindecode==0.8.1 pyriemann==0.6 seaborn
import os, copy

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import mne
import torch

from moabb import datasets, paradigms
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import cross_val_score
from scipy.signal import welch

from pyriemann.estimation import Covariances
from pyriemann.tangentspace import TangentSpace

from braindecode.models import EEGNetv4
from braindecode import EEGClassifier

mne.set_log_level("WARNING")
np.random.seed(42); torch.manual_seed(42)
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Setup complete. Device: {device}")
#@title Helpers (run once, then collapse) { display-mode: "form" }

BANDS = (("alpha", 8, 13), ("beta", 13, 30))


def extract_bandpower(Xin, sfreq=160, bands=BANDS):
    """Log bandpower per trial × channel × band, flattened."""
    f, psd = welch(Xin, fs=sfreq, nperseg=min(256, Xin.shape[-1]), axis=-1)
    powers = np.stack([psd[..., (f >= lo) & (f <= hi)].mean(-1) for _, lo, hi in bands], axis=-1)
    return np.log(powers + 1e-12).reshape(len(Xin), -1).astype(np.float32)


def balanced_split(X_t, y_t, n_per_class, seed=0):
    """Class-balanced split: n_per_class trials per class as 'train', the rest as 'test'."""
    rng = np.random.RandomState(seed)
    chosen = np.concatenate([
        rng.choice(np.where(y_t == c)[0], size=n_per_class, replace=False)
        for c in np.unique(y_t)
    ])
    rest = np.setdiff1d(np.arange(len(y_t)), chosen)
    return X_t[chosen], y_t[chosen], X_t[rest], y_t[rest]


def lr_pipe(C=1.0):
    return make_pipeline(StandardScaler(), LogisticRegression(max_iter=1000, C=C))


def corrupt_labels(y, noise_rate, seed=0):
    """Flip noise_rate * len(y) labels (binary case)."""
    flip = np.random.RandomState(seed).rand(len(y)) < noise_rate
    return np.where(flip, 1 - y, y)


def few_label_curve(features, y_arr, n_list=(5, 10, 20, 40, 60), n_seeds=20, cv=3):
    """For each n in n_list, subsample n trials per class, cross-validate LR, return mean ± std."""
    rows = []
    for n in n_list:
        accs = []
        for seed in range(n_seeds):
            rng = np.random.RandomState(seed)
            chosen = np.concatenate([
                rng.choice(np.where(y_arr == c)[0], size=n, replace=False)
                for c in np.unique(y_arr)
            ])
            scores = cross_val_score(lr_pipe(), features[chosen], y_arr[chosen], cv=cv)
            accs.append(scores.mean())
        rows.append({"n_per_class": n, "mean": np.mean(accs), "std": np.std(accs)})
    return pd.DataFrame(rows)


sns.set_theme(context="notebook", style="whitegrid")

1. Data

We use the PhysioNet Motor Imagery dataset. Each trial is a few seconds of EEG recorded while the subject was imagining either a left hand or a right hand movement.

dataset = datasets.PhysionetMI()
paradigm = paradigms.LeftRightImagery()

# About 500 MB the first time (subjects 1-10).
subjects = list(range(1, 11))
X, y, metadata = paradigm.get_data(dataset=dataset, subjects=subjects)
X = X.astype(np.float32)
y_int = LabelEncoder().fit_transform(y)

sfreq = 160
n_chans, n_times = X.shape[1], X.shape[2]
print(f"X shape: {X.shape}")
print(f"Subjects available: {sorted(metadata['subject'].unique())}")

For this notebook we work only with subject 1.

mask_subj1 = (metadata["subject"] == 1).values
X_s1, y_s1 = X[mask_subj1], y_int[mask_subj1]
print(f"S1: {len(y_s1)} trials, class balance: {dict(zip(*np.unique(y_s1, return_counts=True)))}")

Can you tell left- from right-hand imagery from the raw signal?

times = np.arange(n_times) / sfreq
idx_left  = np.where(y[mask_subj1] == "left_hand")[0][0]
idx_right = np.where(y[mask_subj1] == "right_hand")[0][0]

fig, axes = plt.subplots(1, 2, figsize=(11, 3.5), sharey=True)
for ax, idx, label in zip(axes, [idx_left, idx_right], ["left_hand", "right_hand"]):
    ax.plot(times, X_s1[idx, 4], label="C3 (left motor)", alpha=0.8)
    ax.plot(times, X_s1[idx, 5], label="C4 (right motor)", alpha=0.8)
    ax.set_title(f"Imagined {label}"); ax.set_xlabel("Time (s)"); ax.legend()
plt.show()

Probably not! The signal is there, like alpha (8–13 Hz) and beta (13–30 Hz) over the contralateral motor cortex, but it’s buried in noise. We need to extract it.

Use band power for alpha and beta on each of the 64 channels. Log-scale it because EEG power is heavy-tailed.

bp_s1 = extract_bandpower(X_s1)
print(f"Bandpower features: {bp_s1.shape}  (n_trials, 64 channels × 2 bands)")

2. Baseline Classifier

Now we train and test a classifier on S1 with all of the labels available. Input: bandpower features, output: left vs right. lr_pipe is a linear logistic regression.

baseline_scores = cross_val_score(lr_pipe(), bp_s1, y_s1, cv=5)
print(f"Within-S1 5-fold accuracy: {baseline_scores.mean():.3f} (std {baseline_scores.std():.3f})")

You should see something in the range of 0.70 to 0.85.


3. Failure: Not Enough Labels

Now take away the labels. What if S1 had only 5, 10, or 20 trials per class? Fit the same pipeline, repeat for 20 times for the error bars.

learning_curve = few_label_curve(bp_s1, y_s1)
learning_curve
sns.lineplot(data=learning_curve, x="n_per_class", y="mean", marker="o")
plt.axhline(0.5, ls="--", c="k", alpha=0.5)
plt.show()

A monotonic climb: more labels means better accuracy. With 5 trials per class the model is barely above chance. With 60 it approaches the all-labels baseline.

This is the central failure of Ch. 19. Every method in this notebook is a different way to make the curve climb faster.


4. Fix 1: Better Features

A fix that needs neither external data nor another model: feature engineering. Each trial gives a covariance matrix across channels (shape = ch x ch). Those matrices live on a curved manifold (i.e., constrained by some rules). Project data to a flat tangent space and a standard classifier can use them. This method is competitive on motor imagery task for over a decade.

cov_s1 = Covariances(estimator="oas").fit_transform(X_s1)
ri_s1 = TangentSpace().fit_transform(cov_s1).astype(np.float32)
print(f"Riemannian features: {ri_s1.shape}")

Re-run the same pipeline with this new feature.

curves = pd.concat({
    "bandpower":  few_label_curve(bp_s1, y_s1),
    "Riemannian": few_label_curve(ri_s1, y_s1),
}, names=["features"]).reset_index(level=0)

sns.lineplot(data=curves, x="n_per_class", y="mean", hue="features", marker="o")
plt.axhline(0.5, ls="--", c="k", alpha=0.5)
plt.show()

The tangent features sit above bandpower. Same data, better features. The gap is largest at small label budgets.


5. Fix 2: Augmentation

Augmentation is another fix that needs no external data. we apply some transformation to the data, and assume “those transformations do change the label”, apply them during training, and let the model learn. More data, no new labels!

Four standard EEG augmentations:

def time_shift(x, max_shift=20, rng=None):
    rng = rng or np.random
    return np.roll(x, rng.randint(-max_shift, max_shift + 1), axis=-1)


def channel_dropout(x, p=0.1, rng=None):
    """Zero out each channel with probability p."""
    rng = rng or np.random
    out = x.copy()
    out[rng.rand(x.shape[0]) <= p] = 0
    return out


def add_gaussian_noise(x, sigma=0.1, rng=None):
    """Add white noise scaled by sigma x trial std."""
    rng = rng or np.random
    return x + rng.randn(*x.shape).astype(x.dtype) * x.std() * sigma


def mixup_pair(x1, x2, alpha=0.2, rng=None):
    """Linear mix of two trials of the same label."""
    rng = rng or np.random
    lam = rng.beta(alpha, alpha) if alpha > 0 else 1.0
    return (lam * x1 + (1 - lam) * x2).astype(x1.dtype)


def augment_trials(X_in, y_in, n_copies=3, rng=None):
    """Helper to apply multiple augmentation techniques."""
    rng = rng or np.random.RandomState(0)
    X_list, y_list = [X_in], [y_in]
    for _ in range(n_copies):
        X_aug = np.stack([
            add_gaussian_noise(channel_dropout(time_shift(x, rng=rng), p=0.05, rng=rng),
                               sigma=0.05, rng=rng)
            for x in X_in
        ])
        X_list.append(X_aug); y_list.append(y_in)
    return np.concatenate(X_list), np.concatenate(y_list)

E.g., one trial before and after time-shifting:

trial = X_s1[0, 4]
plt.plot(times, trial, alpha=0.6, label="original")
plt.plot(times, time_shift(trial[None, :])[0], alpha=0.8, label="time-shifted")
plt.legend(); plt.xlabel("Time (s)"); plt.show()

Now testing. Train two pipelines on S1 data: baseline (original trials) and augmented (original + augmented copies).

def few_label_aug(X_pool, y_pool, n_per_class, augment, n_seeds=10):
    accs = []
    for seed in range(n_seeds):
        rng = np.random.RandomState(seed)
        chosen = np.concatenate([
            rng.choice(np.where(y_pool == c)[0], size=n_per_class, replace=False)
            for c in np.unique(y_pool)
        ])
        X_tr_raw, y_tr = X_pool[chosen], y_pool[chosen]
        if augment:
            X_tr_raw, y_tr = augment_trials(X_tr_raw, y_tr, n_copies=3, rng=rng)
        X_tr = extract_bandpower(X_tr_raw)
        rest = np.setdiff1d(np.arange(len(y_pool)), chosen)
        X_te = extract_bandpower(X_pool[rest])
        accs.append(lr_pipe().fit(X_tr, y_tr).score(X_te, y_pool[rest]))
    return np.mean(accs), np.std(accs)


aug_rows = []
for n in [5, 10, 20]:
    for method, augment in [("plain", False), ("+ augmentation", True)]:
        m, s = few_label_aug(X_s1, y_s1, n, augment=augment)
        aug_rows.append({"n_per_class": n, "method": method, "mean": m, "std": s})
augmentation_curve = pd.DataFrame(aug_rows)
augmentation_curve
sns.lineplot(data=augmentation_curve, x="n_per_class", y="mean", hue="method", marker="o")
plt.axhline(0.5, ls="--", c="k", alpha=0.5)
plt.show()

The augmented line sits above the baseline, especially at n=5 and n=10; the gap shrinks as labeled data grows. That is the canonical signature of a working augmentation.


6. Fix 3: Semi-supervised Pseudo-labeling

S1 data has many trials. We pretended only some came with labels, but the unlabeled ones are still there. And their distribution matches the labeled ones. Pseudo-labeling is the simplest semi-supervised strategy to exploit that.

  1. Train the model on the labeled set.
  2. Predict on the unlabeled data. Keep high-confidence predictions.
  3. Add them to training. Retrain.
def pseudo_label_run(features, y_pool, n_labeled_per_class,
                     n_rounds=4, threshold=0.85, seed=0):
    """Iteratively add high-confidence pseudo-labels. Returns per-round history."""
    test_idx = (pd.Series(np.arange(len(y_pool))).groupby(y_pool)
                .sample(frac=1/3, random_state=seed).to_numpy())
    nontest = np.setdiff1d(np.arange(len(y_pool)), test_idx)
    labeled_idx = (pd.Series(nontest).groupby(y_pool[nontest])
                   .sample(n=n_labeled_per_class, random_state=seed).to_numpy())
    pool_idx = np.setdiff1d(nontest, labeled_idx)

    X_test, y_test = features[test_idx], y_pool[test_idx]
    sup_acc = lr_pipe().fit(features[labeled_idx], y_pool[labeled_idx]).score(X_test, y_test)

    train_idx = list(labeled_idx)
    pool = set(pool_idx.tolist())
    history = [{"round": 0, "n_train": len(train_idx),
                "supervised": sup_acc, "pseudo": sup_acc}]

    for r in range(1, n_rounds + 1):
        if not pool:
            break
        pipe = lr_pipe().fit(features[train_idx], y_pool[train_idx])
        pool_arr = np.array(sorted(pool))
        proba = pipe.predict_proba(features[pool_arr])
        confident = proba.max(axis=1) >= threshold
        if not confident.any():
            break
        new_idx = pool_arr[confident]
        train_idx.extend(new_idx.tolist())
        pool -= set(new_idx.tolist())
        history.append({"round": r, "n_train": len(train_idx),
                        "supervised": sup_acc,
                        "pseudo": pipe.score(X_test, y_test)})
    return pd.DataFrame(history)


hist = pseudo_label_run(bp_s1, y_s1, n_labeled_per_class=10, n_rounds=4, threshold=0.85)
hist
sns.lineplot(data=hist.melt(id_vars="round", value_vars=["supervised", "pseudo"],
                            var_name="model", value_name="accuracy"),
             x="round", y="accuracy", hue="model", marker="o")
plt.axhline(0.5, ls="--", c="k", alpha=0.5)
plt.show()

The pseudo-labeled curve sits above the baseline. If your run shows it below baseline, raise the threshold to 0.90.

Note. Pseudo-labeling amplifies your initial bias. If the labeled set is unrepresentative, the pseudo-labels will be too, and the model gets confidently worse. The relevant literature (consistency regularization, FixMatch, MixMatch) is largely about preventing this bias.


7. Fix 4: Weakly Supervised Label Smoothing

Real labels are messy. Motor imagery: we cannot verify what the subject actually imagined. Maybe they got distracted, maybe they imagined both hands, maybe the experimenter mislabeled the trial. If the model tries to fit those labels perfectly, it will overfit to noise and fail to generalize.

The simplest fix: label smoothing. Standard cross-entropy says “class 1, 100% certain”; label smoothing says “class 1, 90% certain, class 2 10%”.

For this section we briefly train a small EEGNet from scratch. The details don’t matter; the point is that the loss function has a label_smoothing hyperparameter that we can toggle on and off.

def build_small_eegnet(label_smoothing=0.0, epochs=15, lr=0.001):
    return EEGClassifier(
        EEGNetv4,
        module__n_chans=n_chans, module__n_outputs=2, module__n_times=n_times,
        criterion=torch.nn.CrossEntropyLoss, criterion__label_smoothing=label_smoothing,
        optimizer=torch.optim.AdamW, optimizer__lr=lr, optimizer__weight_decay=0.01,
        train_split=None, batch_size=32, max_epochs=epochs, device=device, verbose=0,
    )

We corrupt part of the training labels and train two models (CE vs CE + smoothing).

X_tr, y_tr, X_te, y_te = balanced_split(X_s1, y_s1, n_per_class=30, seed=0)

noise_rows = []
for nr in [0.0, 0.1, 0.2, 0.3]:
    for ls in [0.0, 0.1]:
        accs = []
        for s in range(3):
            y_noisy = corrupt_labels(y_tr, nr, seed=s)
            clf = build_small_eegnet(label_smoothing=ls)
            accs.append(clf.fit(X_tr, y_noisy).score(X_te, y_te))
        noise_rows.append({"noise_rate": nr, "smoothing": ls,
                           "mean": np.mean(accs), "std": np.std(accs)})
noise_curve = pd.DataFrame(noise_rows)
noise_curve
sns.lineplot(data=noise_curve, x="noise_rate", y="mean",
             hue="smoothing", marker="o", palette="deep")
plt.axhline(0.5, ls="--", c="k", alpha=0.5)
plt.show()

At 0% noise the two models are about the same. As noise increases, the standard model drops sharply; the smoothed one drops gradually. By 30% noise, the gap is clear.

Cog-sci framing. “Trust your sources but not absolutely.” Other methods also exist (co-teaching, MentorNet, noise-robust losses).


8. Takeaways

Left: How much does each within-dataset strategy improve over the baseline?

Right: how does label smoothing improves as noise climbs?

N = 10  # trials per class for left panel

def acc_at_N(features, y_arr, n_per_class=N, n_seeds=10, augment_fn=None, raw=None):
    accs = []
    for seed in range(n_seeds):
        rng = np.random.RandomState(seed)
        chosen = np.concatenate([
            rng.choice(np.where(y_arr == c)[0], size=n_per_class, replace=False)
            for c in np.unique(y_arr)
        ])
        X_tr_raw = raw[chosen] if raw is not None else None
        y_tr = y_arr[chosen]
        if augment_fn is not None:
            X_tr_raw, y_tr = augment_fn(X_tr_raw, y_tr, n_copies=3, rng=rng)
            X_tr = extract_bandpower(X_tr_raw)
        else:
            X_tr = features[chosen]
        rest = np.setdiff1d(np.arange(len(y_arr)), chosen)
        X_te = (extract_bandpower(raw[rest]) if augment_fn is not None else features[rest])
        accs.append(lr_pipe().fit(X_tr, y_tr).score(X_te, y_arr[rest]))
    return np.mean(accs)


# Pseudo-labels
def acc_pseudo(features, y_arr, n_per_class=N, n_seeds=10):
    accs = []
    for seed in range(n_seeds):
        h = pseudo_label_run(features, y_arr, n_per_class, n_rounds=4, threshold=0.85, seed=seed)
        accs.append(h["pseudo"].iloc[-1])
    return np.mean(accs)


summary = pd.DataFrame([
    {"technique": "LR baseline",     "accuracy": acc_at_N(bp_s1, y_s1)},
    {"technique": "Riemannian",      "accuracy": acc_at_N(ri_s1, y_s1)},
    {"technique": "+ augmentation",  "accuracy": acc_at_N(bp_s1, y_s1, augment_fn=augment_trials, raw=X_s1)},
    {"technique": "+ pseudo-labels", "accuracy": acc_pseudo(bp_s1, y_s1)},
])

fig, axes = plt.subplots(1, 2, figsize=(13, 4))
sns.barplot(data=summary, x="technique", y="accuracy", ax=axes[0])
axes[0].axhline(0.5, ls="--", c="k", alpha=0.5)
axes[0].set_title(f"Label scarcity (N={N} per class)")

sns.lineplot(data=noise_curve, x="noise_rate", y="mean", hue="smoothing", marker="o", ax=axes[1])
axes[1].axhline(0.5, ls="--", c="k", alpha=0.5)
axes[1].set_title("Label noise (EEGNet)")
plt.show()

Each technique gives a few points over the baseline; they target different failures. Better feature engineering and pseudo-labels are the largest wins on this specific dataset. Augmentation helps, and label smoothing only works once labels are actually noisy.