Other examples#
Banking dataset#
Banking is a dataset for binary classification from tabular inputs.
Please download the dataset here and update the tutorial data path accordingly.
The data is related to direct marketing campaigns (phone calls) of a Portuguese banking institution. The classification goal is to predict if the client will subscribe to a term deposit (variable y).
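The raw training split is heavily imbalanced towards clients who did not subscribe, which is why the script below downsamples the "no" class before training. As a quick sanity check, a minimal sketch (assuming the semicolon-separated train.csv from the download above):

import pandas as pd

data = pd.read_csv("banking_dataset/train.csv", sep=";")
print(data["y"].value_counts(normalize=True))  # share of "yes" vs "no" clients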
👀 Full file preview
from functools import partial
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import torch
from sklearn.model_selection import train_test_split
from torch import nn
from torch.nn import Sequential
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torchmetrics.classification import MulticlassAccuracy, MulticlassConfusionMatrix, MulticlassF1Score
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import FittedParquetDataset, ParquetDataset
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat
from xpdeep.metric import DictMetrics, TorchGlobalMetric, TorchLeafMetric
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from xpdeep.project import Project, get_project
from xpdeep.trainer.callbacks import EarlyStopping, ModelCheckpoint, Scheduler
from xpdeep.trainer.trainer import Trainer
"""Process the dataset, train, and explain the model."""
torch.random.manual_seed(42)
# ##### Prepare the Dataset #######
init(api_key="api_key", api_url="api_url")
set_project(Project.create_or_get(name="Banking Dataset Tutorial"))
# 1. Load and preprocess data
data = pd.read_csv("banking_dataset/train.csv", sep=";")
filtered_data = data[data["y"] == "no"].sample(n=10000, random_state=42)
data_train = pd.concat([data[data["y"] != "no"], filtered_data])
test_data = pd.read_csv("banking_dataset/test.csv", sep=";")
# 2. Split training set into training and validation
train_data, val_data = train_test_split(data_train, test_size=0.15, random_state=42)
# 3. Convert to pyarrow Table format and save as parquet files
pq.write_table(pa.Table.from_pandas(train_data, preserve_index=False), "train.parquet")
pq.write_table(pa.Table.from_pandas(val_data, preserve_index=False), "val.parquet")
pq.write_table(pa.Table.from_pandas(test_data, preserve_index=False), "test.parquet")
# 4. Upload dataset and analyze
directory = upload(
directory_name="banking_dataset",
train_set_path="train.parquet",
test_set_path="test.parquet",
val_set_path="val.parquet",
)
train_dataset = ParquetDataset(
split_name="train",
identifier_name="my_local_dataset",
path=directory["train_set_path"],
)
analyzed_train_dataset = train_dataset.analyze(target_names=["y"])
fit_train_dataset = analyzed_train_dataset.fit()
# Create test and validation datasets based on the fitted schema
fit_test_dataset = FittedParquetDataset(
split_name="test",
identifier_name="my_local_dataset",
path=directory["test_set_path"],
fitted_schema=fit_train_dataset.fitted_schema,
)
fit_val_dataset = FittedParquetDataset(
split_name="val",
identifier_name="my_local_dataset",
path=directory["val_set_path"],
fitted_schema=fit_train_dataset.fitted_schema,
)
# ##### Prepare the Model #######
# 1. Set input and target sizes based on the fitted schema
input_size = fit_train_dataset.fitted_schema.input_size[1]
target_size = fit_train_dataset.fitted_schema.target_size[1]
# 2. Initialize models for feature extraction and task learning
feature_extraction = Sequential(nn.Linear(input_size, 128), nn.ReLU(), nn.Linear(128, 50), nn.ReLU())
task_learner = Sequential(nn.Linear(50, target_size), nn.Softmax(dim=1))
# 3. Specify the model's structure and constraints
model_specifications = ModelDecisionGraphParameters(
graph_depth=3,
target_homogeneity_pruning_threshold=0.8,
population_pruning_threshold=0.01,
prune_step=11,
target_homogeneity_weight=0.7,
discrimination_weight=0.8,
balancing_weight=0.3,
)
# 4. Create the explainable model using the xpdeep API
xpdeep_model = XpdeepModel.from_torch(
fitted_schema=fit_train_dataset.fitted_schema,
feature_extraction=feature_extraction,
task_learner=task_learner,
backbone=None,
decision_graph_parameters=model_specifications,
)
# ##### Train #######
# 1. Define metrics and callbacks for training
metrics = DictMetrics(
global_multi_class_accuracy=TorchGlobalMetric(
partial(MulticlassAccuracy, num_classes=target_size, average="micro"), target_as_indexes=True
),
leaf_multi_class_accuracy=TorchLeafMetric(
partial(MulticlassAccuracy, num_classes=target_size, average="micro"), target_as_indexes=True
),
global_multi_class_F1_score=TorchGlobalMetric(
partial(MulticlassF1Score, num_classes=target_size, average="macro"), target_as_indexes=True
),
leaf_multi_class_F1_score=TorchLeafMetric(
partial(MulticlassF1Score, num_classes=target_size, average="macro"), target_as_indexes=True
),
global_confusion_matrix=TorchGlobalMetric(
partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size), target_as_indexes=True
),
leaf_confusion_matrix=TorchLeafMetric(
partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size), target_as_indexes=True
),
)
callbacks = [
EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5),
Scheduler(pre_scheduler=partial(ReduceLROnPlateau), step_method="epoch", monitoring_metric="Total loss"),
ModelCheckpoint(monitoring_metric="global_multi_class_F1_score", mode="minimize"),
]
# 2. Define optimizer
optimizer = partial(torch.optim.AdamW, lr=0.001, foreach=False, fused=False)
# 3. Create and run the trainer
trainer = Trainer(
loss=CrossEntropyLossFromProbabilities(reduction="none"),
optimizer=optimizer,
callbacks=callbacks,
start_epoch=0,
max_epochs=20,
metrics=metrics,
)
trained_model = trainer.train(
model=xpdeep_model,
train_set=fit_train_dataset,
validation_set=fit_val_dataset,
batch_size=2048,
)
# ##### Explain #######
# 1. Build the Explainer
statistics = DictStats(
distribution_target=DistributionStat(on="target"), distribution_prediction=DistributionStat(on="prediction")
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)
# 2. Generate global model explanations
model_explanations = explainer.global_explain(
trained_model,
train_set=fit_train_dataset,
test_set=fit_test_dataset,
validation_set=fit_val_dataset,
)
print(model_explanations.visualisation_link)
Insurance dataset#
Insurance is a dataset for regression from tabular inputs.
Please download the dataset here and update the tutorial data path accordingly.
The "Insurance Dataset for Predicting Health Insurance Premiums in the US" is a collection of data on various factors that can influence medical costs and premiums for health insurance in the United States. The dataset includes information on 10 variables, including age, gender, body mass index (BMI), number of children, smoking status, region, income, education, occupation, and type of insurance plan.
👀 Full file preview
from functools import partial
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import torch
from sklearn.model_selection import train_test_split
from torch import nn
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torchmetrics import MeanSquaredError
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import FittedParquetDataset, ParquetDataset
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, HistogramStat, VarianceStat
from xpdeep.metric import DictMetrics, TorchGlobalMetric, TorchLeafMetric
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.project import Project, get_project
from xpdeep.trainer.callbacks import EarlyStopping, ModelCheckpoint, Scheduler
from xpdeep.trainer.trainer import Trainer
"""Process the dataset, train, and explain the model."""
torch.random.manual_seed(42)
# ##### Prepare the Dataset #######
init(api_key="api_key", api_url="api_url")
set_project(Project.create_or_get(name="Insurance Dataset Tutorial"))
# 1. Load and preprocess data
data = pd.read_csv("insurance/insurance_dataset.csv")
data["medical_history"] = data["medical_history"].fillna("No Record")
data["family_medical_history"] = data["family_medical_history"].fillna("No Record")
data.rename(columns={"medical_history": "indv_medical_history"}, inplace=True) # noqa: PD002
# Split dataset into training and test sets
train_data, test_data = train_test_split(data, test_size=0.25, random_state=42)
# Convert to pyarrow Table format and save as parquet files
pq.write_table(pa.Table.from_pandas(train_data, preserve_index=False), "train.parquet")
pq.write_table(pa.Table.from_pandas(test_data, preserve_index=False), "test.parquet")
# Upload dataset and analyze
directory = upload(
directory_name="insurance_dataset_uploaded",
train_set_path="train.parquet",
test_set_path="test.parquet",
)
# Create and fit training dataset
train_dataset = ParquetDataset(
split_name="train",
identifier_name="my_local_dataset",
path=directory["train_set_path"],
)
analyzed_train_dataset = train_dataset.analyze(target_names=["charges"])
fit_train_dataset = analyzed_train_dataset.fit()
# Create test dataset based on the fitted schema
fit_test_dataset = FittedParquetDataset(
split_name="test",
identifier_name="my_local_dataset",
path=directory["test_set_path"],
fitted_schema=fit_train_dataset.fitted_schema,
)
# ##### Prepare the Model #######
# 1. Set input and target sizes based on the fitted schema
input_size = fit_train_dataset.fitted_schema.input_size[1]
target_size = fit_train_dataset.fitted_schema.target_size[1] # Should be 1 for regression
# 2. Initialize models for feature extraction and task learning
feature_extraction = nn.Sequential(
nn.Linear(input_size, 128), nn.ReLU(), nn.Linear(128, 128), nn.ReLU(), nn.Linear(128, 64), nn.ReLU()
)
task_learner = nn.Sequential(nn.Linear(64, target_size))
# 3. Specify the model's structure and constraints
model_specifications = ModelDecisionGraphParameters(
graph_depth=3,
discrimination_weight=0.1,
target_homogeneity_weight=0.5,
target_homogeneity_pruning_threshold=0.8,
population_pruning_threshold=0.2,
balancing_weight=0.4,
prune_step=7,
)
# 4. Create the explainable model using the xpdeep API
xpdeep_model = XpdeepModel.from_torch(
fitted_schema=fit_train_dataset.fitted_schema,
feature_extraction=feature_extraction,
task_learner=task_learner,
backbone=None,
decision_graph_parameters=model_specifications,
)
# ##### Train #######
# 1. Define metrics and callbacks for training
metrics = DictMetrics(
mse=TorchGlobalMetric(metric=partial(MeanSquaredError), on_raw_data=True),
leaf_metric_mse=TorchLeafMetric(metric=partial(MeanSquaredError), on_raw_data=True),
rmse=TorchGlobalMetric(partial(MeanSquaredError, squared=False), on_raw_data=True),
leaf_metric_rmse=TorchLeafMetric(partial(MeanSquaredError, squared=False), on_raw_data=True),
)
# As there is no validation set, the only metric available for monitoring is "Total loss", i.e. the total
# training loss per epoch.
callbacks = [
EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=10),
Scheduler(pre_scheduler=partial(ReduceLROnPlateau), step_method="epoch", monitoring_metric="Total loss"),
ModelCheckpoint(monitoring_metric="Total loss", mode="minimize"),
]
# 2. Define optimizer
optimizer = partial(torch.optim.AdamW, lr=0.001, foreach=False, fused=False)
# 3. Create and run the trainer
trainer = Trainer(
loss=torch.nn.MSELoss(reduction="none"),
optimizer=optimizer,
callbacks=callbacks,
start_epoch=0,
max_epochs=10,
metrics=metrics,
)
trained_model = trainer.train(
model=xpdeep_model,
train_set=fit_train_dataset,
validation_set=None,
batch_size=4096,
)
# ##### Explain #######
# 1. Build the Explainer
statistics = DictStats(
histogram_target=HistogramStat(on="target", num_bins=20, num_items=1000, on_raw_data=True),
histogram_prediction=HistogramStat(on="prediction", num_bins=20, num_items=1000, on_raw_data=True),
histogram_error=HistogramStat(on="prediction_error", num_bins=20, num_items=1000, on_raw_data=True),
variance_target=VarianceStat(on="target", on_raw_data=True),
variance_prediction=VarianceStat(on="prediction", on_raw_data=True),
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)
# 2. Generate global model explanations
model_explanations = explainer.global_explain(
trained_model,
train_set=fit_train_dataset,
test_set=fit_test_dataset,
)
print(model_explanations.visualisation_link)
ECG dataset#
ECG is a dataset for classification with time-series inputs.
Please download the dataset here and update the tutorial data path accordingly. The MIT-BIH Arrhythmia Database contains 48 half-hour excerpts of two-channel ambulatory ECG recordings, obtained from 47 subjects studied by the BIH Arrhythmia Laboratory between 1975 and 1979. Twenty-three recordings were chosen at random from a set of 4,000 24-hour ambulatory ECG recordings collected from a mixed population of inpatients (about 60%) and outpatients (about 40%) at Boston's Beth Israel Hospital. The remaining 25 recordings were selected from the same set to include less common but clinically significant arrhythmias that would not be well-represented in a small random sample.
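Each training sample in the script below is a single heartbeat: a 360-sample window centred on an annotated R-peak (180 samples on either side), labelled with one of five beat classes (N, L, R, A, V). A minimal sketch of that windowing, with a stand-in signal and a hypothetical R-peak position:

import numpy as np

window_size = 180
signal = np.random.randn(5000)  # stand-in for a denoised ECG channel
pos = 2500                      # hypothetical R-peak sample index from the annotation file
if window_size <= pos < len(signal) - window_size:
    beat = signal[pos - window_size : pos + window_size]  # 360 samples centred on the R-peak
    assert beat.shape == (360,)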
👀 Full file preview
import csv
import os
from functools import partial
from pathlib import Path
import datasets
import numpy as np
import pandas as pd
import pywt
import torch
from datasets import Features
from models import EcgCNN
from preprocessors import ScaleUnivariate
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torchmetrics.classification import MulticlassAccuracy, MulticlassConfusionMatrix, MulticlassF1Score
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import AnalyzedParquetDataset, FittedParquetDataset
from xpdeep.dataset.schema.feature.feature import CategoricalFeature, UnivariateTimeSeries
from xpdeep.dataset.schema.preprocessor import SklearnPreprocessor
from xpdeep.dataset.schema.schema import AnalyzedSchema
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat
from xpdeep.metric import DictMetrics, TorchGlobalMetric, TorchLeafMetric
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from xpdeep.model.zoo.mlp import MLP
from xpdeep.project import Project, get_project
from xpdeep.trainer.callbacks import EarlyStopping, ModelCheckpoint, Scheduler
from xpdeep.trainer.trainer import Trainer
"""Process the dataset, train, and explain the model."""
torch.random.manual_seed(42)
# ##### Prepare the Dataset #######
init(api_key="api_key", api_url="api_url")
set_project(Project.create_or_get(name="ECG Arrhythmia Tutorial"))
path = "mitbih_database/"
window_size = 180
def denoise(data: list[float] | np.ndarray) -> np.ndarray:
"""Denoise the ECG signal using wavelet decomposition."""
w = pywt.Wavelet("sym4")
maxlev = pywt.dwt_max_level(len(data), w.dec_len)
threshold = 0.04 # Threshold for filtering
coeffs = pywt.wavedec(data, "sym4", level=maxlev)
for i in range(1, len(coeffs)):
coeffs[i] = pywt.threshold(coeffs[i], threshold * max(coeffs[i]))
return pywt.waverec(coeffs, "sym4")
classes = ["N", "L", "R", "A", "V"]
x = []
y = []
# Read files
filenames = next(os.walk(path))[2]
filenames.sort()
# Segregate filenames and annotations
records = []
annotations = []
for f in filenames:
file_path = Path(path) / f
if file_path.suffix == ".csv":
records.append(file_path)
else:
annotations.append(file_path)
# Records
for r in range(len(records)):
signals = []
with records[r].open(mode="r", encoding="utf-8") as csvfile:
spamreader = csv.reader(csvfile, delimiter=",", quotechar="|")
row_index = -1
for row in spamreader:
if row_index >= 0:
signals.insert(row_index, int(row[1]))
row_index += 1
signals = denoise(signals)
# signals = stats.zscore(signals)
# Read annotations: R-peak position and arrhythmia class
with annotations[r].open(mode="r", encoding="utf-8") as file_id:
data = file_id.readlines()
for d in range(1, len(data)): # 0 index is Chart Head
splitted = filter(None, data[d].split(" "))
next(splitted) # Skip time
pos = int(next(splitted)) # R-peak sample ID
arrhythmia_type = next(splitted)
# Keep only beats of a known class whose full window fits inside the signal
if arrhythmia_type in classes and window_size <= pos < (len(signals) - window_size):
beat = signals[pos - window_size : pos + window_size]
x.append(beat)
y.append(arrhythmia_type)
# Append label to each beat
for i in range(len(x)):
x[i] = np.append(x[i], y[i])
x_train_df = pd.DataFrame(x)  # Each row: 360 beat samples followed by the class label
# Convert input features and targets to NumPy arrays using `.to_numpy()`
inputs = x_train_df.iloc[:, :-1].to_numpy()
inputs = inputs.reshape(inputs.shape[0], inputs.shape[1], 1)
targets = x_train_df.iloc[:, -1].to_numpy()
train_combined = pd.DataFrame.from_dict({"ecg_arrhythmia": inputs.tolist(), "target": targets})
# Split dataset
train_val_data, test_data = train_test_split(train_combined, test_size=0.20)
train_data, val_data = train_test_split(train_val_data, test_size=0.20)
# Compute mean and variance for scaling
ecg_arrhythmia_array = np.array([item for sublist in train_data["ecg_arrhythmia"] for item in sublist], dtype=float)
data_mean = torch.tensor(np.mean(ecg_arrhythmia_array), dtype=torch.float32)
data_scale = torch.tensor(np.var(ecg_arrhythmia_array), dtype=torch.float32)
# Define the schema for the dataset
features_schema = Features({
"ecg_arrhythmia": datasets.Array2D(dtype="float32", shape=(360, 1)),
"target": datasets.Value(dtype="string"),
})
# Convert the dataframes directly into the desired Parquet format using the `datasets` library
# Train dataset
train_parquet = datasets.Dataset.from_pandas(train_data, preserve_index=False)
train_parquet = train_parquet.cast(features_schema)
train_parquet.to_parquet("train_fast.parquet")
# Validation dataset
val_parquet = datasets.Dataset.from_pandas(val_data, preserve_index=False)
val_parquet = val_parquet.cast(features_schema)
val_parquet.to_parquet("val_fast.parquet")
# Test dataset
test_parquet = datasets.Dataset.from_pandas(test_data, preserve_index=False)
test_parquet = test_parquet.cast(features_schema)
test_parquet.to_parquet("test_fast.parquet")
# 1. Upload dataset and analyze
directory = upload(
directory_name="ecg_arrhythmia_uploaded",
train_set_path="train_fast.parquet",
val_set_path="val_fast.parquet",
test_set_path="test_fast.parquet",
)
# 2. Define and analyze schema
analyzed_schema = AnalyzedSchema(
UnivariateTimeSeries(
asynchronous=False,
name="ecg_arrhythmia",
is_target=False,
preprocessor=ScaleUnivariate((360, 1), mean=data_mean, scale=data_scale),
),
CategoricalFeature(
is_target=True,
name="target",
preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
),
)
# 3. Create a train dataset from the analyzed schema and fit it
analyzed_train_dataset = AnalyzedParquetDataset(
split_name="train",
identifier_name="my_local_dataset",
path=directory["train_set_path"],
analyzed_schema=analyzed_schema,
)
fit_train_dataset = analyzed_train_dataset.fit()
fitted_schema = fit_train_dataset.fitted_schema
# 4. Create test and validation datasets based on the fitted schema
fit_val_dataset = FittedParquetDataset(
split_name="val",
identifier_name="my_local_dataset",
path=directory["val_set_path"],
fitted_schema=fitted_schema,
)
fit_test_dataset = FittedParquetDataset(
split_name="test",
identifier_name="my_local_dataset",
path=directory["test_set_path"],
fitted_schema=fitted_schema,
)
# ##### Prepare the Model #######
# 1. Set input and target sizes based on the fitted schema
target_size = fitted_schema.target_size[1] # The number of classes
# 2. Initialize the backbone, feature extraction, and task learner models
backbone = EcgCNN(
with_softmax=False, # No softmax for latent space
output_size=256,
)
feature_extraction = MLP(
norm_layer=None,
flatten_input=True,
dropout=0.2,
input_size=256,
hidden_channels=[128, 64],
)
task_learner = MLP(
norm_layer=None,
input_size=64,
hidden_channels=[target_size],
last_activation=partial(torch.nn.Softmax, dim=-1),
)
# 3. Specify the model's structure and constraints
model_specifications = ModelDecisionGraphParameters(
graph_depth=3,
discrimination_weight=0.6,
target_homogeneity_weight=0.5,
prune_step=21,
target_homogeneity_pruning_threshold=0.8,
population_pruning_threshold=0.05,
balancing_weight=0.8,
)
# 4. Create the explainable model using the xpdeep API
xpdeep_model = XpdeepModel.from_torch(
fitted_schema=fitted_schema,
feature_extraction=feature_extraction,
task_learner=task_learner,
backbone=backbone,
decision_graph_parameters=model_specifications,
)
# ##### Train #######
# 1. Define metrics and callbacks for training
metrics = DictMetrics(
global_multi_class_accuracy=TorchGlobalMetric(
partial(MulticlassAccuracy, num_classes=target_size, average="micro"), target_as_indexes=True
),
leaf_multi_class_accuracy=TorchLeafMetric(
partial(MulticlassAccuracy, num_classes=target_size, average="micro"), target_as_indexes=True
),
global_multi_class_F1_score=TorchGlobalMetric(
partial(MulticlassF1Score, num_classes=target_size, average="macro"), target_as_indexes=True
),
leaf_multi_class_F1_score=TorchLeafMetric(
partial(MulticlassF1Score, num_classes=target_size, average="macro"), target_as_indexes=True
),
global_confusion_matrix=TorchGlobalMetric(
partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size), target_as_indexes=True
),
leaf_confusion_matrix=TorchLeafMetric(
partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size), target_as_indexes=True
),
)
callbacks = [
EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5),
Scheduler(pre_scheduler=partial(ReduceLROnPlateau), step_method="epoch", monitoring_metric="Total loss"),
ModelCheckpoint(monitoring_metric="global_multi_class_F1_score", mode="minimize"),
]
# 2. Define optimizer
optimizer = partial(torch.optim.AdamW, lr=0.001, foreach=False, fused=False)
# 3. Create and run the trainer
trainer = Trainer(
loss=CrossEntropyLossFromProbabilities(reduction="none"),
optimizer=optimizer,
callbacks=callbacks,
start_epoch=0,
max_epochs=40,
metrics=metrics,
)
trained_model = trainer.train(
model=xpdeep_model,
train_set=fit_train_dataset,
validation_set=fit_val_dataset,
batch_size=4096,
)
# ##### Explain #######
# 1. Build the Explainer
statistics = DictStats(
distribution_target=DistributionStat(on="target"), distribution_prediction=DistributionStat(on="prediction")
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)
# 2. Generate global model explanations
model_explanations = explainer.global_explain(
trained_model,
train_set=fit_train_dataset,
validation_set=fit_val_dataset,
test_set=fit_test_dataset,
)
print(model_explanations.visualisation_link)
Gas Price dataset#
Gas Price is a dataset for forecasting with time-series inputs.
Please download the dataset here and update the tutorial data path accordingly. Natural gas accounts for about a quarter of global energy demand and roughly a third of US energy demand; after oil, it is the most widely used energy source, so improving natural gas demand prediction is extremely valuable. The dataset is designed for the task of forecasting natural gas prices.
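Each sample pairs a 24-step price history (lookback) with the following 5 prices (horizon). The full script below builds these windows in one shot with torch.gather; a simpler loop-based sketch of the same idea, using a stand-in price series:

import numpy as np

lookback, horizon = 24, 5
prices = np.arange(100, dtype=float)  # stand-in for the daily price series
inputs, targets = [], []
for start in range(len(prices) - lookback - horizon + 1):
    inputs.append(prices[start : start + lookback])                        # 24-step history
    targets.append(prices[start + lookback : start + lookback + horizon])  # next 5 prices
inputs, targets = np.stack(inputs), np.stack(targets)
print(inputs.shape, targets.shape)  # (72, 24) (72, 5)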
👀 Full file preview
from functools import partial
import datasets
import numpy as np
import pandas as pd
import torch
from datasets import Features
from models import GazMlp
from preprocessors import ScaleUnivariate
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torchmetrics import MeanAbsoluteError, MeanSquaredError
from xpdeep import Project, get_project, init, set_project
from xpdeep.dataset.parquet_dataset import FittedParquetDataset
from xpdeep.dataset.schema.feature.feature import UnivariateTimeSeries
from xpdeep.dataset.schema.schema import FittedSchema
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, HistogramStat, VarianceStat
from xpdeep.metric import DictMetrics, TorchGlobalMetric, TorchLeafMetric
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.mlp import MLP
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from xpdeep.trainer.trainer import Trainer
"""Process the dataset, train, and explain the model."""
torch.random.manual_seed(42)
# ##### Prepare the Dataset #######
init(api_key="api_key", api_url="api_url")
set_project(Project.create_or_get(name="Gas Price Tutorial"))
# 1. Load and preprocess data
data = pd.read_csv("gas_price/daily_csv.csv")
data = data.set_index("Date")
data["Price"].fillna(data["Price"].mean(), inplace=True) # noqa: PD002
# Create the samples for time series prediction
lookback = 24
horizon = 5
num_samples = len(data) - lookback - horizon + 1
data_input_numpy = data.to_numpy()
data_target_numpy = data[["Price"]].to_numpy()
# Generate tensor slices for inputs and targets
tensor_slices = torch.arange(lookback + horizon).unsqueeze(0) + torch.arange(num_samples).unsqueeze(1)
input_slices = tensor_slices[:, :lookback]
target_slices = tensor_slices[:, lookback:]
transformed_inputs = torch.gather(
torch.from_numpy(data_input_numpy).unsqueeze(0).repeat(num_samples, 1, 1), 1, input_slices.unsqueeze(-1)
).numpy()
transformed_targets = torch.gather(
torch.from_numpy(data_target_numpy).unsqueeze(0).repeat(num_samples, 1, 1), 1, target_slices.unsqueeze(-1)
).numpy()
# Store the processed data in a DataFrame
data = pd.DataFrame({"Price": transformed_inputs.tolist(), "target Price": transformed_targets.tolist()})
# Split dataset into training and test sets and add indices for xpdeep
train_data, test_data = train_test_split(data, test_size=0.2, random_state=42)
# Calculate mean and scale for the target (Price)
price_array = np.array([item for sublist in train_data["Price"] for item in sublist], dtype=float)
# Now you can compute the mean and variance
data_mean = torch.tensor(np.mean(price_array), dtype=torch.float32)
data_scale = torch.tensor(np.var(price_array), dtype=torch.float32)
target_price_array = np.array([item for sublist in train_data["target Price"] for item in sublist], dtype=float)
# Now you can compute the mean and variance
target_mean = torch.tensor(np.mean(target_price_array), dtype=torch.float32)
target_scale = torch.tensor(np.var(target_price_array), dtype=torch.float32)
# Save directly to Parquet format with schema using `datasets`
features_schema = Features({
"Price": datasets.Array2D(dtype="float32", shape=(24, 1)),
"target Price": datasets.Array2D(dtype="float32", shape=(5, 1)),
})
# Create datasets and save as Parquet
train_parquet = datasets.Dataset.from_pandas(train_data, preserve_index=False).cast(features_schema)
train_parquet.to_parquet("train.parquet")
test_parquet = datasets.Dataset.from_pandas(test_data, preserve_index=False).cast(features_schema)
test_parquet.to_parquet("test.parquet")
# Use the test dataset as validation set
val_parquet = test_parquet
val_parquet.to_parquet("val.parquet")
# 2. Upload datasets and create the schema
directory = upload(
directory_name="gas_price_dataset_uploaded",
train_set_path="train.parquet",
test_set_path="test.parquet",
val_set_path="val.parquet",
)
fitted_schema = FittedSchema(
UnivariateTimeSeries(
asynchronous=True,
name="Price",
preprocessor=ScaleUnivariate((24, 1), mean=data_mean, scale=data_scale),
),
UnivariateTimeSeries(
asynchronous=True,
name="target Price",
is_target=True,
mirrored_channel="Price",
preprocessor=ScaleUnivariate((5, 1), mean=target_mean, scale=target_scale),
),
)
# Create train, test, and validation datasets using the fitted schema
fit_train_dataset = FittedParquetDataset(
split_name="train",
identifier_name="my_local_dataset",
path=directory["train_set_path"],
fitted_schema=fitted_schema,
)
fit_test_dataset = FittedParquetDataset(
split_name="test",
identifier_name="my_local_dataset",
path=directory["test_set_path"],
fitted_schema=fitted_schema,
)
fit_val_dataset = FittedParquetDataset(
split_name="val",
identifier_name="my_local_dataset",
path=directory["val_set_path"],
fitted_schema=fitted_schema,
)
# ##### Prepare the Model #######
# 1. Set input and target sizes based on the fitted schema
target_size = fitted_schema.target_size[1:]  # (horizon, channels), here (5, 1)
# 2. Initialize models for feature extraction and task learning
# Initialize models
feature_extraction = MLP(
norm_layer=partial(torch.nn.BatchNorm1d, track_running_stats=False),
flatten_input=True,
dropout=0.2,
input_size=24 * 1,
hidden_channels=[128, 64, 32],
)
task_learner = GazMlp(input_size=32, hidden_channels=[target_size[0]])
# 3. Specify the model's structure and constraints
model_specifications = ModelDecisionGraphParameters(
graph_depth=3,
discrimination_weight=0.2,
target_homogeneity_weight=0.9,
target_homogeneity_pruning_threshold=0.85,
population_pruning_threshold=0.05,
balancing_weight=0.1,
prune_step=10,
)
# 4. Create the explainable model using the xpdeep API
xpdeep_model = XpdeepModel.from_torch(
fitted_schema=fitted_schema,
feature_extraction=feature_extraction,
task_learner=task_learner,
backbone=None,
decision_graph_parameters=model_specifications,
)
# ##### Train #######
# 1. Define metrics and callbacks for training
metrics = DictMetrics(
mse=TorchGlobalMetric(metric=partial(MeanSquaredError), on_raw_data=True),
mae=TorchGlobalMetric(metric=partial(MeanAbsoluteError), on_raw_data=True),
leaf_metric_mse=TorchLeafMetric(metric=partial(MeanSquaredError), on_raw_data=True),
)
callbacks = [
EarlyStopping(monitoring_metric="mse", mode="minimize", patience=10),
Scheduler(
pre_scheduler=partial(ReduceLROnPlateau, patience=5, mode="min"),
step_method="epoch",
monitoring_metric="Total loss",
),
]
# 2. Define optimizer
optimizer = partial(torch.optim.AdamW, lr=0.001, foreach=False, fused=False)
# 3. Create and run the trainer
trainer = Trainer(
loss=torch.nn.MSELoss(reduction="none"),
optimizer=optimizer,
callbacks=callbacks,
start_epoch=0,
max_epochs=39,
metrics=metrics,
)
trained_model = trainer.train(
model=xpdeep_model,
train_set=fit_train_dataset,
validation_set=fit_val_dataset,
batch_size=2048,
)
# ##### Explain #######
# 1. Build the Explainer
statistics = DictStats(
histogram_target=HistogramStat(on="target", num_bins=20, num_items=1000, on_raw_data=True),
histogram_prediction=HistogramStat(on="prediction", num_bins=20, num_items=1000, on_raw_data=True),
histogram_error=HistogramStat(on="prediction_error", num_bins=20, num_items=1000, on_raw_data=True),
variance_target=VarianceStat(on="target", on_raw_data=True),
variance_prediction=VarianceStat(on="prediction", on_raw_data=True),
distribution_input=HistogramStat(on="input", num_bins=20, num_items=1000, feature_name="Price", on_raw_data=True),
)
# Add a per-timestep MSE (num_outputs is the number of forecast timesteps)
leaf_metric_per_timestamp_mse = TorchLeafMetric(
metric=partial(MeanSquaredError, num_outputs=5), on_raw_data=True, reduced_dimensions=[0, 2]
)
metrics.update({"leaf_metric_per_timestamp_mse": leaf_metric_per_timestamp_mse})
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)
# 2. Generate global model explanations
model_explanations = explainer.global_explain(
trained_model,
train_set=fit_train_dataset,
test_set=fit_test_dataset,
validation_set=fit_val_dataset,
)
print(model_explanations.visualisation_link)