From a PyTorch model to an explainable deep model#
For a quick introduction to the Xpdeep APIs, this section demonstrates, on the Air Quality dataset, how to adapt the PyTorch code of a standard deep model in order to design an explainable deep model.
We review the key steps of building a deep model, from architecture specification and training to, in the Xpdeep case, explanation generation.
For each step, we provide:
- Tabs labeled "SOTA and Xpdeep" for code that is identical for the SOTA deep model and the Xpdeep explainable model.
- Tabs labeled "Xpdeep" for code specific to the Xpdeep explainable model.
1. Project Setup#
Set up API Key and URL#
Create a Project#
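The exact setup calls depend on your Xpdeep deployment; refer to the Xpdeep installation documentation for the client initialisation and project creation API. As a minimal, hypothetical sketch, the credentials can be kept out of the code and read from environment variables (the variable names below are assumptions):
import os
# Hypothetical environment variable names -- adapt them to your deployment.
api_key = os.environ["XPDEEP_API_KEY"]
api_url = os.environ["XPDEEP_API_URL"]
These values are then passed to the Xpdeep client when initialising it and creating the project.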
2. Data Preparation#
Read Raw Data#
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from datetime import UTC, datetime
import torch
# 1. Split and Convert your Raw Data
# Remove the first 24 rows (incorrect values for some columns).
data = pd.read_csv("air_quality.csv")[24:]
# Fill NA/NaN values by propagating the last valid observation forward.
data.update({"pm2.5": data["pm2.5"].ffill()})
# Convert time to python datetime.
data["time"] = data.apply(
    lambda x: datetime(year=x["year"], month=x["month"], day=x["day"], hour=x["hour"], tzinfo=UTC), axis=1
)
# Remove unnecessary columns.
data.drop(columns=["year", "month", "day", "hour", "No"], inplace=True)
data.drop(columns=["cbwd"], inplace=True)
# Set the "time" column as index
data = data.set_index("time")
data.head()
# Create the samples
lookback = 24
horizon = 5
# Calculate the number of samples from the dataset length, lookback, and horizon.
# Consecutive samples are shifted by one timestamp.
num_samples = len(data) - lookback - horizon + 1
data_input_numpy = data.to_numpy()  # Inputs contain the target channel as well
# (its lookback window is used to predict the horizon).
data_target_numpy = data[["pm2.5"]].to_numpy()
# Broadcast the data input and target
repeated_data_input = np.broadcast_to(data_input_numpy, (num_samples, *data_input_numpy.shape))
repeated_data_target = np.broadcast_to(data_target_numpy, (num_samples, *data_target_numpy.shape))
# Generate tensor slices with overlap
tensor_slices = torch.arange(lookback + horizon).unsqueeze(0) + torch.arange(num_samples).unsqueeze(1)
# Get the input and target slices
input_slices = tensor_slices[:, :lookback]
target_slices = tensor_slices[:, lookback:]
time_dimension = 1
# Number of dimensions apart from the temporal one (for multivariate, it's 1)
number_of_data_dims = len(data.shape) - 1
# Gather input and target data using the slices
input_indices_to_gather = input_slices.unsqueeze(*list(range(-number_of_data_dims, 0))).repeat(
    1, 1, *repeated_data_input.shape[2:]
)
target_indices_to_gather = target_slices.unsqueeze(*list(range(-number_of_data_dims, 0))).repeat(
    1, 1, *repeated_data_target.shape[2:]
)
# Gather the input and target windows
transformed_inputs = (
    torch.gather(torch.from_numpy(repeated_data_input.copy()), time_dimension, input_indices_to_gather).numpy().copy()
)
transformed_targets = (
    torch.gather(torch.from_numpy(repeated_data_target.copy()), time_dimension, target_indices_to_gather).numpy().copy()
)
data = pd.DataFrame({
    "sensor airquality": transformed_inputs.tolist(),  # Convert to a list of arrays for storage in the DataFrame
    "target pm2.5": transformed_targets.tolist(),
})
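As an optional sanity check, the windowed arrays can be inspected before splitting; each input sample should cover the 24-step lookback over the 7 channels, and each target the 5-step horizon over pm2.5 only.
# Expected shapes: (num_samples, 24, 7) for inputs, (num_samples, 5, 1) for targets.
print(transformed_inputs.shape, transformed_targets.shape)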
Split Data#
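A minimal sketch of the split, assuming a chronological 80/10/10 partition (the ratios are an assumption, not prescribed here); shuffle=False keeps the temporal order of the samples.
# Chronological split (no shuffling): 80% train, 10% validation, 10% test.
train_data, temp_data = train_test_split(data, test_size=0.2, shuffle=False)
val_data, test_data = train_test_split(temp_data, test_size=0.5, shuffle=False)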
Conversion to Parquet Format#
import pyarrow as pa
import pyarrow.parquet as pq
# Convert to pyarrow Table format
train_table = pa.Table.from_pandas(train_data, preserve_index=False)
val_table = pa.Table.from_pandas(val_data, preserve_index=False)
test_table = pa.Table.from_pandas(test_data, preserve_index=False)
# Save each split as ".parquet" file
pq.write_table(train_table, "train.parquet")
pq.write_table(val_table, "val.parquet")
pq.write_table(test_table, "test.parquet")
Upload#
Preprocess Data#
from sklearn.preprocessing import StandardScaler
input_data_for_preprocessor = np.array(train_data["sensor airquality"].to_list())[:,0,:]
target_data_for_preprocessor = np.array(train_data["target pm2.5"].to_list())[:,0,:]
input_encoder = StandardScaler().fit(input_data_for_preprocessor)
target_encoder = StandardScaler().fit(target_data_for_preprocessor)
preprocessed_input = input_encoder.transform(np.array(data["sensor airquality"].to_list()).reshape(-1,7)).reshape(-1, lookback, 7)
preprocessed_target = target_encoder.transform(np.array(data["target pm2.5"].to_list()).reshape(-1,1)).reshape(-1, horizon, 1)
# Scale per channel, then rearrange the inputs to channels-first (N, 7, lookback) as expected by Conv1d.
x_train = input_encoder.transform(np.array(train_data["sensor airquality"].to_list()).reshape(-1, 7)).reshape(-1, lookback, 7).transpose(0, 2, 1)
y_train = target_encoder.transform(np.array(train_data["target pm2.5"].to_list()).reshape(-1, 1)).reshape(-1, horizon, 1)
x_val = input_encoder.transform(np.array(val_data["sensor airquality"].to_list()).reshape(-1, 7)).reshape(-1, lookback, 7).transpose(0, 2, 1)
y_val = target_encoder.transform(np.array(val_data["target pm2.5"].to_list()).reshape(-1, 1)).reshape(-1, horizon, 1)
x_test = input_encoder.transform(np.array(test_data["sensor airquality"].to_list()).reshape(-1, 7)).reshape(-1, lookback, 7).transpose(0, 2, 1)
y_test = target_encoder.transform(np.array(test_data["target pm2.5"].to_list()).reshape(-1, 1)).reshape(-1, horizon, 1)
from xpdeep.dataset.schema.feature.feature import (
    MultivariateTimeSeries,
    UnivariateTimeSeries,
)
from xpdeep.dataset.schema.preprocessor import TorchPreprocessor, SklearnPreprocessor
import torch
from xpdeep.dataset.parquet_dataset import FittedParquetDataset
from xpdeep.dataset.schema.schema import FittedSchema
# 1/ Create the preprocessor classes
class ScaleAirQInput(TorchPreprocessor):
    def transform(self, inputs: torch.Tensor) -> torch.Tensor:
        """Transform."""
        return (inputs - self.mean) / self.scale

    def inverse_transform(self, output: torch.Tensor) -> torch.Tensor:
        """Apply inverse transform."""
        return output * self.scale + self.mean

class ScaleAirQTarget(TorchPreprocessor):
    def transform(self, inputs: torch.Tensor) -> torch.Tensor:
        """Transform."""
        return (inputs - self.mean) / self.scale

    def inverse_transform(self, output: torch.Tensor) -> torch.Tensor:
        """Apply inverse transform."""
        return output * self.scale + self.mean
mean_input = torch.Tensor(data["sensor airquality"].to_list())[:,0,:].mean(dim=0)
scale_input = torch.Tensor(data["sensor airquality"].to_list())[:,0,:].std(dim=0)
mean_target = torch.Tensor(data["target pm2.5"].to_list())[:,0,:].mean(dim=0)
scale_target = torch.Tensor(data["target pm2.5"].to_list())[:,0,:].std(dim=0)
# 2/ Create Fitted Parquet Datasets
fitted_schema = FittedSchema(
    MultivariateTimeSeries(
        asynchronous=True,
        channel_names=["pm2.5", "DEWP", "TEMP", "PRES", "Iws", "Is", "Ir"],
        name="sensor airquality",
        preprocessor=ScaleAirQInput((24, 7), mean=mean_input, scale=scale_input),
    ),
    UnivariateTimeSeries(
        asynchronous=True,
        name="target pm2.5",
        is_target=True,
        mirrored_channel="sensor airquality",
        preprocessor=ScaleAirQTarget((5, 1), mean=mean_target, scale=scale_target),
    ),
)
fit_train_dataset = FittedParquetDataset(
    split_name="train",
    identifier_name="my_local_dataset",
    path=directory["train_set_path"],
    fitted_schema=fitted_schema,
)
print(fitted_schema)
fit_test_dataset = FittedParquetDataset(
    split_name="test",
    identifier_name="my_local_dataset",
    path=directory["test_set_path"],
    fitted_schema=fit_train_dataset.fitted_schema,
)
fit_val_dataset = FittedParquetDataset(
    split_name="validation",
    identifier_name="my_local_dataset",
    path=directory["val_set_path"],
    fitted_schema=fit_train_dataset.fitted_schema,
)
# Prepare the model: retrieve the input and target sizes from the fitted schema.
input_size = fit_train_dataset.fitted_schema.input_size[1:]
target_size = fit_train_dataset.fitted_schema.target_size[1:]
print(f"input_size: {input_size} - target_size: {target_size}")
3. Model Construction#
Architecture Specification#
from torch.nn import Sequential
class SotaModel(Sequential):
    def __init__(self):
        layers = [
            torch.nn.Conv1d(7, 16, kernel_size=3, stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.Conv1d(16, 32, kernel_size=3, stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.Conv1d(32, 64, kernel_size=3, stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.Flatten(),
            torch.nn.LazyLinear(out_features=horizon),
        ]
        super().__init__(*layers)

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        x = super().forward(inputs)
        x = x.reshape(-1, horizon, 1)
        return x
from torch.nn import Sequential
class FeatureExtractor(Sequential):
    def __init__(self):
        layers = [
            torch.nn.Conv1d(7, 16, kernel_size=3, stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.Conv1d(16, 32, kernel_size=3, stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.Conv1d(32, 64, kernel_size=3, stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.Flatten(),
        ]
        super().__init__(*layers)

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        # Rearrange (batch, lookback, channels) to channels-first (batch, channels, lookback) for Conv1d.
        x = inputs.reshape(-1, lookback, 7).permute(0, 2, 1)
        return super().forward(x)

class TaskLearner(Sequential):
    def __init__(self):
        layers = [
            torch.nn.LazyLinear(out_features=horizon),
        ]
        super().__init__(*layers)

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        x = super().forward(inputs)
        x = x.reshape(-1, horizon, 1)
        return x
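Before wiring the backbone into Xpdeep, the two sub-networks can be sanity-checked locally on a dummy batch; this is optional and uses plain PyTorch only (the shapes assume the 24-step lookback with 7 channels defined above).
# Quick local shape check with a dummy batch of 2 samples.
dummy = torch.randn(2, lookback, 7)
features = FeatureExtractor()(dummy)
prediction = TaskLearner()(features)
print(features.shape, prediction.shape)  # Expected: (2, 1536) and (2, 5, 1)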
Model Instantiation#
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
# Explanation Architecture
explanation_architecture = ModelDecisionGraphParameters(
    graph_depth=3,
    discrimination_weight=0.1,
    target_homogeneity_weight=2.0,
    prune_step=11,
    target_homogeneity_pruning_threshold=0.7,
    population_pruning_threshold=0.05,
    balancing_weight=1.0,
)
# XPDEEP Model Architecture
xpdeep_model = XpdeepModel.from_torch(
    fitted_schema=fit_train_dataset.fitted_schema,
    feature_extraction=FeatureExtractor(),
    task_learner=TaskLearner(),
    decision_graph_parameters=explanation_architecture,
)
4. Training#
Training Specification#
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from functools import partial
from xpdeep.metric import DictMetrics, TorchGlobalMetric, TorchLeafMetric
from torch.optim.lr_scheduler import ReduceLROnPlateau
from xpdeep.trainer.trainer import Trainer
from torch import nn
from torchmetrics import MeanSquaredError
target_size = fit_train_dataset.fitted_schema.target_size[1]
# Explanation Metrics
metrics = DictMetrics(
    mse=TorchGlobalMetric(metric=partial(MeanSquaredError), on_raw_data=True),
    leaf_metric_mse=TorchLeafMetric(metric=partial(MeanSquaredError), on_raw_data=True),
)
callbacks = [
    EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=10),
    Scheduler(
        # Reduce the learning rate when the monitored loss stops decreasing.
        pre_scheduler=partial(ReduceLROnPlateau, patience=3, mode="min"),
        step_method="epoch",
        monitoring_metric="Total loss",
    ),
]
# XPDEEP Training Specifications
trainer = Trainer(
    loss=nn.MSELoss(reduction="none"),
    optimizer=partial(torch.optim.AdamW, lr=0.01, foreach=False, fused=False),
    start_epoch=0,
    max_epochs=60,
    metrics=metrics,
    callbacks=callbacks,
)
Model Training#
import torch
from sklearn.metrics import mean_squared_error, root_mean_squared_error
device = "cpu"
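# The SOTA baseline needs its own model, loss, optimizer, and loop hyperparameters.
# The values below are illustrative assumptions, not prescribed by this tutorial.
batch_size = 64
epochs = 60
sota_model = SotaModel().to(device)
# Materialise the LazyLinear layer with a dummy forward pass before building the optimizer.
sota_model(torch.zeros(1, 7, lookback))
loss_fn = nn.MSELoss()
optimizer = torch.optim.AdamW(sota_model.parameters(), lr=0.01)
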
def train(X_train, y_train, model, loss_fn, optimizer):
    size = len(X_train)
    model.train()
    total_loss = 0
    for batch in range(size // batch_size):
        X_batch = torch.tensor(X_train[batch * batch_size:(batch + 1) * batch_size, :, :], dtype=torch.float32).to(device)
        y_batch = torch.tensor(y_train[batch * batch_size:(batch + 1) * batch_size, :], dtype=torch.float32).to(device)
        # X_batch, y_batch = X_batch.reshape(-1, lookback*7), y_batch.reshape(-1, horizon)
        # Compute prediction error
        pred = model(X_batch)
        loss = loss_fn(pred, y_batch)
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    average_loss = total_loss / (size // batch_size)
    return average_loss

def eval_(X_test, y_test, model, loss_fn):
    model.eval()
    with torch.no_grad():
        X_test = torch.tensor(X_test, dtype=torch.float32).to(device)
        y_test = torch.tensor(y_test, dtype=torch.float32).to(device)
        # X_test, y_test = X_test.reshape(-1, lookback*7), y_test.reshape(-1, horizon)
        pred = model(X_test)
        test_loss = loss_fn(pred, y_test).item()
        mse = mean_squared_error(
            target_encoder.inverse_transform(y_test.reshape(-1, horizon)),
            target_encoder.inverse_transform(pred.reshape(-1, horizon)),
        )
        rmse = root_mean_squared_error(
            target_encoder.inverse_transform(y_test.reshape(-1, horizon)),
            target_encoder.inverse_transform(pred.reshape(-1, horizon)),
        )
    return target_encoder.inverse_transform(pred.reshape(-1, horizon)), test_loss, mse, rmse
for t in range(epochs):
    print(f"\nEpoch {t+1}\n-------------------------------")
    training_loss = train(x_train, y_train, sota_model, loss_fn, optimizer)
    _, val_loss, _, _ = eval_(x_val, y_val, sota_model, loss_fn)
    print(f"Training Loss: {training_loss}\nValidation Loss: {val_loss}")

_, _, mse_on_train, rmse_on_train = eval_(x_train, y_train, sota_model, loss_fn)
_, _, mse_on_validation, rmse_on_validation = eval_(x_val, y_val, sota_model, loss_fn)
_, _, mse_on_test, rmse_on_test = eval_(x_test, y_test, sota_model, loss_fn)
print(
    f"\nMSEs: "
    f"\nMSE on train set : {mse_on_train}"
    f"\nMSE on validation set : {mse_on_validation}"
    f"\nMSE on test set : {mse_on_test}"
)
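On the Xpdeep side, training goes through the Trainer configured above. The sketch below assumes that Trainer.train accepts the model together with the fitted train and validation datasets and returns the trained model; check the Xpdeep trainer reference for the exact signature. The resulting trained_model is used for explanation generation in the next section.
# Assumed call; refer to the Xpdeep Trainer documentation for the exact API.
trained_model = trainer.train(
    xpdeep_model,
    train_set=fit_train_dataset,
    validation_set=fit_val_dataset,
)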
5. Explanation Generation#
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, HistogramStat, VarianceStat
statistics = DictStats(
    histogram_target=HistogramStat(on="target", num_bins=20, num_items=1000, on_raw_data=True),
    histogram_prediction=HistogramStat(on="prediction", num_bins=20, num_items=1000, on_raw_data=True),
    histogram_error=HistogramStat(on="prediction_error", num_bins=20, num_items=1000, on_raw_data=True),
    variance_target=VarianceStat(on="target", on_raw_data=True),
    variance_prediction=VarianceStat(on="prediction", on_raw_data=True),
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
    description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)
model_explanations = explainer.global_explain(
    trained_model,
    train_set=fit_train_dataset,
    test_set=fit_test_dataset,
    validation_set=fit_val_dataset,
)
print(model_explanations.visualisation_link)