Other examples
Banking dataset
Banking is a dataset for binary classification from tabular inputs.
Please download the dataset here and update the tutorial data path accordingly.
The data is related to direct marketing campaigns (phone calls) of a Portuguese banking institution. The classification goal is to predict if the client will subscribe to a term deposit (variable y).
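The raw training split is heavily imbalanced (most clients do not subscribe), which is why the full script below down-samples the "no" class before training. A minimal sketch to check the balance yourself, assuming the same semicolon-separated train.csv used in the tutorial:
import pandas as pd
raw = pd.read_csv("banking_dataset/train.csv", sep=";")
print(raw["y"].value_counts())                # absolute counts per class
print(raw["y"].value_counts(normalize=True))  # proportions: "no" dominates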
👀 Full file preview
import pandas as pd
import torch
import pyarrow as pa
import pyarrow.parquet as pq
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import ReduceLROnPlateau
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import ParquetDataset, FittedParquetDataset
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Sensitivity, Infidelity
from xpdeep.explain.statistic import DictStats, DistributionStat
from xpdeep.metrics.metric import DictMetrics
from xpdeep.metrics.zoo.multiclass_metrics import MulticlassF1Score, MulticlassAccuracy, MulticlassConfusionMatrix
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from xpdeep.project import Project
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler, ModelCheckpoint
from xpdeep.trainer.trainer import Trainer
from functools import partial
from torch import nn
from torch.nn import Sequential
torch.random.manual_seed(42)
# ##### Prepare the Dataset #######
# 1. Load and preprocess data
data = pd.read_csv("banking_dataset/train.csv", sep=";")
filtered_data = data[data["y"] == "no"].sample(n=10000, random_state=42)
data_train = pd.concat([data[data["y"] != "no"], filtered_data])
test_data = pd.read_csv("banking_dataset/test.csv", sep=";")
# 2. Split training set into training and validation
train_data, val_data = train_test_split(data_train, test_size=0.15, random_state=42)
# 3. Add index for xpdeep
train_data["index_xp_deep"] = range(len(train_data))
test_data["index_xp_deep"] = range(len(test_data))
val_data["index_xp_deep"] = range(len(val_data))
# 4. Convert to pyarrow Table format and save as parquet files
pq.write_table(pa.Table.from_pandas(train_data, preserve_index=False), "train.parquet")
pq.write_table(pa.Table.from_pandas(val_data, preserve_index=False), "val.parquet")
pq.write_table(pa.Table.from_pandas(test_data, preserve_index=False), "test.parquet")
# 5. Initialize the API connection and set project
init(api_key="api_key", api_url="api_url")
project = Project(id="BankingDatasetId", name="Banking Dataset Tutorial")
set_project(project)
# 6. Upload dataset and analyze
directory = upload(
directory_name="banking_dataset",
train_set_path="train.parquet",
test_set_path="test.parquet",
val_set_path="val.parquet",
)
train_dataset = ParquetDataset(
split_name="train",
identifier_name="my_local_dataset",
path=directory["train_set_path"],
)
analyzed_train_dataset = train_dataset.analyze(target_names=["y"])
fit_train_dataset = analyzed_train_dataset.fit()
# Create test and validation datasets based on the fitted schema
fit_test_dataset = FittedParquetDataset(
split_name="test",
identifier_name="my_local_dataset",
path=directory["test_set_path"],
fitted_schema=fit_train_dataset.fitted_schema,
)
fit_val_dataset = FittedParquetDataset(
split_name="val",
identifier_name="my_local_dataset",
path=directory["val_set_path"],
fitted_schema=fit_train_dataset.fitted_schema,
)
# ##### Prepare the Model #######
# 1. Set input and target sizes based on the fitted schema
input_size = fit_train_dataset.fitted_schema.input_size[1]
target_size = fit_train_dataset.fitted_schema.target_size[1]
# 2. Initialize models for feature extraction and task learning
feature_extraction = Sequential(nn.Linear(input_size, 128), nn.ReLU(), nn.Linear(128, 50), nn.ReLU())
task_learner = Sequential(nn.Linear(50, target_size), nn.Softmax(dim=1))
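# The task head ends with a softmax so it outputs class probabilities, matching the probability-based cross-entropy loss used in the trainer below.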
# 3. Specify the model's structure and constraints
model_specifications = ModelDecisionGraphParameters(
graph_depth=3,
target_homogeneity_pruning_threshold=0.8,
population_pruning_threshold=0.01,
prune_step=11,
target_homogeneity_weight=0.7,
discrimination_weight=0.8,
balancing_weight=0.3,
)
# 4. Create the explainable model using the xpdeep API
xpdeep_model = XpdeepModel.from_torch(
fitted_schema=fit_train_dataset.fitted_schema,
feature_extraction=feature_extraction,
task_learner=task_learner,
backbone=None,
decision_graph_parameters=model_specifications,
)
# ##### Train #######
# 1. Define metrics and callbacks for training
metrics = DictMetrics(
multi_class_F1_score=partial(MulticlassF1Score, average="macro", num_classes=target_size),
multi_class_accuracy=partial(MulticlassAccuracy, num_classes=target_size),
confusion_matrix=partial(MulticlassConfusionMatrix, num_classes=target_size),
)
callbacks = [
EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5),
Scheduler(pre_scheduler=partial(ReduceLROnPlateau), step_method="epoch", monitoring_metric="Total loss"),
    ModelCheckpoint(monitoring_metric="global_multi_class_F1_score", mode="maximize"),
]
# 2. Define optimizer
optimizer = partial(torch.optim.AdamW, lr=0.001, fused=True)
# 3. Create and run the trainer
trainer = Trainer(
loss=CrossEntropyLossFromProbabilities(reduction="none"),
optimizer=optimizer,
callbacks=callbacks,
start_epoch=0,
max_epochs=20,
metrics=metrics,
)
trained_model = trainer.train(
model=xpdeep_model,
train_set=fit_train_dataset,
validation_set=fit_val_dataset,
batch_size=2048,
)
# ##### Explain #######
# 1. Build the Explainer
statistics = DictStats(
distribution_target=DistributionStat(on="target"), distribution_prediction=DistributionStat(on="prediction")
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)
# 2. Generate global model explanations
model_explanations = explainer.global_explain(
trained_model,
train_set=fit_train_dataset,
test_set=fit_test_dataset,
validation_set=fit_val_dataset,
)
visualisation_link = model_explanations.visualisation_link
print(visualisation_link)
Insurance dataset
Insurance is a dataset for regression from tabular inputs.
Please download the dataset here and update the tutorial data path accordingly.
The "Insurance Dataset for Predicting Health Insurance Premiums in the US" is a collection of data on various factors that can influence medical costs and premiums for health insurance in the United States. The dataset includes information on 10 variables, including age, gender, body mass index (BMI), number of children, smoking status, region, income, education, occupation, and type of insurance plan.
👀 Full file preview
import pandas as pd
import torch
import pyarrow as pa
import pyarrow.parquet as pq
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torchmetrics import MeanSquaredError
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import ParquetDataset, FittedParquetDataset
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Sensitivity, Infidelity
from xpdeep.explain.statistic import DictStats, HistogramStat, VarianceStat
from xpdeep.metrics.metric import DictMetrics, TorchGlobalMetric, TorchLeafMetric
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.project import Project
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler, ModelCheckpoint
from xpdeep.trainer.trainer import Trainer
from functools import partial
from torch import nn
torch.random.manual_seed(42)
# ##### Prepare the Dataset #######
# 1. Load and preprocess data
data = pd.read_csv("insurance/insurance_dataset.csv")
data["medical_history"] = data["medical_history"].fillna("No Record")
data["family_medical_history"] = data["family_medical_history"].fillna("No Record")
data.rename(columns={"medical_history": "indv_medical_history"}, inplace=True)
# Split dataset into training and test sets
train_data, test_data = train_test_split(data, test_size=0.25, random_state=42)
# Add index for xpdeep
train_data["index_xp_deep"] = range(len(train_data))
test_data["index_xp_deep"] = range(len(test_data))
# Convert to pyarrow Table format and save as parquet files
pq.write_table(pa.Table.from_pandas(train_data, preserve_index=False), "train.parquet")
pq.write_table(pa.Table.from_pandas(test_data, preserve_index=False), "test.parquet")
# Initialize the API connection and set project
init(api_key="api_key", api_url="api_url")
project = Project(id="InsuranceId", name="Insurance Dataset Tutorial")
set_project(project)
# Upload dataset and analyze
directory = upload(
directory_name="insurance_dataset_uploaded",
train_set_path="train.parquet",
test_set_path="test.parquet",
)
# Create and fit training dataset
train_dataset = ParquetDataset(
split_name="train",
identifier_name="my_local_dataset",
path=directory["train_set_path"],
)
analyzed_train_dataset = train_dataset.analyze(target_names=["charges"])
fit_train_dataset = analyzed_train_dataset.fit()
# Create test dataset based on the fitted schema
fit_test_dataset = FittedParquetDataset(
split_name="test",
identifier_name="my_local_dataset",
path=directory["test_set_path"],
fitted_schema=fit_train_dataset.fitted_schema,
)
# ##### Prepare the Model #######
# 1. Set input and target sizes based on the fitted schema
input_size = fit_train_dataset.fitted_schema.input_size[1]
target_size = fit_train_dataset.fitted_schema.target_size[1] # Should be 1 for regression
# 2. Initialize models for feature extraction and task learning
feature_extraction = nn.Sequential(
nn.Linear(input_size, 128), nn.ReLU(), nn.Linear(128, 128), nn.ReLU(), nn.Linear(128, 64), nn.ReLU()
)
task_learner = nn.Sequential(nn.Linear(64, target_size))
# 3. Specify the model's structure and constraints
model_specifications = ModelDecisionGraphParameters(
graph_depth=3,
discrimination_weight=0.1,
target_homogeneity_weight=0.5,
target_homogeneity_pruning_threshold=0.8,
population_pruning_threshold=0.2,
balancing_weight=0.4,
prune_step=7,
)
# 4. Create the explainable model using the xpdeep API
xpdeep_model = XpdeepModel.from_torch(
fitted_schema=fit_train_dataset.fitted_schema,
feature_extraction=feature_extraction,
task_learner=task_learner,
backbone=None,
decision_graph_parameters=model_specifications,
)
# ##### Train #######
# 1. Define metrics and callbacks for training
metrics = DictMetrics(
mse=TorchGlobalMetric(metric=partial(MeanSquaredError), on_raw_data=True),
leaf_metric_mse=TorchLeafMetric(metric=partial(MeanSquaredError), on_raw_data=True),
rmse=partial(MeanSquaredError, squared=False),
)
callbacks = [
EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=10),
Scheduler(pre_scheduler=partial(ReduceLROnPlateau), step_method="epoch", monitoring_metric="Total loss"),
ModelCheckpoint(monitoring_metric="mse", mode="minimize"),
]
# 2. Define optimizer
optimizer = partial(torch.optim.AdamW, lr=0.001)
# 3. Create and run the trainer
trainer = Trainer(
loss=torch.nn.MSELoss(reduction="none"),
optimizer=optimizer,
callbacks=callbacks,
start_epoch=0,
max_epochs=10,
metrics=metrics,
)
trained_model = trainer.train(
model=xpdeep_model,
train_set=fit_train_dataset,
validation_set=None,
batch_size=4096,
)
# ##### Explain #######
# 1. Build the Explainer
statistics = DictStats(
histogram_target=HistogramStat(on="target", num_bins=20, num_items=1000, on_raw_data=True),
histogram_prediction=HistogramStat(on="prediction", num_bins=20, num_items=1000, on_raw_data=True),
histogram_error=HistogramStat(on="prediction_error", num_bins=20, num_items=1000, on_raw_data=True),
variance_target=VarianceStat(on="target", on_raw_data=True),
variance_prediction=VarianceStat(on="prediction", on_raw_data=True),
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)
# 2. Generate global model explanations
model_explanations = explainer.global_explain(
trained_model,
train_set=fit_train_dataset,
test_set=fit_test_dataset,
)
visualisation_link = model_explanations.visualisation_link
print(visualisation_link)
ECG dataset
ECG is a dataset for classification with time-series inputs.
Please download the dataset here and update the tutorial data path accordingly.
The MIT-BIH Arrhythmia Database contains 48 half-hour excerpts of two-channel ambulatory ECG recordings, obtained from 47 subjects studied by the BIH Arrhythmia Laboratory between 1975 and 1979. Twenty-three recordings were chosen at random from a set of 4,000 24-hour ambulatory ECG recordings collected from a mixed population of inpatients (about 60%) and outpatients (about 40%) at Boston's Beth Israel Hospital. The remaining 25 recordings were selected from the same set to include less common but clinically significant arrhythmias that would not be well-represented in a small random sample.
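All of the signal preprocessing in the full script reduces to one idea: cut a fixed 360-sample window (±180 samples) around each annotated beat position and label it with its arrhythmia class. A minimal sketch of that step, assuming a denoised 1-D signal and a list of (position, label) annotations; the full script parses these from the MIT-BIH .csv/.txt files:
import numpy as np
WINDOW = 180  # half-window, as in the full script (360 samples per beat)
def extract_beats(signal, annotations, classes=("N", "L", "R", "A", "V")):
    """Return 360-sample windows centred on annotated beats with a kept label."""
    beats, labels = [], []
    for pos, label in annotations:
        if label in classes and WINDOW <= pos < len(signal) - WINDOW:
            beats.append(signal[pos - WINDOW : pos + WINDOW])
            labels.append(label)
    return np.array(beats), np.array(labels)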
👀 Full file preview
import csv
import os
from collections import Counter
import datasets
import numpy as np
import pandas as pd
import pywt
import torch
from datasets import Features
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from torch import nn, Tensor
from torch.optim.lr_scheduler import ReduceLROnPlateau
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import AnalyzedParquetDataset, FittedParquetDataset
from xpdeep.dataset.schema.feature.feature import Metadata, CategoricalFeature, UnivariateSynchronousTimeSerie
from xpdeep.dataset.schema.preprocessor import SklearnPreprocessor, TorchPreprocessor
from xpdeep.dataset.schema.schema import AnalyzedSchema
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Sensitivity, Infidelity
from xpdeep.explain.statistic import DictStats, DistributionStat
from xpdeep.metrics.metric import DictMetrics
from xpdeep.metrics.zoo.multiclass_metrics import MulticlassAccuracy, MulticlassF1Score, MulticlassConfusionMatrix
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from xpdeep.model.zoo.mlp import MLP
from xpdeep.project import Project
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler, ModelCheckpoint
from xpdeep.trainer.trainer import Trainer
from functools import partial
torch.random.manual_seed(42)
# ##### Prepare the Dataset #######
path = "mitbih_database/"
window_size = 180
maximum_counting = 10000
class ScaleUnivariate(TorchPreprocessor):
    """Standardize a univariate time series with fixed mean and scale."""
    def __init__(self, input_size: tuple[int, ...], target_mean: torch.Tensor, target_scale: torch.Tensor):
        """Store the scaling statistics as non-trainable parameters."""
        super().__init__(input_size)
        self.mean = torch.nn.Parameter(target_mean, requires_grad=False)
        self.scale = torch.nn.Parameter(target_scale, requires_grad=False)
    def transform(self, inputs: torch.Tensor) -> torch.Tensor:
        """Standardize the inputs."""
        return (inputs - self.mean) / self.scale
    def inverse_transform(self, output: torch.Tensor) -> torch.Tensor:
        """Invert the standardization."""
        return output * self.scale + self.mean
class ResidualBlock1D(nn.Module):
"""Residual block with 1D convolutions."""
def __init__(self, in_channels: int, out_channels: int, stride: int = 1, kernel_size: int = 3, padding: int = 1):
"""Init residual block."""
super().__init__()
self.conv1 = nn.Sequential(
nn.Conv1d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding),
nn.ReLU(),
)
self.conv2 = nn.Sequential(
nn.Conv1d(out_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding),
)
self.cast_layer = nn.Linear(in_channels, out_channels)
self.relu = nn.ReLU()
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Forward function."""
# cast number of channels to number of out channels.
# It should be called as a CNN, with channels in 2nd dimension
residual = self.cast_layer(x.transpose(1, -1)).transpose(1, -1)
out = self.conv1(x)
out = self.conv2(out)
out += residual
return self.relu(out)
class ResidualCNN1D(nn.Module):
"""Cnn with 1D residual blocks."""
def __init__( # noqa: PLR0913
self,
in_channels: int,
*,
dropout: float = 0.0,
out_channels: tuple[int, ...] = (32, 64, 128),
max_pool_size: int = 2,
output_size: int | None = 50,
with_softmax: bool = False,
transpose_input: bool = False,
):
"""Initialize a basic CNN model for classification."""
super().__init__()
layers = []
for channel_dim in out_channels:
layers.extend([
ResidualBlock1D(in_channels=in_channels, out_channels=channel_dim),
nn.MaxPool1d(max_pool_size),
nn.Dropout(p=dropout),
])
in_channels = channel_dim
self.cnn = nn.Sequential(*layers)
self.mlp: None | nn.Sequential = None
if output_size is not None:
mlp_layers = [nn.Flatten(), nn.LazyLinear(output_size)]
if with_softmax:
mlp_layers.append(nn.Softmax(dim=-1))
self.mlp = nn.Sequential(*mlp_layers)
self.transpose_input = transpose_input
def forward(self, x: Tensor) -> Tensor:
"""Forward pass."""
if hasattr(self, "transpose_input") and self.transpose_input: # retro compatibility
features = self.cnn(x.permute(*(0, x.dim() - 1, *range(1, x.dim() - 1))))
else:
features = self.cnn(x)
if self.mlp is not None:
return self.mlp(features)
return features
class EcgCNN(ResidualCNN1D):
"""Backbone CNN model for ECG."""
def __init__(
self,
dropout: float = 0.0,
output_size: int = 50,
*,
out_channels: tuple[int, ...] = (32, 64, 128),
with_softmax: bool = False,
):
"""Initialize a basic CNN model for classification."""
super().__init__(
in_channels=1,
out_channels=out_channels,
output_size=output_size,
dropout=dropout,
with_softmax=with_softmax,
)
def forward(self, x: Tensor) -> Tensor:
"""Forward pass."""
return super().forward(x.transpose(1, 2))
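# Wavelet denoising: decompose the signal with the "sym4" wavelet, soft-threshold the detail coefficients, then reconstruct.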
def denoise(data):
w = pywt.Wavelet("sym4")
maxlev = pywt.dwt_max_level(len(data), w.dec_len)
threshold = 0.04 # Threshold for filtering
coeffs = pywt.wavedec(data, "sym4", level=maxlev)
for i in range(1, len(coeffs)):
coeffs[i] = pywt.threshold(coeffs[i], threshold * max(coeffs[i]))
datarec = pywt.waverec(coeffs, "sym4")
return datarec
classes = ["N", "L", "R", "A", "V"]
count_classes = Counter(classes)
X = list()
y = list()
# Read files
filenames = next(os.walk(path))[2]
# Split and save .csv , .txt
records = list()
annotations = list()
filenames.sort()
# segregating filenames and annotations
for f in filenames:
filename, file_extension = os.path.splitext(f)
# *.csv
if file_extension == ".csv":
records.append(path + filename + file_extension)
# *.txt
else:
annotations.append(path + filename + file_extension)
# Records
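# For each record: read the raw signal from the .csv file, denoise it, then keep a 360-sample window (±180 samples) around every annotated beat whose label is one of the five classes above.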
for r in range(0, len(records)):
signals = []
with open(records[r], "rt") as csvfile:
spamreader = csv.reader(csvfile, delimiter=",", quotechar="|") # read CSV file\
row_index = -1
for row in spamreader:
if row_index >= 0:
signals.insert(row_index, int(row[1]))
row_index += 1
signals = denoise(signals)
# signals = stats.zscore(signals)
    # Read annotations: R-peak position and arrhythmia class
example_beat_printed = False
with open(annotations[r], "r", encoding="utf-8") as fileID:
data = fileID.readlines()
beat = list()
for d in range(1, len(data)): # 0 index is Chart Head
splitted = data[d].split(" ")
splitted = filter(None, splitted)
next(splitted) # Time... Clipping
pos = int(next(splitted)) # Sample ID
arrhythmia_type = next(splitted) # Type
if arrhythmia_type in classes:
if window_size <= pos and pos < (len(signals) - window_size):
beat = signals[pos - window_size : pos + window_size] ## REPLACE WITH R-PEAK DETECTION
X.append(beat)
y.append(arrhythmia_type)
for i in range(0, len(X)):
X[i] = np.append(X[i], y[i])
X_train_df = pd.DataFrame(X)
per_class = X_train_df[X_train_df.shape[1] - 1].value_counts()
inputs = X_train_df.iloc[:, :-1].values # Select all columns except the last one and convert to a NumPy array
inputs = inputs.reshape(inputs.shape[0], inputs.shape[1], 1) # Reshape to (number_of_rows, 360, 1)
# 2. Extract the target column as a NumPy array
targets = X_train_df.iloc[:, -1].values # Select the last column as the target
train_combined = pd.DataFrame.from_dict({"ecg_arrhythmia": inputs.tolist(), "target": targets})
# Check the shape of the first value in the combined column
first_value = train_combined.iloc[0, 0] # Get the first value of the combined column
# Split into train, validation and test sets
train_val_data, test_data = train_test_split(train_combined, test_size=0.20)
train_data, val_data = train_test_split(train_val_data, test_size=0.20)
# Prepare the dataset and add the 'index_xp_deep' column
train_data["index_xp_deep"] = range(len(train_data))
val_data["index_xp_deep"] = range(len(val_data))
test_data["index_xp_deep"] = range(len(test_data))
ecg_arrhythmia_array = np.array([item for sublist in train_data["ecg_arrhythmia"] for item in sublist], dtype=float)
data_mean = torch.tensor(np.mean(ecg_arrhythmia_array), dtype=torch.float32)
data_scale = torch.tensor(np.std(ecg_arrhythmia_array), dtype=torch.float32)
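# Scaling statistics are computed on the training split only; the fitted schema reuses them for the validation and test sets.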
# Define the schema for the dataset
features_schema = Features({
"ecg_arrhythmia": datasets.Array2D(dtype="float32", shape=(360, 1)),
"target": datasets.Value(dtype="string"),
"index_xp_deep": datasets.Value(dtype="int32"),
})
# Convert the dataframes directly into the desired Parquet format using the `datasets` library
# Train dataset
train_parquet = datasets.Dataset.from_pandas(train_data, preserve_index=False)
train_parquet = train_parquet.cast(features_schema)
train_parquet.to_parquet("train_fast.parquet")
# Validation dataset
val_parquet = datasets.Dataset.from_pandas(val_data, preserve_index=False)
val_parquet = val_parquet.cast(features_schema)
val_parquet.to_parquet("val_fast.parquet")
# Test dataset
test_parquet = datasets.Dataset.from_pandas(test_data, preserve_index=False)
test_parquet = test_parquet.cast(features_schema)
test_parquet.to_parquet("test_fast.parquet")
# 1. Initialize the API connection and set project
init(api_key="api_key", api_url="api_url")
project = Project(id="ECGArrhythmiaId", name="ECG Arrhythmia Tutorial")
set_project(project)
# 2. Upload dataset and analyze
directory = upload(
directory_name="ecg_arrhythmia_uploaded",
train_set_path="train_fast.parquet",
val_set_path="val_fast.parquet",
test_set_path="test_fast.parquet",
)
# 3. Define and analyze schema
analyzed_schema = AnalyzedSchema(
Metadata(name="index_xp_deep"),
UnivariateSynchronousTimeSerie(
name="ecg_arrhythmia",
is_target=False,
size=360,
preprocessor=ScaleUnivariate((360, 1), target_mean=data_mean, target_scale=data_scale),
),
CategoricalFeature(
is_target=True,
name="target",
categories=[],
preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
),
)
# 4. Create a train dataset from the analyzed schema and fit it
analyzed_train_dataset = AnalyzedParquetDataset(
split_name="train",
identifier_name="my_local_dataset",
path=directory["train_set_path"],
analyzed_schema=analyzed_schema,
)
fit_train_dataset = analyzed_train_dataset.fit()
fitted_schema = fit_train_dataset.fitted_schema
# 5. Create test and validation datasets based on the fitted schema
fit_val_dataset = FittedParquetDataset(
split_name="val",
identifier_name="my_local_dataset",
path=directory["val_set_path"],
fitted_schema=fitted_schema,
)
fit_test_dataset = FittedParquetDataset(
split_name="test",
identifier_name="my_local_dataset",
path=directory["test_set_path"],
fitted_schema=fitted_schema,
)
# ##### Prepare the Model #######
# 1. Set input and target sizes based on the fitted schema
input_size = fitted_schema.input_size[1:]
target_size = fitted_schema.target_size[1] # The number of classes
# 2. Initialize models for feature extraction and task learning
backbone = EcgCNN(
with_softmax=False, # No softmax for latent space
output_size=256,
)
feature_extraction = MLP(
norm_layer=None,
flatten_input=True,
dropout=0.2,
input_size=256,
hidden_channels=[128, 64],
)
task_learner = MLP(
norm_layer=None,
input_size=64,
hidden_channels=[target_size],
last_activation=partial(torch.nn.Softmax, dim=-1),
)
# 3. Specify the model's structure and constraints
model_specifications = ModelDecisionGraphParameters(
graph_depth=3,
discrimination_weight=0.6,
target_homogeneity_weight=0.5,
prune_step=21,
target_homogeneity_pruning_threshold=0.8,
population_pruning_threshold=0.05,
balancing_weight=0.8,
)
# 4. Create the explainable model using the xpdeep API
xpdeep_model = XpdeepModel.from_torch(
fitted_schema=fitted_schema,
feature_extraction=feature_extraction,
task_learner=task_learner,
backbone=backbone,
decision_graph_parameters=model_specifications,
)
# ##### Train #######
# 1. Define metrics and callbacks for training
metrics = DictMetrics(
multi_class_F1_score=partial(MulticlassF1Score, average="macro", num_classes=target_size),
multi_class_accuracy=partial(MulticlassAccuracy, num_classes=target_size),
confusion_matrix=partial(MulticlassConfusionMatrix, num_classes=target_size, normalize="all"),
)
callbacks = [
EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5),
Scheduler(pre_scheduler=partial(ReduceLROnPlateau), step_method="epoch", monitoring_metric="Total loss"),
    ModelCheckpoint(monitoring_metric="global_multi_class_F1_score", mode="maximize"),
]
# 2. Define optimizer
optimizer = partial(torch.optim.AdamW, lr=0.001)
# 3. Create and run the trainer
trainer = Trainer(
loss=CrossEntropyLossFromProbabilities(reduction="none"),
optimizer=optimizer,
callbacks=callbacks,
start_epoch=0,
max_epochs=40,
metrics=metrics,
)
trained_model = trainer.train(
model=xpdeep_model,
train_set=fit_train_dataset,
validation_set=fit_val_dataset,
batch_size=4096,
)
# ##### Explain #######
# 1. Build the Explainer
statistics = DictStats(
distribution_target=DistributionStat(on="target"), distribution_prediction=DistributionStat(on="prediction")
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)
# 2. Generate global model explanations
model_explanations = explainer.global_explain(
trained_model,
train_set=fit_train_dataset,
validation_set=fit_val_dataset,
test_set=fit_test_dataset,
)
visualisation_link = model_explanations.visualisation_link
print(visualisation_link)
Gas Price dataset
Gas Price is a dataset for forecasting with time-series inputs.
Please download the dataset here and update the tutorial data path accordingly.
Natural gas accounts for about 1/4 of global energy demand and roughly 1/3 of US energy demand. After oil, natural gas is the most dominant source of energy, so being able to improve natural gas demand prediction is extremely valuable. The dataset is designed for the task of forecasting natural gas prices.
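The key preprocessing step turns the daily price series into supervised samples: a 24-step lookback window as input and the following 5 prices as the target. A minimal sketch of that windowing with a plain loop; the full script below builds the same samples in a vectorised way with torch.gather:
import numpy as np
LOOKBACK, HORIZON = 24, 5  # same values as the full script
def make_windows(prices):
    """Slide a window over the series: 24 past prices -> next 5 prices."""
    inputs, targets = [], []
    for start in range(len(prices) - LOOKBACK - HORIZON + 1):
        inputs.append(prices[start : start + LOOKBACK])
        targets.append(prices[start + LOOKBACK : start + LOOKBACK + HORIZON])
    return np.array(inputs), np.array(targets)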
👀 Full file preview
import numpy as np
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
import datasets
from datasets import Features
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torchmetrics import MeanSquaredError, MeanAbsoluteError
from xpdeep import init, set_project, Project
from xpdeep.dataset.parquet_dataset import FittedParquetDataset
from xpdeep.dataset.schema.feature.feature import Metadata, UnivariateAsynchronousTimeSerie
from xpdeep.dataset.schema.preprocessor import TorchPreprocessor
from xpdeep.dataset.schema.schema import FittedSchema
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Sensitivity, Infidelity
from xpdeep.explain.statistic import DictStats, HistogramStat, VarianceStat
from xpdeep.metrics.metric import DictMetrics, TorchGlobalMetric, TorchLeafMetric
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.mlp import MLP
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from xpdeep.trainer.trainer import Trainer
from functools import partial
torch.random.manual_seed(42)
class ScaleUnivariate(TorchPreprocessor):
    """Standardize a univariate series with fixed mean and scale."""
    def __init__(self, input_size: tuple[int, ...], mean: torch.Tensor, scale: torch.Tensor):
        """Store the scaling statistics as non-trainable parameters."""
        super().__init__(input_size)
        self.mean = torch.nn.Parameter(mean, requires_grad=False)
        self.scale = torch.nn.Parameter(scale, requires_grad=False)
    def transform(self, inputs: torch.Tensor) -> torch.Tensor:
        """Standardize the series (Price)."""
        return (inputs - self.mean) / self.scale
    def inverse_transform(self, output: torch.Tensor) -> torch.Tensor:
        """Invert the standardization (Price)."""
        return output * self.scale + self.mean
class GazMlp(MLP):
    """MLP head for time series forecasting."""
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass: reshape the flat output to (batch, horizon, channels)."""
        x = super().forward(x)
        return x.reshape(-1, 5, 1)
# ##### Prepare the Dataset #######
# 1. Load and preprocess data
data = pd.read_csv("gas_price/daily_csv.csv")
data = data.set_index("Date")
data["Price"].fillna(data["Price"].mean(), inplace=True)
# Create the samples for time series prediction
lookback = 24
horizon = 5
num_samples = len(data) - lookback - horizon + 1
data_input_numpy = data.values
data_target_numpy = data[["Price"]].values
# Generate tensor slices for inputs and targets
tensor_slices = torch.arange(lookback + horizon).unsqueeze(0) + torch.arange(num_samples).unsqueeze(1)
input_slices = tensor_slices[:, :lookback]
target_slices = tensor_slices[:, lookback:]
transformed_inputs = torch.gather(
torch.from_numpy(data_input_numpy).unsqueeze(0).repeat(num_samples, 1, 1), 1, input_slices.unsqueeze(-1)
).numpy()
transformed_targets = torch.gather(
torch.from_numpy(data_target_numpy).unsqueeze(0).repeat(num_samples, 1, 1), 1, target_slices.unsqueeze(-1)
).numpy()
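# transformed_inputs has shape (num_samples, 24, 1): 24 past prices per sample.
# transformed_targets has shape (num_samples, 5, 1): the next 5 prices to forecast.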
# Store the processed data in a DataFrame
data = pd.DataFrame({"Price": transformed_inputs.tolist(), "target Price": transformed_targets.tolist()})
# Split dataset into training and test sets and add indices for xpdeep
train_data, test_data = train_test_split(data, test_size=0.2, random_state=42)
# Scaling statistics (mean and standard deviation) for the input price windows
price_array = np.array([item for sublist in train_data["Price"] for item in sublist], dtype=float)
data_mean = torch.tensor(np.mean(price_array), dtype=torch.float32)
data_scale = torch.tensor(np.std(price_array), dtype=torch.float32)
target_price_array = np.array([item for sublist in train_data["target Price"] for item in sublist], dtype=float)
# Scaling statistics (mean and standard deviation) for the target price windows
target_mean = torch.tensor(np.mean(target_price_array), dtype=torch.float32)
target_scale = torch.tensor(np.std(target_price_array), dtype=torch.float32)
train_data["index_xp_deep"] = range(len(train_data))
test_data["index_xp_deep"] = range(len(test_data))
# Save directly to Parquet format with schema using `datasets`
features_schema = Features({
"Price": datasets.Array2D(dtype="float32", shape=(24, 1)),
"target Price": datasets.Array2D(dtype="float32", shape=(5, 1)),
"index_xp_deep": datasets.Value(dtype="int32"),
})
# Create datasets and save as Parquet
train_parquet = datasets.Dataset.from_pandas(train_data, preserve_index=False).cast(features_schema)
train_parquet.to_parquet("train.parquet")
test_parquet = datasets.Dataset.from_pandas(test_data, preserve_index=False).cast(features_schema)
test_parquet.to_parquet("test.parquet")
# Use the test dataset as validation set
val_parquet = test_parquet
val_parquet.to_parquet("val.parquet")
# 2. Initialize the API connection and set project
init(api_key="api_key", api_url="api_url")
project = Project(id="GasPriceId", name="Gas Price Tutorial")
set_project(project)
# 3. Upload datasets and create the schema
directory = upload(
directory_name="gas_price_dataset_uploaded",
train_set_path="train.parquet",
test_set_path="test.parquet",
val_set_path="val.parquet",
)
fitted_schema = FittedSchema(
Metadata(name="index_xp_deep"),
UnivariateAsynchronousTimeSerie(
name="Price",
size=24,
        preprocessor=ScaleUnivariate((24, 1), mean=data_mean, scale=data_scale),
),
UnivariateAsynchronousTimeSerie(
name="target Price",
is_target=True,
size=5,
mirrored_channel="Price",
preprocessor=ScaleUnivariate((5, 1), mean=target_mean, scale=target_scale),
),
)
# Create train, test, and validation datasets using the fitted schema
fit_train_dataset = FittedParquetDataset(
split_name="train",
identifier_name="my_local_dataset",
path=directory["train_set_path"],
fitted_schema=fitted_schema,
)
fit_test_dataset = FittedParquetDataset(
split_name="test",
identifier_name="my_local_dataset",
path=directory["test_set_path"],
fitted_schema=fitted_schema,
)
fit_val_dataset = FittedParquetDataset(
split_name="val",
identifier_name="my_local_dataset",
path=directory["val_set_path"],
fitted_schema=fitted_schema,
)
# ##### Prepare the Model #######
# 1. Set input and target sizes based on the fitted schema
input_size = fitted_schema.input_size[1:]
target_size = fitted_schema.target_size[1:]  # (horizon, number of channels), here (5, 1)
# 2. Initialize models for feature extraction and task learning
feature_extraction = MLP(
norm_layer=partial(torch.nn.BatchNorm1d, track_running_stats=False),
flatten_input=True,
dropout=0.2,
input_size=24 * 1,
hidden_channels=[128, 64, 32],
)
task_learner = GazMlp(input_size=32, hidden_channels=[target_size[0]])
# 3. Specify the model's structure and constraints
model_specifications = ModelDecisionGraphParameters(
graph_depth=3,
discrimination_weight=0.2,
target_homogeneity_weight=0.9,
target_homogeneity_pruning_threshold=0.85,
population_pruning_threshold=0.05,
balancing_weight=0.1,
prune_step=10,
)
# 4. Create the explainable model using the xpdeep API
xpdeep_model = XpdeepModel.from_torch(
fitted_schema=fitted_schema,
feature_extraction=feature_extraction,
task_learner=task_learner,
backbone=None,
decision_graph_parameters=model_specifications,
)
# ##### Train #######
# 1. Define metrics and callbacks for training
metrics = DictMetrics(
mse=TorchGlobalMetric(metric=partial(MeanSquaredError), on_raw_data=True),
mae=TorchGlobalMetric(metric=partial(MeanAbsoluteError), on_raw_data=True),
leaf_metric_mse=TorchLeafMetric(metric=partial(MeanSquaredError), on_raw_data=True),
)
callbacks = [
EarlyStopping(monitoring_metric="mse", mode="minimize", patience=10),
Scheduler(
pre_scheduler=partial(ReduceLROnPlateau, patience=5, mode="min"),
step_method="epoch",
monitoring_metric="Total loss",
),
]
# 2. Define optimizer
optimizer = partial(torch.optim.AdamW, lr=0.001, fused=True)
# 3. Create and run the trainer
trainer = Trainer(
loss=torch.nn.MSELoss(reduction="none"),
optimizer=optimizer,
callbacks=callbacks,
start_epoch=0,
max_epochs=39,
metrics=metrics,
)
trained_model = trainer.train(
model=xpdeep_model,
train_set=fit_train_dataset,
validation_set=fit_val_dataset,
batch_size=2048,
)
# ##### Explain #######
# 1. Build the Explainer
statistics = DictStats(
histogram_target=HistogramStat(on="target", num_bins=20, num_items=1000, on_raw_data=True),
histogram_prediction=HistogramStat(on="prediction", num_bins=20, num_items=1000, on_raw_data=True),
histogram_error=HistogramStat(on="prediction_error", num_bins=20, num_items=1000, on_raw_data=True),
variance_target=VarianceStat(on="target", on_raw_data=True),
variance_prediction=VarianceStat(on="prediction", on_raw_data=True),
distribution_input=HistogramStat(on="input", num_bins=20, num_items=1000, feature_name="Price", on_raw_data=True),
)
# Here we add a per-timestamp MSE (num_outputs is the number of timestamps)
leaf_metric_per_timestamp_mse = TorchLeafMetric(
metric=partial(MeanSquaredError, num_outputs=5), on_raw_data=True, reduced_dimensions=[0, 2]
)
metrics.update({"leaf_metric_per_timestamp_mse": leaf_metric_per_timestamp_mse})
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)
# 2. Generate global model explanations
model_explanations = explainer.global_explain(
trained_model,
train_set=fit_train_dataset,
test_set=fit_test_dataset,
validation_set=fit_val_dataset,
)
visualisation_link = model_explanations.visualisation_link
print(visualisation_link)