In this article we aim to reproduce the results presented in the paper Variational Autoencoders: A Hands-Off Approach to Volatility, by Maxime Bergeron, Nicholas Fung, John Hull, and Zissis Poulos, where the authors apply variational autoencoders to implied volatility surfaces. We will not use real market data, but rather the Heston model, which was discussed in a previous article; in another article we have covered in variational autoencoders in depth, to which we refer for more details.

The procedure is composed by three parts:

  • in the first part, we look at how to generate implied volatility surfaces for the Heston model using the python binding of https://www.quantlib.org/;
  • in the second part, we create the dataset by generating a large number of such surfaces, using some predefined ranges for the Heston parameters;
  • in the third part, we train the variational autoencoder and check the quality of the results.

This notebook was run using the following environment:

conda create --name volatility-surface-generator python==3.9 --no-default-packages -y
conda activate volatility-surface-generator
pip install torch numpy sk-learn scipy matplotlib torch torchvision seaborn QuantLib ipykernel nbconvert tqdm ipywidgets

In the first part we define some helper functions and classes to connect to QuantLib. The parameters for the Heston model are contained in a namedtuple; the QuantLib surfaces is created by the function generate_ql_surface(), while compute_smile() computes a smile in strike space for a given maturity.

from collections import namedtuple
import numpy as np
from pathlib import Path
import QuantLib as ql
from matplotlib import cm
from matplotlib import pyplot as plt
import matplotlib.colors as mcolors
from mpl_toolkits.mplot3d import Axes3D
from scipy.stats import norm
Φ = norm.cdf
Model = namedtuple('Model', 'model_date, S_0, r, q, V_0, κ, θ, ζ, ρ')
def generate_ql_surface(model):
    # model coefficients
    model_date, S_0, r, q, V_0, κ, θ, ζ, ρ = model
    # rates and dividends, constant
    r_ts = ql.YieldTermStructureHandle(ql.FlatForward(model_date, r, ql.Actual365Fixed()))
    q_ts = ql.YieldTermStructureHandle(ql.FlatForward(model_date, q, ql.Actual365Fixed()))
    # Heston process, all params constant
    process = ql.HestonProcess(r_ts, q_ts, ql.QuoteHandle(ql.SimpleQuote(S_0)), V_0, κ, θ, ζ, ρ)
    heston_model = ql.HestonModel(process)
    heston_handle = ql.HestonModelHandle(heston_model)
    return ql.HestonBlackVolSurface(heston_handle)

The compute_smile function takes a built surface and looks at the at-the-money volatility for the specified expiry. Using this volatility it builds a vector of strikes that are close enough in probability to the forward, and computes the Black-Scholes implied volatility at those strikes. By default we do it with 101 points.

def compute_smile(model, ql_surface, period, factor=6, num_points=101):
    expiry_date = model.model_date + ql.Period(period)
    T = (expiry_date - model.model_date) / 365
    F = model.S_0 * np.exp((model.r - model.q) * T)
    σ_ATM = ql_surface.blackVol(expiry_date, F)
    width = factor * σ_ATM * np.sqrt(T)
    σ_all = []
    K_all = np.linspace(min(model.S_0, F) * np.exp(-width), max(model.S_0, F) * np.exp(width), num_points)
    for K in K_all:
        σ_all.append(ql_surface.blackVol(T, K))
    return T, F, K_all, σ_all

The small utility function get_date generates a random date in the QuantLib format. For what we do here the date is not important but it is still required by QuantLib.

def get_date():
    day = np.random.randint(1, 28)
    month = np.random.randint(1, 13)
    year = np.random.randint(2000, 2030)
    return ql.Date(day, month, year)

As an example, we visualize one such surface in strike and maturity space.

model = Model(get_date(), S_0=100.0, r=0.05, q=0.03, V_0=0.1**2, κ=0.25, θ=0.1**2, ζ=0.5, ρ=0.5)
ql_surface = generate_ql_surface(model)
X, Y, Z, C = [], [], [], []
colors = list(mcolors.TABLEAU_COLORS.values())
for color, period in zip(colors, ['1M', '3M', '6M', '1Y', '2Y', '3Y', '4Y', '5Y']):
    T, _, K_all, σ_all = compute_smile(model, ql_surface, period)
    X.append([T] * len(K_all))
    Y.append(K_all)
    Z.append(σ_all)
    C.append([color] * len(K_all))
fig, ax = plt.subplots(subplot_kw={'projection': '3d'}, figsize=(8, 8))
ax.plot_surface(np.array(X), np.array(Y), np.array(Z), facecolors=C)
ax.view_init(elev=20, azim=10, roll=0)
ax.set_xlabel('T')
ax.set_ylabel('K')
ax.set_zlabel('$σ_{impl}$');

png

The problem with strikes is that they change for each maturity; they also depend on the volatility itself, while we would prefer a fixed grid as input to the autoencoder. To do that, we use deltas instead of strikes. The delta can be roughly interpreted as the probability of the option of ending up in-the-money. The delta can be defined in several ways; here we will use the so-called simple delta,

\[\delta = \Phi\left( \frac{\log\frac{F}{K}}{\sigma \sqrt{T} } \right),\]

where $T$ is the option expiry, F$ is the forward at $T$, and $\sigma$ the implied volatility defined by the surface itself. This is done by the compute_simple_delta_surface function below.

def compute_simple_delta_surface(model, num_deltas, periods):
    ql_surface = generate_ql_surface(model)
    d_grid = np.linspace(0.1, 0.9, num_deltas)
    X, Y, Z = [], [], []
    for period in periods:
        # compute on a strike grid
        T, F, K_all, σ_all = compute_smile(model, ql_surface, period)
        # interpolate on the simple delta grid
        d_all = Φ(np.log(K_all / F) / σ_all / np.sqrt(T))
        σ_grid = np.interp(d_grid, d_all, σ_all)
        # add to result
        X.append([T] * len(σ_grid))
        Y.append(d_grid)
        Z.append(σ_grid)
    return np.array(X), np.array(Y), np.array(Z)
model = Model(get_date(), S_0=1.0, r=-0.05, q=0.0, V_0=0.1**2, κ=0.25, θ=0.1**2, ζ=1, ρ=0.8)
periods = ['1M', '2M', '3M', '4M', '5M', '6M', '7M', '8M', '9M', '1Y', '2Y', '3Y', '4Y', '5Y']
X, Y, Z = compute_simple_delta_surface(model, 51, periods)
print(f'Surface shape: {Z.shape[0]}x{Z.shape[1]}')
fig, (ax0, ax1) = plt.subplots(figsize=(12, 5), ncols=2)
for y, z in zip(Y, Z):
    ax0.plot(z, y)
ax0.set_xlabel('Implied Vol'); ax0.set_ylabel('Simple Delta')
ax1.contour(X, Y, Z, colors='k', levels=25, alpha=0.5)
ax1.contourf(X, Y, Z, cmap='binary', levels=25)
ax1.set_xlabel('T'); ax1.set_ylabel('Simple Delta')
fig.tight_layout()
Surface shape: 14x51

png

We are ready for the second part. The Heston model depends on seven parameters, for which we define the following bounds:

  • for the domestic interest rate, we take $r \in [-0.05, 0.05]$;
  • for the continuously compounded dividends, we take $q \in [-0.05, 0.05]$;
  • for the square root of the initial variance $V_0$, we have $\sqrt{V_0} \in [0.01, 0.5]$;
  • for the mean reversion, we use $\kappa \in [0.05, 2]$;
  • for the square root of the long-term variance, we consider $\sqrt{\theta} \in [0.01, 0.5]$;
  • for the volatility of the variance process, we use $\zeta \in [0.01, 3]$;
  • for the correlation, we take $\rho \in [-0.5, 0.5]$.
from scipy.stats import qmc, uniform
from tqdm import tqdm, trange
import pickle
bounds = np.array([
    [-0.01, 0.01],  # r 
    [-0.01, 0.01],  # q
    [0.01, 0.5],  # sqrt(V_0)
    [0.05, 2.0],  # κ
    [0.01, 0.5],  # sqrt(θ)
    [0.01, 3],  # ζ
    [-0.5, +0.5],  # ρ
])

We use Sobol sequences to span through this sever-dimensional space. For each combination of the parameters, we need to sample the surface over some deltas and maturities. For the detlas, we use $\mathfrak{D} = { \delta_1, \ldots, \delta_d }$,with $\delta_1 = 0.1$, $\delta_d$ = 0.9, and the other points equally spaced, with $d=12$. This number is a bit small; much larger samples are needed for better results but we limit ourselves to small sample sizes.

For the maturities, we take $\mathfrak{M} = { \texttt{1M}, \texttt{2M}, \texttt{3M}, \texttt{4M}, \texttt{5M}, \texttt{6M}, \texttt{7M}, \texttt{8M}, \texttt{9M}, \texttt{1Y}, \texttt{18M}, \texttt{2Y}, \texttt{3Y}, \texttt{4Y}, \texttt{5Y} }$. The resulting surface is defined on a space $\mathfrak{Z} = \mathfrak{D} \times \mathfrak{M}$ and has dimension $20 \times 15 = 300$.

sobol = qmc.Sobol(d=7, scramble=False)
W_all = sobol.random_base2(m=12)
W_all = qmc.scale(W_all, bounds[:, 0], bounds[:, 1])
print(f'Generated {len(W_all)} entries.')
Generated 4096 entries.
periods = ['1M', '2M', '3M', '6M', '9M', '1Y', '2Y', '3Y']

model_date = get_date()
num_deltas = 5
num_tenors = len(periods)
filename = Path('surfaces.pickle')

import pickle
if filename.exists():
    with open(filename, 'rb') as f:
        params, XX, YY, surfaces = pickle.load(f)
        assert len(params) == len(surfaces)
        print(f'Read {len(surfaces)} surfaces.')
else:
    params, surfaces = [], []
    for r, q, sqrt_V_0, κ, sqrt_θ, ζ, ρ in tqdm(W_all):
        V_0 = sqrt_V_0**2
        θ = sqrt_θ**2
        # to limit by the Feller ratio
        # if κ * θ > ζ**2 / 2:
        model = Model(model_date, S_0=1, r=r, q=q, V_0=V_0, κ=κ, θ=θ, ζ=ζ, ρ=ρ)
        params.append((r, q, V_0, κ, θ, ζ, ρ))
        XX, YY, surface = compute_simple_delta_surface(model, num_deltas, periods)
        surfaces.append(surface)
    num_surfaces = len(surfaces)
    with open(filename, 'wb') as f:
        pickle.dump((params, XX, YY, surfaces), f)
surfaces = np.array(surfaces)
print(f'# surfaces: {num_surfaces}, min vol: {surfaces.min()}, max vol: {surfaces.max()}')
100%|██████████| 4096/4096 [01:46<00:00, 38.35it/s]

# surfaces: 4096, min vol: 0.0016473183503660689, max vol: 0.7606227339577064

We randomly split the generated data into a train dataset, containing 95% of the data, and a test dataset with the remainig 5% of the data. Using historical surfaces we would have split chronologically, however in this case a random split is enough.

from sklearn.model_selection import train_test_split
params_train, params_test, surfaces_train, surfaces_test = train_test_split(params, surfaces, test_size=0.05, random_state=42)
print(f"Using {len(surfaces_train)} for training and {len(surfaces_test)} for testing.")
Using 3891 for training and 205 for testing.

Although the numbers are already reasonably well scaled, being all in the range (0, 1), we make sure they are centered in zero and have unitary standard deviation.

class Scaler:

    def fit(self, X):
        self.mean = X.mean()
        self.std = X.std()
    
    def transform(self, X):
        return (X - self.mean) / self.std

    def inverse_transform(self, X):
        return X * self.std + self.mean

    def fit_transform(self, X):
        self.fit(X)
        return self.transform(X)
scaler = Scaler()
scaled_surfaces_train = scaler.fit_transform(surfaces_train)
scaled_surfaces_test = scaler.transform(surfaces_test)

As baseline we use principal component analysis, or PCA. A few components are enough to take into account almost all variance, so we set of 9 components. The method as implemented by scikit-learn is extremely simple and fast. Even if the results of the PCA aren’t exceptionally good, its simplicity, speed and numerical robustness make it a very good baseline.

from sklearn.decomposition import PCA

PCA requires vectors, not matrices, therefore we need to flatten the data by lumping together the second and third dimensions.

X_train = np.array([k.flatten() for k in scaled_surfaces_train])
X_test = np.array([k.flatten() for k in scaled_surfaces_test])
num_components = 9
pca = PCA(n_components=num_components, whiten=False).fit(X_train)
plt.figure(figsize=(6, 4))
plt.plot(np.cumsum(pca.explained_variance_ratio_), 'o-')
plt.xlabel('number of components')
plt.ylabel('cumulative explained variance')
plt.axhline(y=0.75, linestyle='dashed', color='orange')
plt.axhline(y=0.85, linestyle='dashed', color='salmon')
plt.axhline(y=0.95, linestyle='dashed', color='red')
plt.grid();

png

As seens in the visualizition below, components number 0 and 1 are the main shape of the surface, component number 2 is the skew in delta space, component number 3 the convexity in delta space, with the remaining ones giving the additional cross effects between the different parts of the surface.

components = pca.components_.reshape((-1, num_tenors, num_deltas))

fig, axes = plt.subplots(subplot_kw={'projection': '3d'}, 
                         figsize=(12, 12), nrows=3, ncols=3,
                         sharex=True, sharey=True)
axes = axes.flatten()
for i, ax in enumerate(axes.flatten()):
    ax.plot_surface(XX, YY, components[i])
    ax.view_init(elev=20, azim=10, roll=0)
    ax.set_xlabel('T')
    ax.set_ylabel('Δ')
    ax.set_zlabel('$σ_{impl}$');
    ax.set_title(f'Component #{i}', fontsize=10)
fig.tight_layout()

png

from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error
X_test_proj_pca = pca.inverse_transform(pca.transform(X_test))
surfaces_proj_pca = [scaler.inverse_transform(k.reshape(num_tenors, num_deltas)) for k in X_test_proj_pca]
mean_error_proj_pca = 10_000 * np.mean(abs(surfaces_test - surfaces_proj_pca), axis=0)
q95_error_proj_pca = 10_000 * np.quantile(abs(surfaces_test - surfaces_proj_pca), 0.95, axis=0)
max_error_proj_pca = 10_000 * np.max(abs(surfaces_test - surfaces_proj_pca), axis=0)

fig, (ax0, ax1, ax2) = plt.subplots(subplot_kw={'projection': '3d'}, figsize=(10, 6), ncols=3)

ax0.plot_surface(XX, YY, mean_error_proj_pca)
ax0.view_init(elev=20, azim=10, roll=0)
ax0.set_xlabel('T')
ax0.set_ylabel('Δ')
ax0.set_zlabel('Error (bps)');
ax0.set_title(f'Mean error: {mean_error_proj_pca.mean():.2f} (bps)')

ax1.plot_surface(XX, YY, q95_error_proj_pca)
ax1.view_init(elev=20, azim=10, roll=0)
ax1.set_xlabel('T')
ax1.set_ylabel('Δ')
ax1.set_zlabel('90% Quantile (%)');
ax1.set_title(f'Mean 95% quantile: {q95_error_proj_pca.max():.2f} (bps)')

ax2.plot_surface(XX, YY, max_error_proj_pca)
ax2.view_init(elev=20, azim=10, roll=0)
ax2.set_xlabel('T')
ax2.set_ylabel('Δ')
ax2.set_zlabel('Relative Error (%)');
ax2.set_title(f'Max error: {max_error_proj_pca.max():.2f} (bps)')

fig.tight_layout;

png

from scipy.optimize import minimize
def fit_pca(surface_target):
    surface_target = surface_target.flatten()
    
    def func(x):
        x_pred = scaler.inverse_transform(pca.inverse_transform(x))
        return np.linalg.norm(surface_target - x_pred)

    res = minimize(func, [0] * num_components, method='Powell')

    return scaler.inverse_transform(pca.inverse_transform(res.x)).reshape(num_tenors, num_deltas), res
errors_reconstr_pca = []
for surface_test in surfaces_test:
    surface_reconstr = fit_pca(surface_test)[0]
    errors_reconstr_pca.append(surface_reconstr - surface_test)
errors_reconstr_pca = np.array(errors_reconstr_pca).reshape(-1, num_tenors, num_deltas)
mean_error_reconstr_pca = 10_000 * np.mean(abs(errors_reconstr_pca), axis=0)
q95_error_reconstr_pca = 10_000 * np.quantile(abs(errors_reconstr_pca), 0.95, axis=0)
max_error_reconstr_pca = 10_000 * np.max(abs(errors_reconstr_pca), axis=0)

fig, (ax0, ax1, ax2) = plt.subplots(subplot_kw={'projection': '3d'}, figsize=(10, 6), ncols=3)

ax0.plot_surface(XX, YY, mean_error_reconstr_pca)
ax0.view_init(elev=20, azim=10, roll=0)
ax0.set_xlabel('T')
ax0.set_ylabel('Δ')
ax0.set_zlabel('Error (bps)');
ax0.set_title(f'Mean error: {mean_error_reconstr_pca.mean():.2f} (bps)')

ax1.plot_surface(XX, YY, q95_error_reconstr_pca)
ax1.view_init(elev=20, azim=10, roll=0)
ax1.set_xlabel('T')
ax1.set_ylabel('Δ')
ax1.set_zlabel('90% Quantile (%)');
ax1.set_title(f'Mean 95% quantile: {q95_error_reconstr_pca.max():.2f} (bps)')

ax2.plot_surface(XX, YY, max_error_reconstr_pca)
ax2.view_init(elev=20, azim=10, roll=0)
ax2.set_xlabel('T')
ax2.set_ylabel('Δ')
ax2.set_zlabel('Relative Error (%)');
ax2.set_title(f'Max error: {max_error_reconstr_pca.max():.2f} (bps)')

fig.tight_layout;

png

def test_pca(i):
    r, q, V_0, κ, θ, ζ, ρ = params_test[i]
    surface_target  = surfaces_test[i]

    surface_pca, res = fit_pca(surface_target)

    fig, (ax0, ax1, ax2) = plt.subplots(subplot_kw={'projection': '3d'}, figsize=(15, 5), ncols=3)
    vmin = 100 * min(surface_target.min(), surface_pca.min())
    vmax = 100 * max(surface_target.max(), surface_pca.max())
    ax0.plot_surface(XX, YY, 100 * surface_target, cmap=cm.coolwarm, norm=plt.Normalize(vmin, vmax))
    ax0.set_title(f"r={r:.3f}, q={q:.3f}, V_0={V_0:.3f}, κ={κ:.3f}, θ={θ:.3f}, ζ={ζ:.3f}, ρ={ρ:.3f}")
    ax1.plot_surface(XX, YY, 100 * surface_pca, cmap=cm.coolwarm, norm=plt.Normalize(vmin, vmax))
    ax1.set_title(f'Optimizer status: ' + 'converged' if res.success else 'not converged')
    ax2.set_title('PCA components: ' + ', '.join([f'{k:.3f}' for k in res.x]))
    error = 10_000 * surface_pca - 10_000 * surface_target
    ax2.plot_surface(XX, YY, error)
    for ax in (ax0, ax1, ax2):
        ax.set_xlabel('Tenor (years)'); ax.set_ylabel('Simple Delta'); ax.set_zlabel('Implied Vol (bps)')
    fig.tight_layout()
test_pca(10)

png

test_pca(20)

png

test_pca(30)

png

After having established a baseline, we are ready to start with the variational autoencoder.

import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils
import torch.distributions
from torch.utils.data import Dataset, DataLoader

torch.manual_seed(0)
torch.set_default_dtype(torch.float64)
device = 'cpu'

We reuse the code from the article on variational autoencoders, with tiny differences. One of them is the usage of $\beta$-VAE and different scaling for the loglikelihood.

class VariationalEncoder(nn.Module):

    def __init__(self, num_tenors, num_deltas, num_latent, num_hidden):
        super().__init__()
        self.num_points = num_tenors * num_deltas
        self.linear1 = nn.Linear(self.num_points, 4 * num_hidden)
        self.linear2 = nn.Linear(4 * num_hidden, 2 * num_hidden)
        self.linear3 = nn.Linear(2 * num_hidden, num_hidden)
        self.linear_mu = nn.Linear(num_hidden, num_latent)
        self.linear_logsigma2 = nn.Linear(num_hidden, num_latent)

    def forward(self, x):
        x = x.view(-1, self.num_points)
        x = F.tanh(self.linear1(x))
        x = F.tanh(self.linear2(x))
        x = F.tanh(self.linear3(x))
        mu =  self.linear_mu(x)
        logsigma2 = self.linear_logsigma2(x)
        sigma = torch.exp(0.5 * logsigma2)
        ξ = torch.randn_like(mu)
        z = mu + sigma * ξ
        kl = torch.mean(torch.sum((sigma**2 + mu**2 - logsigma2 - 1) / 2, dim=1), dim=0)
        return z, kl
class Decoder(nn.Module):

    def __init__(self, num_tenors, num_deltas, num_latent, num_hidden):
        super().__init__()
        self.num_deltas = num_deltas
        self.num_tenors = num_tenors
        self.linear1 = nn.Linear(num_latent, num_hidden)
        self.linear2 = nn.Linear(num_hidden, 2 * num_hidden)
        self.linear3 = nn.Linear(2 * num_hidden, 4 * num_hidden)
        self.linear4 = nn.Linear(4 * num_hidden, self.num_tenors * self.num_deltas)

    def forward(self, z):
        z = F.tanh(self.linear1(z))
        z = F.tanh(self.linear2(z))
        z = F.tanh(self.linear3(z))
        z = self.linear4(z)
        return z.view(-1, self.num_tenors, self.num_deltas)
class VariationalAutoencoder(nn.Module):

    def __init__(self, num_tenors, num_deltas, num_latent, num_hidden):
        super().__init__()
        self.num_latent = num_latent
        self.num_points = num_tenors * num_deltas
        self.encoder = VariationalEncoder(num_tenors, num_deltas, num_latent, num_hidden)
        self.decoder = Decoder(num_tenors, num_deltas, num_latent, num_hidden)

    def forward(self, x):
        z, kl = self.encoder(x)
        x_hat = self.decoder(z)
        return x_hat, kl
    
    def compute_log_likelihood(self, X, X_hat, η):
        X = X.view(-1, self.num_points)
        X_hat = X_hat.view(-1, self.num_points)
        ξ = torch.randn_like(X_hat)
        X_sampled = X_hat + η.sqrt() * ξ
        return 0.5 * F.mse_loss(X, X_sampled) / η + η.log()

As customary, we derive from the Torch’s Dataset class to wrap our training dataset.

class HestonDataset(Dataset):
    
    def __init__(self, X):
        self.X = X

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return torch.tensor(self.X[idx])
def train(vae, data, epochs=20, lr=1e-3, gamma=0.95, η=0.1, β_kl=1.0, print_every=1):
    η = torch.tensor(η)
    optimizer = torch.optim.RMSprop(vae.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=gamma)
    history = []
    for epoch in trange(epochs):
        last_lr = scheduler.get_last_lr()
        total_log_loss, total_kl_loss, total_loss = 0.0, 0.0, 0.0
        for x in data:
            x = x.to(device)
            optimizer.zero_grad()
            x_hat, kl_loss = vae(x)
            log_loss = vae.compute_log_likelihood(x, x_hat, η)
            loss = log_loss + β_kl * kl_loss
            total_log_loss += log_loss.item()
            total_kl_loss += β_kl * kl_loss.item()
            total_loss += loss.item()
            loss.backward()
            optimizer.step()
        scheduler.step()
        if (epoch + 1) % print_every == 0:
            with torch.no_grad():
                _, (ax0, ax1) = plt.subplots(figsize=(10, 5), ncols=2)
                Z = vae.encoder(torch.tensor(scaled_surfaces_test).to(device))[0].cpu().detach().numpy()
                sns.kdeplot(data=Z, ax=ax0)

                for z in np.linspace(-3, 3):
                    z = [z] + [0.0] * (vae.num_latent - 1)
                    surface_vae = vae.decoder(torch.tensor(z).unsqueeze(0).to(device))[0].cpu().detach().numpy().reshape(num_tenors, num_deltas)
                    
                    ax1.plot(surface_vae[0])
                plt.show()
            print(f"Epoch: {epoch + 1:3d}, lr=: {last_lr[0]:.4e}, " \
                  f"total log loss: {total_log_loss:.4f}, total KL loss: {total_kl_loss:.4f}, total loss: {total_loss:.4f}")
        history.append((total_log_loss, total_kl_loss))
    return np.array(history)
dataset = HestonDataset(scaled_surfaces_train)
data_loader = DataLoader(dataset, batch_size=128, shuffle=True)
def init_weights(m):
    if isinstance(m, nn.Linear):
        torch.nn.init.xavier_uniform_(m.weight)
        m.bias.data.fill_(0.01)
num_latent = 5
vae = VariationalAutoencoder(num_tenors, num_deltas, num_latent, num_hidden=32).to(device)
torch.manual_seed(0)
vae.apply(init_weights);
history = train(vae, data_loader, epochs=5_000, lr=5e-4, gamma=1, η=0.01, β_kl=1e-3, print_every=5_000)
100%|█████████▉| 4998/5000 [02:51<00:00, 30.53it/s]

png

100%|██████████| 5000/5000 [02:51<00:00, 29.11it/s]

Epoch: 5000, lr=: 5.0000e-04, total log loss: -125.6315, total KL loss: 0.6787, total loss: -124.9528
def fit_vae(surface_target):
    def func(x):
        surface_pred = scaler.inverse_transform(vae.decoder(torch.tensor(x).unsqueeze(0).to(device))[0])
        surface_pred = surface_pred.cpu().detach().numpy()
        diff = np.linalg.norm(surface_target.flatten() - surface_pred.flatten(), ord=np.inf)
        return diff

    best_res = None
    for method in ['Nelder-Mead', 'Powell', 'BFGS']:
        res = minimize(func, [0] * num_latent, method=method)
        if best_res is None or (res.fun < best_res.fun and res.success):
            best_res = res

    pred = vae.decoder(torch.tensor(best_res.x).unsqueeze(0).to(device))[0].cpu().detach().numpy().reshape(num_tenors, num_deltas)
    return scaler.inverse_transform(pred), best_res
def test_vae(i, use_train=False):
    r, q, V_0, κ, θ, ζ, ρ = params_train[i] if use_train else params_test[i]
    surface_target  = surfaces_train[i] if use_train else surfaces_test[i]

    surface_vae, res = fit_vae(surface_target)

    fig, (ax0, ax1, ax2) = plt.subplots(subplot_kw={'projection': '3d'}, figsize=(15, 5), ncols=3)
    vmin = 100 * min(surface_target.min(), surface_vae.min())
    vmax = 100 * max(surface_target.max(), surface_vae.max())
    ax0.plot_surface(XX, YY, 100 * surface_target, cmap=cm.coolwarm, norm=plt.Normalize(vmin, vmax))
    ax0.set_title(f"r={r:.3f}, q={q:.3f}, V_0={V_0:.3f}, κ={κ:.3f}, θ={θ:.3f}, ζ={ζ:.3f}, ρ={ρ:.3f}")
    ax1.plot_surface(XX, YY, 100 * surface_vae, cmap=cm.coolwarm, norm=plt.Normalize(vmin, vmax))
    ax1.set_title(f'Optimizer status: ' + 'converged' if res.success else 'not converged')
    ax2.set_title('Latent vector: ' + ', '.join([f'{k:.3f}' for k in res.x]))
    error = 10_000 * surface_vae - 10_000 * surface_target
    ax2.plot_surface(XX, YY, error)
    for ax in (ax0, ax1, ax2):
        ax.set_xlabel('Tenor (years)'); ax.set_ylabel('Simple Delta'); ax.set_zlabel('Implied Vol (%)')
    fig.tight_layout()
test_vae(10)

png

test_vae(100)

png

test_vae(200)

png

errors_reconstr_vae = []
for surface_test in (pbar := tqdm(surfaces_test)):
    surface_reconstr = fit_vae(surface_test)[0]
    error = surface_reconstr - surface_test
    pbar.set_description(f'{abs(error).max():.2f}')
    errors_reconstr_vae.append(error)
errors_reconstr_vae = np.array(errors_reconstr_vae).reshape(-1, num_tenors, num_deltas)
0.00: 100%|██████████| 205/205 [00:23<00:00,  8.79it/s]
plt.hist(np.max(errors_reconstr_vae.reshape(205, -1), axis=0), bins=20);

png

mean_error_reconstr_vae = 10_000 * np.mean(abs(errors_reconstr_vae), axis=0)
q90_error_reconstr_vae = 10_000 * np.quantile(abs(errors_reconstr_vae), 0.90, axis=0)
q95_error_reconstr_vae = 10_000 * np.quantile(abs(errors_reconstr_vae), 0.95, axis=0)
max_error_reconstr_vae = 10_000 * np.max(abs(errors_reconstr_vae), axis=0)

fig, (ax0, ax1, ax2, ax3) = plt.subplots(subplot_kw={'projection': '3d'}, figsize=(12, 6), ncols=4)

ax0.plot_surface(XX, YY, mean_error_reconstr_vae)
ax0.set_zlabel('Error (bps)');
ax0.set_title(f'Mean Error: {mean_error_reconstr_vae.mean():.2f} (bps)')

ax1.plot_surface(XX, YY, q90_error_reconstr_vae)
ax1.set_zlabel('90% Quantile (%)');
ax1.set_title(f'90% Quantile: {q90_error_reconstr_vae.max():.2f} (bps)')

ax2.plot_surface(XX, YY, q95_error_reconstr_vae)
ax2.set_zlabel('95% Quantile (%)');
ax2.set_title(f'95% Quantile: {q95_error_reconstr_vae.max():.2f} (bps)')

ax3.plot_surface(XX, YY, max_error_reconstr_vae)
ax3.set_zlabel('Relative Error (%)');
ax3.set_title(f'Max Error: {max_error_reconstr_vae.max():.2f} (bps)')

for ax in [ax0, ax1, ax2, ax3]:
    ax.view_init(elev=20, azim=10, roll=0)
    ax.set_xlabel('T')
    ax.set_ylabel('Δ')

fig.tight_layout;

png

Finally, we check the shape of the latent space on the training set. The pair plot shows the distribution of each latent variable on the diagonals and the scatter plots of two latent variables on the off-diagonal plots. On the upper triangular part we have superimposed the circles with the ideal distribution, while on the lower triangular part the lines show the actual isolines from the kernel density estimations.

import pandas as pd
from matplotlib.patches import Circle
Z = vae.encoder(torch.tensor(scaled_surfaces_test).to(device))[0].cpu().detach().numpy()
g = sns.pairplot(pd.DataFrame(Z, columns=[f"Z_{i}" for i in range(num_latent)]), diag_kind='kde', corner=False)
g.map_lower(sns.kdeplot, levels=4, color="orange")
for r in [1, 2, 3, 4]:
    for j in range(1, num_latent):
        for i in range(0, j):
            g.axes[i, j].add_patch(Circle((0, 0), r, linestyle='dashed', color='crimson', alpha=1, fill=None))
            g.axes[i, j].set_xlim(-4, 4)
            g.axes[i, j].set_ylim(-4, 4)
g.figure.tight_layout()

png

This shows that we can effectively generate Heston-type surfaces using variational autoencoders. The training dataset is of modest dimension – the quality of the reconstructed entities could be improved with more samples.