Har Dataset#
Human Activity Recognition (HAR) is a dataset for classification with time-series inputs.
Please download the dataset as a zip archive here, unzip it, and update the tutorial data path accordingly.
HAR was collected from 30 subjects performing six different activities (Walking, Walking Upstairs, Walking Downstairs, Sitting, Standing, Laying). It consists of inertial sensor data collected with a smartphone carried by the subjects.
The following image summarizes the dataset.
Please follow this end-to-end tutorial to prepare the dataset, create and train the model, and finally compute explanations.
Prepare the Dataset#
1. Split and Convert your Raw Data#
The first step consists of creating your train, test and validation splits as StandardDataset
.
As we only have train and test files, we will use 20% of the train split to create a validation split.
Each ".txt" file in "Inertial Signals" folder corresponds to a single input channel, the target being in the "y_test" and "y_train" files.
We extract the features file by file, in numpy array
format, and stack them to get the input feature "human_activity" as a single array of shape
batch_size x num_timestamps x num_channels.
Let's transform the train (and validation) data:
import pandas as pd
import numpy as np
from pathlib import Path
split_name = "train"
signals_dir = Path(f"{split_name}/Inertial Signals/")

# One whitespace-separated ".txt" file per input channel; sorting the paths keeps the channel order stable.
# The 2-D arrays are kept as read (no squeeze), so a split holding a single row still stacks correctly.
features_dict = {
    feature_filepath.stem: pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    for feature_filepath in sorted(signals_dir.rglob("*.txt"))
}
if not features_dict:
    # Fail early with an actionable message instead of the opaque error np.stack raises on an empty list.
    raise FileNotFoundError(f"No '.txt' signal file found under '{signals_dir}', please check the dataset path.")

# Stack the channels on the last axis: batch_size x num_timestamps x num_channels.
inputs = np.stack(list(features_dict.values()), axis=-1)

# Activity ids are whole numbers: read them as a flat integer vector so they match the integer keys
# of the label mapping exactly, whatever the number of rows.
targets = pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.int64).ravel()
We map the target values to their labels, and build the DataFrame with train and validation data.
# Activity ids used in the target files, paired with their human-readable labels.
activity_mapping = dict(
    enumerate(
        ("Walking", "Walking upstairs", "Walking downstairs", "Sitting", "Standing", "Laying"),
        start=1,
    )
)

# Element-wise dictionary lookup applied to a whole array of activity ids.
targets_mapper = np.vectorize(lambda activity_id: activity_mapping[activity_id])
targets = targets_mapper(targets)

# One row per sample: the nested-list time series and its activity label.
train_val_data = pd.DataFrame(
    {
        "human_activity": inputs.tolist(),
        "activity": targets,
    }
)
Warning
We need to convert the multidimensional array "human_activity" to a list of lists, as pandas.DataFrame
does not handle
this format natively.
We can now split the train data into a train and validation set.
from sklearn.model_selection import train_test_split
# Keep 20% of the rows aside as the validation set; the seed pins the split.
validation_fraction = 0.2
split_seed = 42
train_data, val_data = train_test_split(
    train_val_data,
    test_size=validation_fraction,
    random_state=split_seed,
)
Similarly, we transform the test data.
split_name = "test"
signals_dir = Path(f"{split_name}/Inertial Signals/")

# One whitespace-separated ".txt" file per input channel; sorting the paths keeps the channel order stable.
# The 2-D arrays are kept as read (no squeeze), so a split holding a single row still stacks correctly.
features_dict = {
    feature_filepath.stem: pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    for feature_filepath in sorted(signals_dir.rglob("*.txt"))
}
if not features_dict:
    # Fail early with an actionable message instead of the opaque error np.stack raises on an empty list.
    raise FileNotFoundError(f"No '.txt' signal file found under '{signals_dir}', please check the dataset path.")

# Stack the channels on the last axis: batch_size x num_timestamps x num_channels.
inputs = np.stack(list(features_dict.values()), axis=-1)

# Activity ids are whole numbers: read them as a flat integer vector so they match the integer keys
# of the label mapping exactly, whatever the number of rows.
targets = pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.int64).ravel()
targets = targets_mapper(targets)  # Map targets to their labels.

test_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})
In addition, we need to add an index_xp_deep
column to each split; please see the doc.
# Number the rows of every split from 0 in a dedicated "index_xp_deep" column.
for split_frame in (train_data, test_data, val_data):
    split_frame["index_xp_deep"] = range(len(split_frame))
👀 Full file preview
"""HAR workflow, classification, time series data."""
from functools import partial
from pathlib import Path
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import torch
from torch.optim.lr_scheduler import ReduceLROnPlateau
from models import HarCNN
from preprocessors import ScaleHAR
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import AnalyzedParquetDataset, FittedParquetDataset
from xpdeep.dataset.schema.feature.feature import (
CategoricalFeature,
Metadata,
MultivariateAsynchronousTimeSerie,
)
from xpdeep.dataset.schema.preprocessor import SklearnPreprocessor
from xpdeep.dataset.schema.schema import AnalyzedSchema
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat
from xpdeep.filtering.filter import Filter
from xpdeep.metrics.metric import DictMetrics
from xpdeep.metrics.zoo.multiclass_metrics import MulticlassConfusionMatrix, MulticlassAccuracy, MulticlassF1Score
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from xpdeep.model.zoo.mlp import MLP
from xpdeep.project import Project
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from xpdeep.trainer.trainer import Trainer
torch.random.manual_seed(5)
# ##### Prepare the Dataset #######
# 1. Split and Convert your Raw Data
features_dict = {}
split_name = "train"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
feature_name = feature_filepath.stem
features_dict[feature_name] = np.squeeze(
pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
# Map the target to their labels
activity_mapping = {
1: "Walking",
2: "Walking upstairs",
3: "Walking downstairs",
4: "Sitting",
5: "Standing",
6: "Laying",
}
targets_mapper = np.vectorize(lambda x: activity_mapping[x])
targets = targets_mapper(targets) # Map targets to their labels.
train_val_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})
# Split the data into training and validation sets
train_data, val_data = train_test_split(train_val_data, test_size=0.2, random_state=42)
features_dict = {}
split_name = "test"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
feature_name = feature_filepath.stem
features_dict[feature_name] = np.squeeze(
pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
targets = targets_mapper(targets) # Map targets to their labels.
test_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})
train_data["index_xp_deep"] = range(len(train_data["activity"]))
test_data["index_xp_deep"] = range(len(test_data["activity"]))
val_data["index_xp_deep"] = range(len(val_data["activity"]))
# Convert to pyarrow Table format
train_table = pa.Table.from_pandas(train_data, preserve_index=False)
val_table = pa.Table.from_pandas(val_data, preserve_index=False)
test_table = pa.Table.from_pandas(test_data, preserve_index=False)
# Save each split as ".parquet" file
pq.write_table(train_table, "train.parquet")
pq.write_table(val_table, "val.parquet")
pq.write_table(test_table, "test.parquet")
# 2. Upload your Converted Data
init(api_key="api_key", api_url="api_url")
set_project(Project(id="HarId", name="Har Tutorial"))
directory = upload(
directory_name="har_uploaded",
train_set_path="train.parquet",
test_set_path="test.parquet",
val_set_path="val.parquet",
)
# 3. Find a schema
analyzed_schema = AnalyzedSchema(
Metadata(name="index_xp_deep"),
MultivariateAsynchronousTimeSerie(
channel_names=[
"body_acc_x",
"body_acc_y",
"body_acc_z",
"body_gyro_x",
"body_gyro_y",
"body_gyro_z",
"total_acc_x",
"total_acc_y",
"total_acc_z",
],
name="human_activity",
size=[128, 9],
preprocessor=ScaleHAR(input_size=(128, 9)),
),
CategoricalFeature(
is_target=True,
name="activity",
categories=[],
preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
),
)
# Create a dataset from the analyzed schema.
analyzed_train_dataset = AnalyzedParquetDataset(
split_name="train",
identifier_name="my_local_dataset",
path=directory["train_set_path"],
analyzed_schema=analyzed_schema,
)
print(analyzed_schema)
# 5. Fit the schema
fit_train_dataset = analyzed_train_dataset.fit()
fit_test_dataset = FittedParquetDataset(
split_name="test",
identifier_name="my_local_dataset",
path=directory["test_set_path"],
fitted_schema=fit_train_dataset.fitted_schema,
)
fit_val_dataset = FittedParquetDataset(
split_name="val",
identifier_name="my_local_dataset",
path=directory["val_set_path"],
fitted_schema=fit_train_dataset.fitted_schema,
)
# ##### Prepare the Model #######
# 1. Create the required torch models
input_size = fit_train_dataset.fitted_schema.input_size[1:]
target_size = fit_train_dataset.fitted_schema.target_size[1]
print(f"input_size: {input_size} - target_size: {target_size}")
backbone = HarCNN(
with_softmax=False, # No softmax for latent space
output_size=128,
)
feature_extraction = MLP(
norm_layer=None,
flatten_input=True,
dropout=0.2,
input_size=128,
hidden_channels=[64, 32],
)
task_learner = MLP(
norm_layer=None,
input_size=32,
hidden_channels=[target_size],
last_activation=partial(torch.nn.Softmax, dim=-1),
)
# 2. Explainable Model Specifications
model_specifications = ModelDecisionGraphParameters(
graph_depth=3,
discrimination_weight=0.1,
target_homogeneity_weight=2.0,
prune_step=11,
target_homogeneity_pruning_threshold=0.7,
population_pruning_threshold=0.05,
balancing_weight=1.0,
)
# 3. Create the Explainable Model
xpdeep_model = XpdeepModel.from_torch(
fitted_schema=fit_train_dataset.fitted_schema,
feature_extraction=feature_extraction,
task_learner=task_learner,
backbone=backbone,
decision_graph_parameters=model_specifications,
)
# ##### Train #######
# Metrics to monitor the training.
metrics = DictMetrics(
multi_class_accuracy=partial(MulticlassAccuracy, num_classes=target_size),
multi_class_F1_score=partial(MulticlassF1Score, num_classes=target_size),
confusion_matrix=partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size),
)
callbacks = [
EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5),
Scheduler(
pre_scheduler=partial(ReduceLROnPlateau, patience=3, mode="max"),
step_method="epoch",
monitoring_metric="global_multi_class_accuracy",
),
]
# The optimizer is a partial object, as pytorch needs the model parameters to instantiate it.
optimizer = partial(torch.optim.AdamW, lr=0.001)
trainer = Trainer(
loss=CrossEntropyLossFromProbabilities(reduction="none"),
optimizer=optimizer,
callbacks=callbacks,
start_epoch=0,
max_epochs=20,
metrics=metrics,
)
trained_model = trainer.train(
model=xpdeep_model,
train_set=fit_train_dataset,
validation_set=fit_val_dataset,
batch_size=128,
)
# ##### Explain #######
# 1. Build the Explainer
statistics = DictStats(
distribution_target=DistributionStat(on="target"), distribution_prediction=DistributionStat(on="prediction")
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)
# 2. Model Functioning Explanations
model_explanations = explainer.global_explain(
trained_model,
train_set=fit_train_dataset,
test_set=fit_test_dataset,
validation_set=fit_val_dataset,
)
visualisation_link = model_explanations.visualisation_link
# 3. Inference and their Causal Explanations
my_filter = Filter("testing_filter", fit_test_dataset, row_indexes=list(range(100)))
causal_explanations = explainer.local_explain(trained_model, fit_test_dataset, my_filter)
visualisation_link = causal_explanations.visualisation_link
As stated in the doc, Xpdeep requires a ".parquet" file to create the dataset. The original data is stored as a ".txt" file, therefore each split must be converted to a ".parquet" file.
Tip
To get your ".parquet" files, you can easily convert each split from pandas.DataFrame
to pyarrow.Table
first.
Like pandas.DataFrame
, pyarrow.Table
does not support multidimensional arrays, so please make sure to convert
arrays to lists first.
Warning
Here we set preserve_index
to False in order to remove the DataFrame "index" column from the resulting Pyarrow Table.
import pyarrow as pa
import pyarrow.parquet as pq
# Convert to pyarrow Table format; the DataFrame index is dropped on purpose.
train_table, val_table, test_table = (
    pa.Table.from_pandas(split_frame, preserve_index=False)
    for split_frame in (train_data, val_data, test_data)
)

# Save each split as ".parquet" file
for table, parquet_path in (
    (train_table, "train.parquet"),
    (val_table, "val.parquet"),
    (test_table, "test.parquet"),
):
    pq.write_table(table, parquet_path)
👀 Full file preview
"""HAR workflow, classification, time series data."""
from functools import partial
from pathlib import Path
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import torch
from torch.optim.lr_scheduler import ReduceLROnPlateau
from models import HarCNN
from preprocessors import ScaleHAR
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import AnalyzedParquetDataset, FittedParquetDataset
from xpdeep.dataset.schema.feature.feature import (
CategoricalFeature,
Metadata,
MultivariateAsynchronousTimeSerie,
)
from xpdeep.dataset.schema.preprocessor import SklearnPreprocessor
from xpdeep.dataset.schema.schema import AnalyzedSchema
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat
from xpdeep.filtering.filter import Filter
from xpdeep.metrics.metric import DictMetrics
from xpdeep.metrics.zoo.multiclass_metrics import MulticlassConfusionMatrix, MulticlassAccuracy, MulticlassF1Score
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from xpdeep.model.zoo.mlp import MLP
from xpdeep.project import Project
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from xpdeep.trainer.trainer import Trainer
torch.random.manual_seed(5)
# ##### Prepare the Dataset #######
# 1. Split and Convert your Raw Data
features_dict = {}
split_name = "train"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
feature_name = feature_filepath.stem
features_dict[feature_name] = np.squeeze(
pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
# Map the target to their labels
activity_mapping = {
1: "Walking",
2: "Walking upstairs",
3: "Walking downstairs",
4: "Sitting",
5: "Standing",
6: "Laying",
}
targets_mapper = np.vectorize(lambda x: activity_mapping[x])
targets = targets_mapper(targets) # Map targets to their labels.
train_val_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})
# Split the data into training and validation sets
train_data, val_data = train_test_split(train_val_data, test_size=0.2, random_state=42)
features_dict = {}
split_name = "test"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
feature_name = feature_filepath.stem
features_dict[feature_name] = np.squeeze(
pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
targets = targets_mapper(targets) # Map targets to their labels.
test_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})
train_data["index_xp_deep"] = range(len(train_data["activity"]))
test_data["index_xp_deep"] = range(len(test_data["activity"]))
val_data["index_xp_deep"] = range(len(val_data["activity"]))
# Convert to pyarrow Table format
train_table = pa.Table.from_pandas(train_data, preserve_index=False)
val_table = pa.Table.from_pandas(val_data, preserve_index=False)
test_table = pa.Table.from_pandas(test_data, preserve_index=False)
# Save each split as ".parquet" file
pq.write_table(train_table, "train.parquet")
pq.write_table(val_table, "val.parquet")
pq.write_table(test_table, "test.parquet")
# 2. Upload your Converted Data
init(api_key="api_key", api_url="api_url")
set_project(Project(id="HarId", name="Har Tutorial"))
directory = upload(
directory_name="har_uploaded",
train_set_path="train.parquet",
test_set_path="test.parquet",
val_set_path="val.parquet",
)
# 3. Find a schema
analyzed_schema = AnalyzedSchema(
Metadata(name="index_xp_deep"),
MultivariateAsynchronousTimeSerie(
channel_names=[
"body_acc_x",
"body_acc_y",
"body_acc_z",
"body_gyro_x",
"body_gyro_y",
"body_gyro_z",
"total_acc_x",
"total_acc_y",
"total_acc_z",
],
name="human_activity",
size=[128, 9],
preprocessor=ScaleHAR(input_size=(128, 9)),
),
CategoricalFeature(
is_target=True,
name="activity",
categories=[],
preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
),
)
# Create a dataset from the analyzed schema.
analyzed_train_dataset = AnalyzedParquetDataset(
split_name="train",
identifier_name="my_local_dataset",
path=directory["train_set_path"],
analyzed_schema=analyzed_schema,
)
print(analyzed_schema)
# 5. Fit the schema
fit_train_dataset = analyzed_train_dataset.fit()
fit_test_dataset = FittedParquetDataset(
split_name="test",
identifier_name="my_local_dataset",
path=directory["test_set_path"],
fitted_schema=fit_train_dataset.fitted_schema,
)
fit_val_dataset = FittedParquetDataset(
split_name="val",
identifier_name="my_local_dataset",
path=directory["val_set_path"],
fitted_schema=fit_train_dataset.fitted_schema,
)
# ##### Prepare the Model #######
# 1. Create the required torch models
input_size = fit_train_dataset.fitted_schema.input_size[1:]
target_size = fit_train_dataset.fitted_schema.target_size[1]
print(f"input_size: {input_size} - target_size: {target_size}")
backbone = HarCNN(
with_softmax=False, # No softmax for latent space
output_size=128,
)
feature_extraction = MLP(
norm_layer=None,
flatten_input=True,
dropout=0.2,
input_size=128,
hidden_channels=[64, 32],
)
task_learner = MLP(
norm_layer=None,
input_size=32,
hidden_channels=[target_size],
last_activation=partial(torch.nn.Softmax, dim=-1),
)
# 2. Explainable Model Specifications
model_specifications = ModelDecisionGraphParameters(
graph_depth=3,
discrimination_weight=0.1,
target_homogeneity_weight=2.0,
prune_step=11,
target_homogeneity_pruning_threshold=0.7,
population_pruning_threshold=0.05,
balancing_weight=1.0,
)
# 3. Create the Explainable Model
xpdeep_model = XpdeepModel.from_torch(
fitted_schema=fit_train_dataset.fitted_schema,
feature_extraction=feature_extraction,
task_learner=task_learner,
backbone=backbone,
decision_graph_parameters=model_specifications,
)
# ##### Train #######
# Metrics to monitor the training.
metrics = DictMetrics(
multi_class_accuracy=partial(MulticlassAccuracy, num_classes=target_size),
multi_class_F1_score=partial(MulticlassF1Score, num_classes=target_size),
confusion_matrix=partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size),
)
callbacks = [
EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5),
Scheduler(
pre_scheduler=partial(ReduceLROnPlateau, patience=3, mode="max"),
step_method="epoch",
monitoring_metric="global_multi_class_accuracy",
),
]
# The optimizer is a partial object, as pytorch needs the model parameters to instantiate it.
optimizer = partial(torch.optim.AdamW, lr=0.001)
trainer = Trainer(
loss=CrossEntropyLossFromProbabilities(reduction="none"),
optimizer=optimizer,
callbacks=callbacks,
start_epoch=0,
max_epochs=20,
metrics=metrics,
)
trained_model = trainer.train(
model=xpdeep_model,
train_set=fit_train_dataset,
validation_set=fit_val_dataset,
batch_size=128,
)
# ##### Explain #######
# 1. Build the Explainer
statistics = DictStats(
distribution_target=DistributionStat(on="target"), distribution_prediction=DistributionStat(on="prediction")
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)
# 2. Model Functioning Explanations
model_explanations = explainer.global_explain(
trained_model,
train_set=fit_train_dataset,
test_set=fit_test_dataset,
validation_set=fit_val_dataset,
)
visualisation_link = model_explanations.visualisation_link
# 3. Inference and their Causal Explanations
my_filter = Filter("testing_filter", fit_test_dataset, row_indexes=list(range(100)))
causal_explanations = explainer.local_explain(trained_model, fit_test_dataset, my_filter)
visualisation_link = causal_explanations.visualisation_link
2. Upload your Converted Data#
Warning
Don't forget to set up a Project
and initialize the API with your credentials !
from xpdeep import init, set_project
from xpdeep.project import Project
# Initialize the API with your credentials first, then select the project used by the tutorial.
init(api_key="api_key", api_url="api_url")
har_project = Project(id="HarId", name="Har Tutorial")
set_project(har_project)
With your Project
set up, you can upload the converted parquet files to the Xpdeep server.
from xpdeep.dataset.upload import upload
# Local parquet file of each split, keyed by the matching `upload` argument.
parquet_paths = {
    "train_set_path": "train.parquet",
    "test_set_path": "test.parquet",
    "val_set_path": "val.parquet",
}
directory = upload(directory_name="har_uploaded", **parquet_paths)
👀 Full file preview
"""HAR workflow, classification, time series data."""
from functools import partial
from pathlib import Path
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import torch
from torch.optim.lr_scheduler import ReduceLROnPlateau
from models import HarCNN
from preprocessors import ScaleHAR
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import AnalyzedParquetDataset, FittedParquetDataset
from xpdeep.dataset.schema.feature.feature import (
CategoricalFeature,
Metadata,
MultivariateAsynchronousTimeSerie,
)
from xpdeep.dataset.schema.preprocessor import SklearnPreprocessor
from xpdeep.dataset.schema.schema import AnalyzedSchema
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat
from xpdeep.filtering.filter import Filter
from xpdeep.metrics.metric import DictMetrics
from xpdeep.metrics.zoo.multiclass_metrics import MulticlassConfusionMatrix, MulticlassAccuracy, MulticlassF1Score
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from xpdeep.model.zoo.mlp import MLP
from xpdeep.project import Project
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from xpdeep.trainer.trainer import Trainer
torch.random.manual_seed(5)
# ##### Prepare the Dataset #######
# 1. Split and Convert your Raw Data
features_dict = {}
split_name = "train"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
feature_name = feature_filepath.stem
features_dict[feature_name] = np.squeeze(
pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
# Map the target to their labels
activity_mapping = {
1: "Walking",
2: "Walking upstairs",
3: "Walking downstairs",
4: "Sitting",
5: "Standing",
6: "Laying",
}
targets_mapper = np.vectorize(lambda x: activity_mapping[x])
targets = targets_mapper(targets) # Map targets to their labels.
train_val_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})
# Split the data into training and validation sets
train_data, val_data = train_test_split(train_val_data, test_size=0.2, random_state=42)
features_dict = {}
split_name = "test"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
feature_name = feature_filepath.stem
features_dict[feature_name] = np.squeeze(
pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
targets = targets_mapper(targets) # Map targets to their labels.
test_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})
train_data["index_xp_deep"] = range(len(train_data["activity"]))
test_data["index_xp_deep"] = range(len(test_data["activity"]))
val_data["index_xp_deep"] = range(len(val_data["activity"]))
# Convert to pyarrow Table format
train_table = pa.Table.from_pandas(train_data, preserve_index=False)
val_table = pa.Table.from_pandas(val_data, preserve_index=False)
test_table = pa.Table.from_pandas(test_data, preserve_index=False)
# Save each split as ".parquet" file
pq.write_table(train_table, "train.parquet")
pq.write_table(val_table, "val.parquet")
pq.write_table(test_table, "test.parquet")
# 2. Upload your Converted Data
init(api_key="api_key", api_url="api_url")
set_project(Project(id="HarId", name="Har Tutorial"))
directory = upload(
directory_name="har_uploaded",
train_set_path="train.parquet",
test_set_path="test.parquet",
val_set_path="val.parquet",
)
# 3. Find a schema
analyzed_schema = AnalyzedSchema(
Metadata(name="index_xp_deep"),
MultivariateAsynchronousTimeSerie(
channel_names=[
"body_acc_x",
"body_acc_y",
"body_acc_z",
"body_gyro_x",
"body_gyro_y",
"body_gyro_z",
"total_acc_x",
"total_acc_y",
"total_acc_z",
],
name="human_activity",
size=[128, 9],
preprocessor=ScaleHAR(input_size=(128, 9)),
),
CategoricalFeature(
is_target=True,
name="activity",
categories=[],
preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
),
)
# Create a dataset from the analyzed schema.
analyzed_train_dataset = AnalyzedParquetDataset(
split_name="train",
identifier_name="my_local_dataset",
path=directory["train_set_path"],
analyzed_schema=analyzed_schema,
)
print(analyzed_schema)
# 5. Fit the schema
fit_train_dataset = analyzed_train_dataset.fit()
fit_test_dataset = FittedParquetDataset(
split_name="test",
identifier_name="my_local_dataset",
path=directory["test_set_path"],
fitted_schema=fit_train_dataset.fitted_schema,
)
fit_val_dataset = FittedParquetDataset(
split_name="val",
identifier_name="my_local_dataset",
path=directory["val_set_path"],
fitted_schema=fit_train_dataset.fitted_schema,
)
# ##### Prepare the Model #######
# 1. Create the required torch models
input_size = fit_train_dataset.fitted_schema.input_size[1:]
target_size = fit_train_dataset.fitted_schema.target_size[1]
print(f"input_size: {input_size} - target_size: {target_size}")
backbone = HarCNN(
with_softmax=False, # No softmax for latent space
output_size=128,
)
feature_extraction = MLP(
norm_layer=None,
flatten_input=True,
dropout=0.2,
input_size=128,
hidden_channels=[64, 32],
)
task_learner = MLP(
norm_layer=None,
input_size=32,
hidden_channels=[target_size],
last_activation=partial(torch.nn.Softmax, dim=-1),
)
# 2. Explainable Model Specifications
model_specifications = ModelDecisionGraphParameters(
graph_depth=3,
discrimination_weight=0.1,
target_homogeneity_weight=2.0,
prune_step=11,
target_homogeneity_pruning_threshold=0.7,
population_pruning_threshold=0.05,
balancing_weight=1.0,
)
# 3. Create the Explainable Model
xpdeep_model = XpdeepModel.from_torch(
fitted_schema=fit_train_dataset.fitted_schema,
feature_extraction=feature_extraction,
task_learner=task_learner,
backbone=backbone,
decision_graph_parameters=model_specifications,
)
# ##### Train #######
# Metrics to monitor the training.
metrics = DictMetrics(
multi_class_accuracy=partial(MulticlassAccuracy, num_classes=target_size),
multi_class_F1_score=partial(MulticlassF1Score, num_classes=target_size),
confusion_matrix=partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size),
)
callbacks = [
EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5),
Scheduler(
pre_scheduler=partial(ReduceLROnPlateau, patience=3, mode="max"),
step_method="epoch",
monitoring_metric="global_multi_class_accuracy",
),
]
# The optimizer is a partial object, as pytorch needs the model parameters to instantiate it.
optimizer = partial(torch.optim.AdamW, lr=0.001)
trainer = Trainer(
loss=CrossEntropyLossFromProbabilities(reduction="none"),
optimizer=optimizer,
callbacks=callbacks,
start_epoch=0,
max_epochs=20,
metrics=metrics,
)
trained_model = trainer.train(
model=xpdeep_model,
train_set=fit_train_dataset,
validation_set=fit_val_dataset,
batch_size=128,
)
# ##### Explain #######
# 1. Build the Explainer
statistics = DictStats(
distribution_target=DistributionStat(on="target"), distribution_prediction=DistributionStat(on="prediction")
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)
# 2. Model Functioning Explanations
model_explanations = explainer.global_explain(
trained_model,
train_set=fit_train_dataset,
test_set=fit_test_dataset,
validation_set=fit_val_dataset,
)
visualisation_link = model_explanations.visualisation_link
# 3. Inference and their Causal Explanations
my_filter = Filter("testing_filter", fit_test_dataset, row_indexes=list(range(100)))
causal_explanations = explainer.local_explain(trained_model, fit_test_dataset, my_filter)
visualisation_link = causal_explanations.visualisation_link
3. Find a schema#
For HAR, we cannot use the AutoAnalyzer
as it does not support time series features yet. We therefore need to create an
AnalyzedSchema
from scratch. We add a time series feature
(9 channels, asynchronous), and a categorical feature for the target (the 6 activities to classify).
We use the custom preprocessor class ScaleHAR
that inherits from TorchPreprocessor
, and allows us to use torch.Tensor
to scale the time series.
import torch
from xpdeep.dataset.schema.preprocessor import TorchPreprocessor
class ScaleHAR(TorchPreprocessor):
    """Standardize the HAR time series with fixed statistics.

    ``transform`` computes ``(inputs - mean) / scale`` and ``inverse_transform`` reverts it.
    ``mean`` and ``scale`` each hold nine values, which broadcast against the last axis of the
    tensor they are applied to (presumably one value per channel, in the channel order of the
    schema -- confirm against the code that produced the statistics).
    """

    def __init__(self, input_size: tuple[int, ...]) -> None:
        """Initialize the scaler.

        Args:
            input_size: Input size, forwarded unchanged to ``TorchPreprocessor``.
        """
        super().__init__(input_size)
        # The statistics are constants, not weights to learn: ``requires_grad=False`` keeps them out of
        # any gradient update while they remain ``torch.nn.Parameter`` objects as before.
        self.mean = torch.nn.Parameter(
            torch.tensor([
                -6.36303054e-04,
                -2.92296862e-04,
                -2.75299412e-04,
                5.06464655e-04,
                -8.23780853e-04,
                1.12948446e-04,
                8.04749279e-01,
                2.87554865e-02,
                8.64980163e-02,
            ]),
            requires_grad=False,
        )
        self.scale = torch.nn.Parameter(
            torch.tensor([
                0.19484634,
                0.12242748,
                0.10687881,
                0.40681506,
                0.38185432,
                0.25574314,
                0.41411195,
                0.39099543,
                0.35776881,
            ]),
            requires_grad=False,
        )

    def transform(self, inputs: torch.Tensor) -> torch.Tensor:
        """Return ``inputs`` centered on ``mean`` and divided by ``scale``."""
        return (inputs - self.mean) / self.scale

    def inverse_transform(self, output: torch.Tensor) -> torch.Tensor:
        """Return ``output`` mapped back to the raw scale, i.e. the inverse of ``transform``."""
        return output * self.scale + self.mean
Let's now define the AnalyzedSchema
.
Warning
The Metadata must be specified here as we create the schema from scratch. Metadata is inferred from its name index_xp_deep
only with the AutoAnalyzer
.
from xpdeep.dataset.schema.schema import AnalyzedSchema
from sklearn.preprocessing import OneHotEncoder
from xpdeep.dataset.schema.feature.feature import (
CategoricalFeature,
Metadata,
MultivariateAsynchronousTimeSerie,
)
from xpdeep.dataset.schema.preprocessor import SklearnPreprocessor
# Index column of the parquet files, declared explicitly as metadata.
index_metadata = Metadata(name="index_xp_deep")

# Input feature: the time series with its nine named channels.
human_activity_feature = MultivariateAsynchronousTimeSerie(
    channel_names=[
        "body_acc_x",
        "body_acc_y",
        "body_acc_z",
        "body_gyro_x",
        "body_gyro_y",
        "body_gyro_z",
        "total_acc_x",
        "total_acc_y",
        "total_acc_z",
    ],
    name="human_activity",
    size=[128, 9],
    preprocessor=ScaleHAR(input_size=(128, 9)),
)

# Target feature: the activity label, one-hot encoded by scikit-learn.
activity_target = CategoricalFeature(
    is_target=True,
    name="activity",
    categories=[],
    preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
)

analyzed_schema = AnalyzedSchema(index_metadata, human_activity_feature, activity_target)
print(analyzed_schema)
+----------------------------------------------------------------+
| Schema Contents |
+-----------------------------------+----------------+-----------+
| Type | Name | Is Target |
+-----------------------------------+----------------+-----------+
| Metadata | index_xp_deep | |
| MultivariateAsynchronousTimeSerie | human_activity | ❌ |
| CategoricalFeature | activity | ✅ |
+-----------------------------------+----------------+-----------+
Tip
The CategoricalFeature
does not need an explicit list of categories, as they will be inferred by the preprocessor object during the fitting step.
However, you can still set the field to a list of categories, provided it respects the order fitted by the preprocessor.
Finally, we can create the AnalyzedParquetDataset
. Test and Validation datasets will be created later.
from xpdeep.dataset.parquet_dataset import AnalyzedParquetDataset
# Build the train dataset directly from the analyzed schema and the uploaded train file.
analyzed_train_dataset = AnalyzedParquetDataset(
    split_name="train",
    identifier_name="my_local_dataset",
    path=directory["train_set_path"],
    analyzed_schema=analyzed_schema,
)
👀 Full file preview
"""HAR workflow, classification, time series data."""
from functools import partial
from pathlib import Path
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import torch
from torch.optim.lr_scheduler import ReduceLROnPlateau
from models import HarCNN
from preprocessors import ScaleHAR
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import AnalyzedParquetDataset, FittedParquetDataset
from xpdeep.dataset.schema.feature.feature import (
CategoricalFeature,
Metadata,
MultivariateAsynchronousTimeSerie,
)
from xpdeep.dataset.schema.preprocessor import SklearnPreprocessor
from xpdeep.dataset.schema.schema import AnalyzedSchema
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat
from xpdeep.filtering.filter import Filter
from xpdeep.metrics.metric import DictMetrics
from xpdeep.metrics.zoo.multiclass_metrics import MulticlassConfusionMatrix, MulticlassAccuracy, MulticlassF1Score
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from xpdeep.model.zoo.mlp import MLP
from xpdeep.project import Project
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from xpdeep.trainer.trainer import Trainer
torch.random.manual_seed(5)

# ##### Prepare the Dataset #######

# 1. Split and Convert your Raw Data
# Each ".txt" file in "Inertial Signals" holds one input channel; files are
# visited in sorted order so the channel order is deterministic.
features_dict = {}
split_name = "train"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
    feature_name = feature_filepath.stem
    features_dict[feature_name] = np.squeeze(
        pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    )
# Stack the channels, then move them last: batch_size x num_timestamps x num_channels.
inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
    pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)

# Map the target to their labels
activity_mapping = {
    1: "Walking",
    2: "Walking upstairs",
    3: "Walking downstairs",
    4: "Sitting",
    5: "Standing",
    6: "Laying",
}
targets_mapper = np.vectorize(lambda x: activity_mapping[x])
targets = targets_mapper(targets)  # Map targets to their labels.
train_val_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})

# Split the data into training and validation sets
# (20% of the original train split becomes the validation split).
train_data, val_data = train_test_split(train_val_data, test_size=0.2, random_state=42)

# Same extraction for the test split.
features_dict = {}
split_name = "test"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
    feature_name = feature_filepath.stem
    features_dict[feature_name] = np.squeeze(
        pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    )
inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
    pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
targets = targets_mapper(targets)  # Map targets to their labels.
test_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})

# Add the index column that the schema declares as Metadata.
train_data["index_xp_deep"] = range(len(train_data["activity"]))
test_data["index_xp_deep"] = range(len(test_data["activity"]))
val_data["index_xp_deep"] = range(len(val_data["activity"]))

# Convert to pyarrow Table format
train_table = pa.Table.from_pandas(train_data, preserve_index=False)
val_table = pa.Table.from_pandas(val_data, preserve_index=False)
test_table = pa.Table.from_pandas(test_data, preserve_index=False)

# Save each split as ".parquet" file
pq.write_table(train_table, "train.parquet")
pq.write_table(val_table, "val.parquet")
pq.write_table(test_table, "test.parquet")

# 2. Upload your Converted Data
# "api_key" and "api_url" are placeholder values.
init(api_key="api_key", api_url="api_url")
set_project(Project(id="HarId", name="Har Tutorial"))
directory = upload(
    directory_name="har_uploaded",
    train_set_path="train.parquet",
    test_set_path="test.parquet",
    val_set_path="val.parquet",
)

# 4. Find a schema
analyzed_schema = AnalyzedSchema(
    Metadata(name="index_xp_deep"),
    MultivariateAsynchronousTimeSerie(
        channel_names=[
            "body_acc_x",
            "body_acc_y",
            "body_acc_z",
            "body_gyro_x",
            "body_gyro_y",
            "body_gyro_z",
            "total_acc_x",
            "total_acc_y",
            "total_acc_z",
        ],
        name="human_activity",
        size=[128, 9],
        preprocessor=ScaleHAR(input_size=(128, 9)),
    ),
    CategoricalFeature(
        is_target=True,
        name="activity",
        categories=[],
        preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
    ),
)

# Create a dataset from the analyzed schema.
analyzed_train_dataset = AnalyzedParquetDataset(
    split_name="train",
    identifier_name="my_local_dataset",
    path=directory["train_set_path"],
    analyzed_schema=analyzed_schema,
)
print(analyzed_schema)

# 5. Fit the schema
fit_train_dataset = analyzed_train_dataset.fit()
# Test and validation datasets reuse the schema fitted on the train set.
fit_test_dataset = FittedParquetDataset(
    split_name="test",
    identifier_name="my_local_dataset",
    path=directory["test_set_path"],
    fitted_schema=fit_train_dataset.fitted_schema,
)
fit_val_dataset = FittedParquetDataset(
    split_name="val",
    identifier_name="my_local_dataset",
    path=directory["val_set_path"],
    fitted_schema=fit_train_dataset.fitted_schema,
)

# ##### Prepare the Model #######

# 1. Create the required torch models
# The fitted schema sizes include the batch dimension in first position.
input_size = fit_train_dataset.fitted_schema.input_size[1:]
target_size = fit_train_dataset.fitted_schema.target_size[1]
print(f"input_size: {input_size} - target_size: {target_size}")
backbone = HarCNN(
    with_softmax=False,  # No softmax for latent space
    output_size=128,
)
# input_size=128 matches the backbone's output_size.
feature_extraction = MLP(
    norm_layer=None,
    flatten_input=True,
    dropout=0.2,
    input_size=128,
    hidden_channels=[64, 32],
)
# input_size=32 matches the last hidden size of the feature extraction model.
task_learner = MLP(
    norm_layer=None,
    input_size=32,
    hidden_channels=[target_size],
    last_activation=partial(torch.nn.Softmax, dim=-1),
)

# 2. Explainable Model Specifications
model_specifications = ModelDecisionGraphParameters(
    graph_depth=3,
    discrimination_weight=0.1,
    target_homogeneity_weight=2.0,
    prune_step=11,
    target_homogeneity_pruning_threshold=0.7,
    population_pruning_threshold=0.05,
    balancing_weight=1.0,
)

# 3. Create the Explainable Model
xpdeep_model = XpdeepModel.from_torch(
    fitted_schema=fit_train_dataset.fitted_schema,
    feature_extraction=feature_extraction,
    task_learner=task_learner,
    backbone=backbone,
    decision_graph_parameters=model_specifications,
)

# ##### Train #######

# Metrics to monitor the training.
metrics = DictMetrics(
    multi_class_accuracy=partial(MulticlassAccuracy, num_classes=target_size),
    multi_class_F1_score=partial(MulticlassF1Score, num_classes=target_size),
    confusion_matrix=partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size),
)
callbacks = [
    EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5),
    # mode="max" because the monitored metric is an accuracy.
    Scheduler(
        pre_scheduler=partial(ReduceLROnPlateau, patience=3, mode="max"),
        step_method="epoch",
        monitoring_metric="global_multi_class_accuracy",
    ),
]

# Optimizer is a partial object as pytorch needs to give the model as optimizer parameter.
optimizer = partial(torch.optim.AdamW, lr=0.001)
trainer = Trainer(
    loss=CrossEntropyLossFromProbabilities(reduction="none"),
    optimizer=optimizer,
    callbacks=callbacks,
    start_epoch=0,
    max_epochs=20,
    metrics=metrics,
)
trained_model = trainer.train(
    model=xpdeep_model,
    train_set=fit_train_dataset,
    validation_set=fit_val_dataset,
    batch_size=128,
)

# ##### Explain #######

# 1. Build the Explainer
statistics = DictStats(
    distribution_target=DistributionStat(on="target"), distribution_prediction=DistributionStat(on="prediction")
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
    description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)

# 2. Model Functioning Explanations
model_explanations = explainer.global_explain(
    trained_model,
    train_set=fit_train_dataset,
    test_set=fit_test_dataset,
    validation_set=fit_val_dataset,
)
visualisation_link = model_explanations.visualisation_link

# 3. Inference and their Causal Explanations
# The filter selects the first 100 row indexes of the test set.
my_filter = Filter("testing_filter", fit_test_dataset, row_indexes=list(range(100)))
causal_explanations = explainer.local_explain(trained_model, fit_test_dataset, my_filter)
visualisation_link = causal_explanations.visualisation_link
Tip
Here we did not build a ParquetDataset
first as we create the dataset straight from the existing analyzed schema.
The ParquetDataset
interface serves only as an intermediate class, used to obtain an AnalyzedParquetDataset
via the AutoAnalyzer
and its analyze
method.
5. Fit the schema#
With your AnalyzedSchema
ready, you can now fit the schema, which fits each feature preprocessor on the train set.
Note
Only the SklearnPreprocessor
will be fitted when calling analyzed_train_dataset.fit()
as ScaleHAR
does not
require a fitting step.
We use the same FittedSchema
to create the FittedParquetDataset
objects corresponding to the validation and test sets.
from xpdeep.dataset.parquet_dataset import FittedParquetDataset
# Fit the preprocessors of the schema on the train set.
fit_train_dataset = analyzed_train_dataset.fit()

# Test and validation datasets reuse the schema fitted on the train set.
fit_test_dataset = FittedParquetDataset(
    split_name="test",
    identifier_name="my_local_dataset",
    path=directory["test_set_path"],
    fitted_schema=fit_train_dataset.fitted_schema,
)
fit_val_dataset = FittedParquetDataset(
    split_name="val",
    identifier_name="my_local_dataset",
    path=directory["val_set_path"],
    fitted_schema=fit_train_dataset.fitted_schema,
)
👀 Full file preview
"""HAR workflow, classification, time series data."""
from functools import partial
from pathlib import Path
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import torch
from torch.optim.lr_scheduler import ReduceLROnPlateau
from models import HarCNN
from preprocessors import ScaleHAR
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import AnalyzedParquetDataset, FittedParquetDataset
from xpdeep.dataset.schema.feature.feature import (
CategoricalFeature,
Metadata,
MultivariateAsynchronousTimeSerie,
)
from xpdeep.dataset.schema.preprocessor import SklearnPreprocessor
from xpdeep.dataset.schema.schema import AnalyzedSchema
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat
from xpdeep.filtering.filter import Filter
from xpdeep.metrics.metric import DictMetrics
from xpdeep.metrics.zoo.multiclass_metrics import MulticlassConfusionMatrix, MulticlassAccuracy, MulticlassF1Score
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from xpdeep.model.zoo.mlp import MLP
from xpdeep.project import Project
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from xpdeep.trainer.trainer import Trainer
torch.random.manual_seed(5)

# ##### Prepare the Dataset #######

# 1. Split and Convert your Raw Data
# Each ".txt" file in "Inertial Signals" holds one input channel; files are
# visited in sorted order so the channel order is deterministic.
features_dict = {}
split_name = "train"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
    feature_name = feature_filepath.stem
    features_dict[feature_name] = np.squeeze(
        pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    )
# Stack the channels, then move them last: batch_size x num_timestamps x num_channels.
inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
    pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)

# Map the target to their labels
activity_mapping = {
    1: "Walking",
    2: "Walking upstairs",
    3: "Walking downstairs",
    4: "Sitting",
    5: "Standing",
    6: "Laying",
}
targets_mapper = np.vectorize(lambda x: activity_mapping[x])
targets = targets_mapper(targets)  # Map targets to their labels.
train_val_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})

# Split the data into training and validation sets
# (20% of the original train split becomes the validation split).
train_data, val_data = train_test_split(train_val_data, test_size=0.2, random_state=42)

# Same extraction for the test split.
features_dict = {}
split_name = "test"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
    feature_name = feature_filepath.stem
    features_dict[feature_name] = np.squeeze(
        pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    )
inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
    pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
targets = targets_mapper(targets)  # Map targets to their labels.
test_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})

# Add the index column that the schema declares as Metadata.
train_data["index_xp_deep"] = range(len(train_data["activity"]))
test_data["index_xp_deep"] = range(len(test_data["activity"]))
val_data["index_xp_deep"] = range(len(val_data["activity"]))

# Convert to pyarrow Table format
train_table = pa.Table.from_pandas(train_data, preserve_index=False)
val_table = pa.Table.from_pandas(val_data, preserve_index=False)
test_table = pa.Table.from_pandas(test_data, preserve_index=False)

# Save each split as ".parquet" file
pq.write_table(train_table, "train.parquet")
pq.write_table(val_table, "val.parquet")
pq.write_table(test_table, "test.parquet")

# 2. Upload your Converted Data
# "api_key" and "api_url" are placeholder values.
init(api_key="api_key", api_url="api_url")
set_project(Project(id="HarId", name="Har Tutorial"))
directory = upload(
    directory_name="har_uploaded",
    train_set_path="train.parquet",
    test_set_path="test.parquet",
    val_set_path="val.parquet",
)

# 4. Find a schema
analyzed_schema = AnalyzedSchema(
    Metadata(name="index_xp_deep"),
    MultivariateAsynchronousTimeSerie(
        channel_names=[
            "body_acc_x",
            "body_acc_y",
            "body_acc_z",
            "body_gyro_x",
            "body_gyro_y",
            "body_gyro_z",
            "total_acc_x",
            "total_acc_y",
            "total_acc_z",
        ],
        name="human_activity",
        size=[128, 9],
        preprocessor=ScaleHAR(input_size=(128, 9)),
    ),
    CategoricalFeature(
        is_target=True,
        name="activity",
        categories=[],
        preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
    ),
)

# Create a dataset from the analyzed schema.
analyzed_train_dataset = AnalyzedParquetDataset(
    split_name="train",
    identifier_name="my_local_dataset",
    path=directory["train_set_path"],
    analyzed_schema=analyzed_schema,
)
print(analyzed_schema)

# 5. Fit the schema
fit_train_dataset = analyzed_train_dataset.fit()
# Test and validation datasets reuse the schema fitted on the train set.
fit_test_dataset = FittedParquetDataset(
    split_name="test",
    identifier_name="my_local_dataset",
    path=directory["test_set_path"],
    fitted_schema=fit_train_dataset.fitted_schema,
)
fit_val_dataset = FittedParquetDataset(
    split_name="val",
    identifier_name="my_local_dataset",
    path=directory["val_set_path"],
    fitted_schema=fit_train_dataset.fitted_schema,
)

# ##### Prepare the Model #######

# 1. Create the required torch models
# The fitted schema sizes include the batch dimension in first position.
input_size = fit_train_dataset.fitted_schema.input_size[1:]
target_size = fit_train_dataset.fitted_schema.target_size[1]
print(f"input_size: {input_size} - target_size: {target_size}")
backbone = HarCNN(
    with_softmax=False,  # No softmax for latent space
    output_size=128,
)
# input_size=128 matches the backbone's output_size.
feature_extraction = MLP(
    norm_layer=None,
    flatten_input=True,
    dropout=0.2,
    input_size=128,
    hidden_channels=[64, 32],
)
# input_size=32 matches the last hidden size of the feature extraction model.
task_learner = MLP(
    norm_layer=None,
    input_size=32,
    hidden_channels=[target_size],
    last_activation=partial(torch.nn.Softmax, dim=-1),
)

# 2. Explainable Model Specifications
model_specifications = ModelDecisionGraphParameters(
    graph_depth=3,
    discrimination_weight=0.1,
    target_homogeneity_weight=2.0,
    prune_step=11,
    target_homogeneity_pruning_threshold=0.7,
    population_pruning_threshold=0.05,
    balancing_weight=1.0,
)

# 3. Create the Explainable Model
xpdeep_model = XpdeepModel.from_torch(
    fitted_schema=fit_train_dataset.fitted_schema,
    feature_extraction=feature_extraction,
    task_learner=task_learner,
    backbone=backbone,
    decision_graph_parameters=model_specifications,
)

# ##### Train #######

# Metrics to monitor the training.
metrics = DictMetrics(
    multi_class_accuracy=partial(MulticlassAccuracy, num_classes=target_size),
    multi_class_F1_score=partial(MulticlassF1Score, num_classes=target_size),
    confusion_matrix=partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size),
)
callbacks = [
    EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5),
    # mode="max" because the monitored metric is an accuracy.
    Scheduler(
        pre_scheduler=partial(ReduceLROnPlateau, patience=3, mode="max"),
        step_method="epoch",
        monitoring_metric="global_multi_class_accuracy",
    ),
]

# Optimizer is a partial object as pytorch needs to give the model as optimizer parameter.
optimizer = partial(torch.optim.AdamW, lr=0.001)
trainer = Trainer(
    loss=CrossEntropyLossFromProbabilities(reduction="none"),
    optimizer=optimizer,
    callbacks=callbacks,
    start_epoch=0,
    max_epochs=20,
    metrics=metrics,
)
trained_model = trainer.train(
    model=xpdeep_model,
    train_set=fit_train_dataset,
    validation_set=fit_val_dataset,
    batch_size=128,
)

# ##### Explain #######

# 1. Build the Explainer
statistics = DictStats(
    distribution_target=DistributionStat(on="target"), distribution_prediction=DistributionStat(on="prediction")
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
    description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)

# 2. Model Functioning Explanations
model_explanations = explainer.global_explain(
    trained_model,
    train_set=fit_train_dataset,
    test_set=fit_test_dataset,
    validation_set=fit_val_dataset,
)
visualisation_link = model_explanations.visualisation_link

# 3. Inference and their Causal Explanations
# The filter selects the first 100 row indexes of the test set.
my_filter = Filter("testing_filter", fit_test_dataset, row_indexes=list(range(100)))
causal_explanations = explainer.local_explain(trained_model, fit_test_dataset, my_filter)
visualisation_link = causal_explanations.visualisation_link
And that's all for the dataset preparation. We now have three FittedParquetDataset
, each with its FittedSchema
,
ready to be used.
Prepare the Model#
We now need to create an explainable model XpdeepModel
.
1. Create the required torch models#
We have a classification task with time series input data. We will use a
basic Multi Layer Perceptron (MLP) for this task, combined with a BackboneModel
.
Tip
Model input and output sizes (including the batch dimension) can be easily retrieved from the fitted schema.
Therefore, we chose:
- The
FeatureExtractionModel
that will embed input data into a 32-dimensional space.
- The
TaskLearnerModel
that will return an output of size 6.
- The
BackboneModel
that converts input data into a richer embedding space of size 128.
from torch import Tensor, nn
import torch
class ResidualBlock1D(nn.Module):
    """Residual block with 1D convolutions.

    Two convolutions are applied in sequence; their result is added to a
    linear projection of the input (the skip connection) and passed through a
    final ReLU.
    """

    def __init__(self, in_channels: int, out_channels: int, stride: int = 1, kernel_size: int = 3, padding: int = 1):
        """Init residual block.

        Args:
            in_channels: Number of channels of the input tensor.
            out_channels: Number of channels produced by the block.
            stride: Stride used by both convolutions.
            kernel_size: Kernel size used by both convolutions.
            padding: Padding used by both convolutions.

        NOTE: the skip connection keeps the input length, so the convolutions
        must preserve it as well (true for the defaults: stride 1, kernel 3,
        padding 1). Settings that change the length make the addition in
        ``forward`` fail on a shape mismatch.
        """
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv1d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding),
            nn.ReLU(),
        )
        self.conv2 = nn.Sequential(
            nn.Conv1d(out_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding),
        )
        # Projects the channel axis from in_channels to out_channels for the skip path.
        self.cast_layer = nn.Linear(in_channels, out_channels)
        self.relu = nn.ReLU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward function.

        Args:
            x: Input with channels in the 2nd dimension, as expected by ``nn.Conv1d``.

        Returns:
            The activated sum of the convolution path and the skip path.
        """
        # cast number of channels to number of out channels.
        # It should be called as a CNN, with channels in 2nd dimension
        # nn.Linear acts on the last axis, hence the transpose before and after.
        residual = self.cast_layer(x.transpose(1, -1)).transpose(1, -1)
        out = self.conv1(x)
        out = self.conv2(out)
        out += residual
        return self.relu(out)
class ResidualCNN1D(nn.Module):
"""Cnn with 1D residual blocks."""
def __init__( # noqa: PLR0913
self,
in_channels: int,
*,
dropout: float = 0.0,
out_channels: tuple[int, ...] = (32, 64, 128),
max_pool_size: int = 2,
output_size: int | None = 50,
with_softmax: bool = False,
transpose_input: bool = False,
):
"""Initialize a basic CNN model for classification."""
super().__init__()
layers = []
for channel_dim in out_channels:
layers.extend([
ResidualBlock1D(in_channels=in_channels, out_channels=channel_dim),
nn.MaxPool1d(max_pool_size),
nn.Dropout(p=dropout),
])
in_channels = channel_dim
self.cnn = nn.Sequential(*layers)
self.mlp: None | nn.Sequential = None
if output_size is not None:
mlp_layers = [nn.Flatten(), nn.LazyLinear(output_size)]
if with_softmax:
mlp_layers.append(nn.Softmax(dim=-1))
self.mlp = nn.Sequential(*mlp_layers)
self.transpose_input = transpose_input
def forward(self, x: Tensor) -> Tensor:
"""Forward pass."""
if hasattr(self, "transpose_input") and self.transpose_input: # retro compatibility
features = self.cnn(x.permute(*(0, x.dim() - 1, *range(1, x.dim() - 1))))
else:
features = self.cnn(x)
if self.mlp is not None:
return self.mlp(features)
return features
class HarCNN(ResidualCNN1D):
    """Backbone CNN model for HAR.

    A ``ResidualCNN1D`` with the number of input channels fixed to 9.
    """

    def __init__(
        self,
        dropout: float = 0.0,
        output_size: int = 50,
        *,
        out_channels: tuple[int, ...] = (32, 64, 128),
        with_softmax: bool = False,
    ):
        """Initialize a basic CNN model for classification.

        Args:
            dropout: Dropout probability forwarded to ``ResidualCNN1D``.
            output_size: Size of the final linear layer.
            out_channels: Output channels of each residual stage, in order.
            with_softmax: Whether to append a softmax to the head.
        """
        super().__init__(
            in_channels=9,
            out_channels=out_channels,
            output_size=output_size,
            dropout=dropout,
            with_softmax=with_softmax,
        )

    def forward(self, x: Tensor) -> Tensor:
        """Forward pass.

        Axes 1 and 2 are swapped first so that the channels end up in the 2nd
        dimension, as the convolutions expect.
        """
        return super().forward(x.transpose(1, 2))
# Backbone producing a 128-dimensional embedding.
backbone = HarCNN(
    with_softmax=False,  # No softmax for latent space
    output_size=128,
)
Here we define the FeatureExtractionModel
and the TaskLearnerModel
.
from functools import partial

from xpdeep.model.zoo.mlp import MLP

# Number of target classes, read from the fitted schema (index 0 is the batch dimension).
target_size = fit_train_dataset.fitted_schema.target_size[1]

# input_size=128 matches the backbone's output_size.
feature_extraction = MLP(
    norm_layer=None,
    flatten_input=True,
    dropout=0.2,
    input_size=128,
    hidden_channels=[64, 32],
)
# input_size=32 matches the last hidden size of the feature extraction model.
task_learner = MLP(
    norm_layer=None,
    input_size=32,
    hidden_channels=[target_size],
    last_activation=partial(torch.nn.Softmax, dim=-1),
)
👀 Full file preview
"""HAR workflow, classification, time series data."""
from functools import partial
from pathlib import Path
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import torch
from torch.optim.lr_scheduler import ReduceLROnPlateau
from models import HarCNN
from preprocessors import ScaleHAR
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import AnalyzedParquetDataset, FittedParquetDataset
from xpdeep.dataset.schema.feature.feature import (
CategoricalFeature,
Metadata,
MultivariateAsynchronousTimeSerie,
)
from xpdeep.dataset.schema.preprocessor import SklearnPreprocessor
from xpdeep.dataset.schema.schema import AnalyzedSchema
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat
from xpdeep.filtering.filter import Filter
from xpdeep.metrics.metric import DictMetrics
from xpdeep.metrics.zoo.multiclass_metrics import MulticlassConfusionMatrix, MulticlassAccuracy, MulticlassF1Score
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from xpdeep.model.zoo.mlp import MLP
from xpdeep.project import Project
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from xpdeep.trainer.trainer import Trainer
torch.random.manual_seed(5)

# ##### Prepare the Dataset #######

# 1. Split and Convert your Raw Data
# Each ".txt" file in "Inertial Signals" holds one input channel; files are
# visited in sorted order so the channel order is deterministic.
features_dict = {}
split_name = "train"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
    feature_name = feature_filepath.stem
    features_dict[feature_name] = np.squeeze(
        pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    )
# Stack the channels, then move them last: batch_size x num_timestamps x num_channels.
inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
    pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)

# Map the target to their labels
activity_mapping = {
    1: "Walking",
    2: "Walking upstairs",
    3: "Walking downstairs",
    4: "Sitting",
    5: "Standing",
    6: "Laying",
}
targets_mapper = np.vectorize(lambda x: activity_mapping[x])
targets = targets_mapper(targets)  # Map targets to their labels.
train_val_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})

# Split the data into training and validation sets
# (20% of the original train split becomes the validation split).
train_data, val_data = train_test_split(train_val_data, test_size=0.2, random_state=42)

# Same extraction for the test split.
features_dict = {}
split_name = "test"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
    feature_name = feature_filepath.stem
    features_dict[feature_name] = np.squeeze(
        pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    )
inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
    pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
targets = targets_mapper(targets)  # Map targets to their labels.
test_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})

# Add the index column that the schema declares as Metadata.
train_data["index_xp_deep"] = range(len(train_data["activity"]))
test_data["index_xp_deep"] = range(len(test_data["activity"]))
val_data["index_xp_deep"] = range(len(val_data["activity"]))

# Convert to pyarrow Table format
train_table = pa.Table.from_pandas(train_data, preserve_index=False)
val_table = pa.Table.from_pandas(val_data, preserve_index=False)
test_table = pa.Table.from_pandas(test_data, preserve_index=False)

# Save each split as ".parquet" file
pq.write_table(train_table, "train.parquet")
pq.write_table(val_table, "val.parquet")
pq.write_table(test_table, "test.parquet")

# 2. Upload your Converted Data
# "api_key" and "api_url" are placeholder values.
init(api_key="api_key", api_url="api_url")
set_project(Project(id="HarId", name="Har Tutorial"))
directory = upload(
    directory_name="har_uploaded",
    train_set_path="train.parquet",
    test_set_path="test.parquet",
    val_set_path="val.parquet",
)

# 4. Find a schema
analyzed_schema = AnalyzedSchema(
    Metadata(name="index_xp_deep"),
    MultivariateAsynchronousTimeSerie(
        channel_names=[
            "body_acc_x",
            "body_acc_y",
            "body_acc_z",
            "body_gyro_x",
            "body_gyro_y",
            "body_gyro_z",
            "total_acc_x",
            "total_acc_y",
            "total_acc_z",
        ],
        name="human_activity",
        size=[128, 9],
        preprocessor=ScaleHAR(input_size=(128, 9)),
    ),
    CategoricalFeature(
        is_target=True,
        name="activity",
        categories=[],
        preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
    ),
)

# Create a dataset from the analyzed schema.
analyzed_train_dataset = AnalyzedParquetDataset(
    split_name="train",
    identifier_name="my_local_dataset",
    path=directory["train_set_path"],
    analyzed_schema=analyzed_schema,
)
print(analyzed_schema)

# 5. Fit the schema
fit_train_dataset = analyzed_train_dataset.fit()
# Test and validation datasets reuse the schema fitted on the train set.
fit_test_dataset = FittedParquetDataset(
    split_name="test",
    identifier_name="my_local_dataset",
    path=directory["test_set_path"],
    fitted_schema=fit_train_dataset.fitted_schema,
)
fit_val_dataset = FittedParquetDataset(
    split_name="val",
    identifier_name="my_local_dataset",
    path=directory["val_set_path"],
    fitted_schema=fit_train_dataset.fitted_schema,
)

# ##### Prepare the Model #######

# 1. Create the required torch models
# The fitted schema sizes include the batch dimension in first position.
input_size = fit_train_dataset.fitted_schema.input_size[1:]
target_size = fit_train_dataset.fitted_schema.target_size[1]
print(f"input_size: {input_size} - target_size: {target_size}")
backbone = HarCNN(
    with_softmax=False,  # No softmax for latent space
    output_size=128,
)
# input_size=128 matches the backbone's output_size.
feature_extraction = MLP(
    norm_layer=None,
    flatten_input=True,
    dropout=0.2,
    input_size=128,
    hidden_channels=[64, 32],
)
# input_size=32 matches the last hidden size of the feature extraction model.
task_learner = MLP(
    norm_layer=None,
    input_size=32,
    hidden_channels=[target_size],
    last_activation=partial(torch.nn.Softmax, dim=-1),
)

# 2. Explainable Model Specifications
model_specifications = ModelDecisionGraphParameters(
    graph_depth=3,
    discrimination_weight=0.1,
    target_homogeneity_weight=2.0,
    prune_step=11,
    target_homogeneity_pruning_threshold=0.7,
    population_pruning_threshold=0.05,
    balancing_weight=1.0,
)

# 3. Create the Explainable Model
xpdeep_model = XpdeepModel.from_torch(
    fitted_schema=fit_train_dataset.fitted_schema,
    feature_extraction=feature_extraction,
    task_learner=task_learner,
    backbone=backbone,
    decision_graph_parameters=model_specifications,
)

# ##### Train #######

# Metrics to monitor the training.
metrics = DictMetrics(
    multi_class_accuracy=partial(MulticlassAccuracy, num_classes=target_size),
    multi_class_F1_score=partial(MulticlassF1Score, num_classes=target_size),
    confusion_matrix=partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size),
)
callbacks = [
    EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5),
    # mode="max" because the monitored metric is an accuracy.
    Scheduler(
        pre_scheduler=partial(ReduceLROnPlateau, patience=3, mode="max"),
        step_method="epoch",
        monitoring_metric="global_multi_class_accuracy",
    ),
]

# Optimizer is a partial object as pytorch needs to give the model as optimizer parameter.
optimizer = partial(torch.optim.AdamW, lr=0.001)
trainer = Trainer(
    loss=CrossEntropyLossFromProbabilities(reduction="none"),
    optimizer=optimizer,
    callbacks=callbacks,
    start_epoch=0,
    max_epochs=20,
    metrics=metrics,
)
trained_model = trainer.train(
    model=xpdeep_model,
    train_set=fit_train_dataset,
    validation_set=fit_val_dataset,
    batch_size=128,
)

# ##### Explain #######

# 1. Build the Explainer
statistics = DictStats(
    distribution_target=DistributionStat(on="target"), distribution_prediction=DistributionStat(on="prediction")
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
    description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)

# 2. Model Functioning Explanations
model_explanations = explainer.global_explain(
    trained_model,
    train_set=fit_train_dataset,
    test_set=fit_test_dataset,
    validation_set=fit_val_dataset,
)
visualisation_link = model_explanations.visualisation_link

# 3. Inference and their Causal Explanations
# The filter selects the first 100 row indexes of the test set.
my_filter = Filter("testing_filter", fit_test_dataset, row_indexes=list(range(100)))
causal_explanations = explainer.local_explain(trained_model, fit_test_dataset, my_filter)
visualisation_link = causal_explanations.visualisation_link
2. Explainable Model Specifications#
Here comes the crucial part: we need to specify model specifications under ModelDecisionGraphParameters
to get the best explanations (Model Decision Graph and Inference Graph).
from xpdeep.model.model_builder import ModelDecisionGraphParameters
# Parameters of the model decision graph used to build the explanations.
# NOTE(review): the full file preview uses target_homogeneity_weight=2.0 —
# confirm which value is intended.
model_specifications = ModelDecisionGraphParameters(
    graph_depth=3,
    discrimination_weight=0.1,
    target_homogeneity_weight=1.0,
    prune_step=11,
    target_homogeneity_pruning_threshold=0.7,
    population_pruning_threshold=0.05,
    balancing_weight=1.0,
)
👀 Full file preview
"""HAR workflow, classification, time series data."""
from functools import partial
from pathlib import Path
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import torch
from torch.optim.lr_scheduler import ReduceLROnPlateau
from models import HarCNN
from preprocessors import ScaleHAR
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import AnalyzedParquetDataset, FittedParquetDataset
from xpdeep.dataset.schema.feature.feature import (
CategoricalFeature,
Metadata,
MultivariateAsynchronousTimeSerie,
)
from xpdeep.dataset.schema.preprocessor import SklearnPreprocessor
from xpdeep.dataset.schema.schema import AnalyzedSchema
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat
from xpdeep.filtering.filter import Filter
from xpdeep.metrics.metric import DictMetrics
from xpdeep.metrics.zoo.multiclass_metrics import MulticlassConfusionMatrix, MulticlassAccuracy, MulticlassF1Score
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from xpdeep.model.zoo.mlp import MLP
from xpdeep.project import Project
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from xpdeep.trainer.trainer import Trainer
torch.random.manual_seed(5)
# ##### Prepare the Dataset #######
# 1. Split and Convert your Raw Data
# --- Train (and validation) data ---
features_dict = {}
split_name = "train"
# One ".txt" file per input channel; sorted() keeps the channel order deterministic.
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
    feature_name = feature_filepath.stem
    features_dict[feature_name] = np.squeeze(
        pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    )

# Stack the channels on axis 1, then swap the last two axes to obtain
# batch_size x num_timestamps x num_channels.
inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
    pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)

# Map the target to their labels
activity_mapping = {
    1: "Walking",
    2: "Walking upstairs",
    3: "Walking downstairs",
    4: "Sitting",
    5: "Standing",
    6: "Laying",
}
targets_mapper = np.vectorize(lambda x: activity_mapping[x])
targets = targets_mapper(targets)  # Map targets to their labels.

train_val_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})

# Split the data into training and validation sets
train_data, val_data = train_test_split(train_val_data, test_size=0.2, random_state=42)
# Explicit copies: a column is added to each split below, and assigning into a
# frame derived from another one can trigger pandas' SettingWithCopyWarning.
train_data = train_data.copy()
val_data = val_data.copy()

# --- Test data (same procedure as for the train split) ---
features_dict = {}
split_name = "test"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
    feature_name = feature_filepath.stem
    features_dict[feature_name] = np.squeeze(
        pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    )

inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
    pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
targets = targets_mapper(targets)  # Map targets to their labels.

test_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})

# Add a per-split row index column, numbered from 0 within each split.
train_data["index_xp_deep"] = range(len(train_data["activity"]))
test_data["index_xp_deep"] = range(len(test_data["activity"]))
val_data["index_xp_deep"] = range(len(val_data["activity"]))
# Convert to pyarrow Table format
# preserve_index=False: the pandas index is not stored as an extra column.
train_table = pa.Table.from_pandas(train_data, preserve_index=False)
val_table = pa.Table.from_pandas(val_data, preserve_index=False)
test_table = pa.Table.from_pandas(test_data, preserve_index=False)

# Save each split as ".parquet" file
pq.write_table(train_table, "train.parquet")
pq.write_table(val_table, "val.parquet")
pq.write_table(test_table, "test.parquet")

# 2. Upload your Converted Data
# "api_key" and "api_url" are placeholder values here.
init(api_key="api_key", api_url="api_url")
set_project(Project(id="HarId", name="Har Tutorial"))
directory = upload(
    directory_name="har_uploaded",
    train_set_path="train.parquet",
    test_set_path="test.parquet",
    val_set_path="val.parquet",
)

# 4. Find a schema
# Three features: the row index as metadata, the 128 x 9 time series as input,
# and the activity label as the (one-hot encoded) target.
analyzed_schema = AnalyzedSchema(
    Metadata(name="index_xp_deep"),
    MultivariateAsynchronousTimeSerie(
        channel_names=[
            "body_acc_x",
            "body_acc_y",
            "body_acc_z",
            "body_gyro_x",
            "body_gyro_y",
            "body_gyro_z",
            "total_acc_x",
            "total_acc_y",
            "total_acc_z",
        ],
        name="human_activity",
        size=[128, 9],
        preprocessor=ScaleHAR(input_size=(128, 9)),
    ),
    CategoricalFeature(
        is_target=True,
        name="activity",
        categories=[],
        preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
    ),
)

# Create a dataset from the analyzed schema.
analyzed_train_dataset = AnalyzedParquetDataset(
    split_name="train",
    identifier_name="my_local_dataset",
    path=directory["train_set_path"],
    analyzed_schema=analyzed_schema,
)
print(analyzed_schema)

# 5. Fit the schema
# The schema is fitted on the train split only; the test and validation
# datasets reuse that fitted schema.
fit_train_dataset = analyzed_train_dataset.fit()
fit_test_dataset = FittedParquetDataset(
    split_name="test",
    identifier_name="my_local_dataset",
    path=directory["test_set_path"],
    fitted_schema=fit_train_dataset.fitted_schema,
)
fit_val_dataset = FittedParquetDataset(
    split_name="val",
    identifier_name="my_local_dataset",
    path=directory["val_set_path"],
    fitted_schema=fit_train_dataset.fitted_schema,
)
# ##### Prepare the Model #######

# 1. Create the required torch models
# Sizes are read from the fitted schema; index 0 is skipped
# (presumably the batch dimension — TODO confirm).
input_size = fit_train_dataset.fitted_schema.input_size[1:]
target_size = fit_train_dataset.fitted_schema.target_size[1]
print(f"input_size: {input_size} - target_size: {target_size}")

# The three sub-models are sized to chain: 128 -> [64, 32] -> [target_size].
backbone = HarCNN(
    with_softmax=False,  # No softmax for latent space
    output_size=128,
)
feature_extraction = MLP(
    norm_layer=None,
    flatten_input=True,
    dropout=0.2,
    input_size=128,
    hidden_channels=[64, 32],
)
# Softmax as last activation: the loss used for training expects probabilities.
task_learner = MLP(
    norm_layer=None,
    input_size=32,
    hidden_channels=[target_size],
    last_activation=partial(torch.nn.Softmax, dim=-1),
)

# 2. Explainable Model Specifications
model_specifications = ModelDecisionGraphParameters(
    graph_depth=3,
    discrimination_weight=0.1,
    target_homogeneity_weight=2.0,
    prune_step=11,
    target_homogeneity_pruning_threshold=0.7,
    population_pruning_threshold=0.05,
    balancing_weight=1.0,
)

# 3. Create the Explainable Model
xpdeep_model = XpdeepModel.from_torch(
    fitted_schema=fit_train_dataset.fitted_schema,
    feature_extraction=feature_extraction,
    task_learner=task_learner,
    backbone=backbone,
    decision_graph_parameters=model_specifications,
)

# ##### Train #######

# Metrics to monitor the training.
metrics = DictMetrics(
    multi_class_accuracy=partial(MulticlassAccuracy, num_classes=target_size),
    multi_class_F1_score=partial(MulticlassF1Score, num_classes=target_size),
    confusion_matrix=partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size),
)

# Stop early when the total loss stalls, and lower the learning rate when the
# monitored accuracy stops increasing (hence mode="max").
callbacks = [
    EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5),
    Scheduler(
        pre_scheduler=partial(ReduceLROnPlateau, patience=3, mode="max"),
        step_method="epoch",
        monitoring_metric="global_multi_class_accuracy",
    ),
]

# The optimizer is given as a partial: it is only instantiated later, once the
# model parameters are available.
optimizer = partial(torch.optim.AdamW, lr=0.001)

trainer = Trainer(
    loss=CrossEntropyLossFromProbabilities(reduction="none"),
    optimizer=optimizer,
    callbacks=callbacks,
    start_epoch=0,
    max_epochs=20,
    metrics=metrics,
)

trained_model = trainer.train(
    model=xpdeep_model,
    train_set=fit_train_dataset,
    validation_set=fit_val_dataset,
    batch_size=128,
)

# ##### Explain #######

# 1. Build the Explainer
# Statistics computed on both the targets and the predictions.
statistics = DictStats(
    distribution_target=DistributionStat(on="target"),
    distribution_prediction=DistributionStat(on="prediction"),
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
    description_representativeness=1000,
    quality_metrics=quality_metrics,
    metrics=metrics,
    statistics=statistics,
)

# 2. Model Functioning Explanations
model_explanations = explainer.global_explain(
    trained_model,
    train_set=fit_train_dataset,
    test_set=fit_test_dataset,
    validation_set=fit_val_dataset,
)
visualisation_link = model_explanations.visualisation_link

# 3. Inference and their Causal Explanations
# The filter keeps the first 100 rows of the test set.
my_filter = Filter("testing_filter", fit_test_dataset, row_indexes=list(range(100)))
causal_explanations = explainer.local_explain(trained_model, fit_test_dataset, my_filter)
visualisation_link = causal_explanations.visualisation_link
For further details, see the docs.
Note
All parameters have a default value: you can start with these default values, then iterate and update the configuration to find suitable explanations.
3. Create the Explainable Model#
Given the model architecture and configuration, we can finally instantiate the explainable model XpdeepModel.
from xpdeep.model.xpdeep_model import XpdeepModel
# Assemble the explainable model from the fitted schema, the three torch
# sub-models and the decision graph parameters.
xpdeep_model = XpdeepModel.from_torch(
    fitted_schema=fit_train_dataset.fitted_schema,
    feature_extraction=feature_extraction,
    task_learner=task_learner,
    backbone=backbone,
    decision_graph_parameters=model_specifications,
)
👀 Full file preview
"""HAR workflow, classification, time series data."""
from functools import partial
from pathlib import Path
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import torch
from torch.optim.lr_scheduler import ReduceLROnPlateau
from models import HarCNN
from preprocessors import ScaleHAR
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import AnalyzedParquetDataset, FittedParquetDataset
from xpdeep.dataset.schema.feature.feature import (
CategoricalFeature,
Metadata,
MultivariateAsynchronousTimeSerie,
)
from xpdeep.dataset.schema.preprocessor import SklearnPreprocessor
from xpdeep.dataset.schema.schema import AnalyzedSchema
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat
from xpdeep.filtering.filter import Filter
from xpdeep.metrics.metric import DictMetrics
from xpdeep.metrics.zoo.multiclass_metrics import MulticlassConfusionMatrix, MulticlassAccuracy, MulticlassF1Score
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from xpdeep.model.zoo.mlp import MLP
from xpdeep.project import Project
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from xpdeep.trainer.trainer import Trainer
torch.random.manual_seed(5)
# ##### Prepare the Dataset #######
# 1. Split and Convert your Raw Data
# --- Train (and validation) data ---
features_dict = {}
split_name = "train"
# One ".txt" file per input channel; sorted() keeps the channel order deterministic.
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
    feature_name = feature_filepath.stem
    features_dict[feature_name] = np.squeeze(
        pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    )

# Stack the channels on axis 1, then swap the last two axes to obtain
# batch_size x num_timestamps x num_channels.
inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
    pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)

# Map the target to their labels
activity_mapping = {
    1: "Walking",
    2: "Walking upstairs",
    3: "Walking downstairs",
    4: "Sitting",
    5: "Standing",
    6: "Laying",
}
targets_mapper = np.vectorize(lambda x: activity_mapping[x])
targets = targets_mapper(targets)  # Map targets to their labels.

train_val_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})

# Split the data into training and validation sets
train_data, val_data = train_test_split(train_val_data, test_size=0.2, random_state=42)
# Explicit copies: a column is added to each split below, and assigning into a
# frame derived from another one can trigger pandas' SettingWithCopyWarning.
train_data = train_data.copy()
val_data = val_data.copy()

# --- Test data (same procedure as for the train split) ---
features_dict = {}
split_name = "test"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
    feature_name = feature_filepath.stem
    features_dict[feature_name] = np.squeeze(
        pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    )

inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
    pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
targets = targets_mapper(targets)  # Map targets to their labels.

test_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})

# Add a per-split row index column, numbered from 0 within each split.
train_data["index_xp_deep"] = range(len(train_data["activity"]))
test_data["index_xp_deep"] = range(len(test_data["activity"]))
val_data["index_xp_deep"] = range(len(val_data["activity"]))
# Convert to pyarrow Table format
# preserve_index=False: the pandas index is not stored as an extra column.
train_table = pa.Table.from_pandas(train_data, preserve_index=False)
val_table = pa.Table.from_pandas(val_data, preserve_index=False)
test_table = pa.Table.from_pandas(test_data, preserve_index=False)

# Save each split as ".parquet" file
pq.write_table(train_table, "train.parquet")
pq.write_table(val_table, "val.parquet")
pq.write_table(test_table, "test.parquet")

# 2. Upload your Converted Data
# "api_key" and "api_url" are placeholder values here.
init(api_key="api_key", api_url="api_url")
set_project(Project(id="HarId", name="Har Tutorial"))
directory = upload(
    directory_name="har_uploaded",
    train_set_path="train.parquet",
    test_set_path="test.parquet",
    val_set_path="val.parquet",
)

# 4. Find a schema
# Three features: the row index as metadata, the 128 x 9 time series as input,
# and the activity label as the (one-hot encoded) target.
analyzed_schema = AnalyzedSchema(
    Metadata(name="index_xp_deep"),
    MultivariateAsynchronousTimeSerie(
        channel_names=[
            "body_acc_x",
            "body_acc_y",
            "body_acc_z",
            "body_gyro_x",
            "body_gyro_y",
            "body_gyro_z",
            "total_acc_x",
            "total_acc_y",
            "total_acc_z",
        ],
        name="human_activity",
        size=[128, 9],
        preprocessor=ScaleHAR(input_size=(128, 9)),
    ),
    CategoricalFeature(
        is_target=True,
        name="activity",
        categories=[],
        preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
    ),
)

# Create a dataset from the analyzed schema.
analyzed_train_dataset = AnalyzedParquetDataset(
    split_name="train",
    identifier_name="my_local_dataset",
    path=directory["train_set_path"],
    analyzed_schema=analyzed_schema,
)
print(analyzed_schema)

# 5. Fit the schema
# The schema is fitted on the train split only; the test and validation
# datasets reuse that fitted schema.
fit_train_dataset = analyzed_train_dataset.fit()
fit_test_dataset = FittedParquetDataset(
    split_name="test",
    identifier_name="my_local_dataset",
    path=directory["test_set_path"],
    fitted_schema=fit_train_dataset.fitted_schema,
)
fit_val_dataset = FittedParquetDataset(
    split_name="val",
    identifier_name="my_local_dataset",
    path=directory["val_set_path"],
    fitted_schema=fit_train_dataset.fitted_schema,
)
# ##### Prepare the Model #######

# 1. Create the required torch models
# Sizes are read from the fitted schema; index 0 is skipped
# (presumably the batch dimension — TODO confirm).
input_size = fit_train_dataset.fitted_schema.input_size[1:]
target_size = fit_train_dataset.fitted_schema.target_size[1]
print(f"input_size: {input_size} - target_size: {target_size}")

# The three sub-models are sized to chain: 128 -> [64, 32] -> [target_size].
backbone = HarCNN(
    with_softmax=False,  # No softmax for latent space
    output_size=128,
)
feature_extraction = MLP(
    norm_layer=None,
    flatten_input=True,
    dropout=0.2,
    input_size=128,
    hidden_channels=[64, 32],
)
# Softmax as last activation: the loss used for training expects probabilities.
task_learner = MLP(
    norm_layer=None,
    input_size=32,
    hidden_channels=[target_size],
    last_activation=partial(torch.nn.Softmax, dim=-1),
)

# 2. Explainable Model Specifications
model_specifications = ModelDecisionGraphParameters(
    graph_depth=3,
    discrimination_weight=0.1,
    target_homogeneity_weight=2.0,
    prune_step=11,
    target_homogeneity_pruning_threshold=0.7,
    population_pruning_threshold=0.05,
    balancing_weight=1.0,
)

# 3. Create the Explainable Model
xpdeep_model = XpdeepModel.from_torch(
    fitted_schema=fit_train_dataset.fitted_schema,
    feature_extraction=feature_extraction,
    task_learner=task_learner,
    backbone=backbone,
    decision_graph_parameters=model_specifications,
)

# ##### Train #######

# Metrics to monitor the training.
metrics = DictMetrics(
    multi_class_accuracy=partial(MulticlassAccuracy, num_classes=target_size),
    multi_class_F1_score=partial(MulticlassF1Score, num_classes=target_size),
    confusion_matrix=partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size),
)

# Stop early when the total loss stalls, and lower the learning rate when the
# monitored accuracy stops increasing (hence mode="max").
callbacks = [
    EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5),
    Scheduler(
        pre_scheduler=partial(ReduceLROnPlateau, patience=3, mode="max"),
        step_method="epoch",
        monitoring_metric="global_multi_class_accuracy",
    ),
]

# The optimizer is given as a partial: it is only instantiated later, once the
# model parameters are available.
optimizer = partial(torch.optim.AdamW, lr=0.001)

trainer = Trainer(
    loss=CrossEntropyLossFromProbabilities(reduction="none"),
    optimizer=optimizer,
    callbacks=callbacks,
    start_epoch=0,
    max_epochs=20,
    metrics=metrics,
)

trained_model = trainer.train(
    model=xpdeep_model,
    train_set=fit_train_dataset,
    validation_set=fit_val_dataset,
    batch_size=128,
)

# ##### Explain #######

# 1. Build the Explainer
# Statistics computed on both the targets and the predictions.
statistics = DictStats(
    distribution_target=DistributionStat(on="target"),
    distribution_prediction=DistributionStat(on="prediction"),
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
    description_representativeness=1000,
    quality_metrics=quality_metrics,
    metrics=metrics,
    statistics=statistics,
)

# 2. Model Functioning Explanations
model_explanations = explainer.global_explain(
    trained_model,
    train_set=fit_train_dataset,
    test_set=fit_test_dataset,
    validation_set=fit_val_dataset,
)
visualisation_link = model_explanations.visualisation_link

# 3. Inference and their Causal Explanations
# The filter keeps the first 100 rows of the test set.
my_filter = Filter("testing_filter", fit_test_dataset, row_indexes=list(range(100)))
causal_explanations = explainer.local_explain(trained_model, fit_test_dataset, my_filter)
visualisation_link = causal_explanations.visualisation_link
Train#
The train step is straightforward: we need to specify the Trainer parameters.
from functools import partial
import torch
from xpdeep.metrics.metric import DictMetrics
from xpdeep.metrics.zoo.multiclass_metrics import MulticlassConfusionMatrix, MulticlassAccuracy, MulticlassF1Score
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from xpdeep.trainer.trainer import Trainer
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from torch.optim.lr_scheduler import ReduceLROnPlateau
# Metrics to monitor the training.
# Metrics to monitor the training.
metrics = DictMetrics(
    multi_class_accuracy=partial(MulticlassAccuracy, num_classes=target_size),
    multi_class_F1_score=partial(MulticlassF1Score, num_classes=target_size),
    confusion_matrix=partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size),
)

# Stop early when the total loss stalls, and lower the learning rate when the
# monitored accuracy stops increasing (hence mode="max").
callbacks = [
    EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5),
    Scheduler(
        pre_scheduler=partial(ReduceLROnPlateau, patience=3, mode="max"),
        step_method="epoch",
        monitoring_metric="global_multi_class_accuracy",
    ),
]

# The optimizer is given as a partial: it is only instantiated later, once the
# model parameters are available.
optimizer = partial(torch.optim.AdamW, lr=0.001)

trainer = Trainer(
    loss=CrossEntropyLossFromProbabilities(reduction="none"),
    optimizer=optimizer,
    callbacks=callbacks,
    start_epoch=0,
    max_epochs=20,
    metrics=metrics,
)
👀 Full file preview
"""HAR workflow, classification, time series data."""
from functools import partial
from pathlib import Path
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import torch
from torch.optim.lr_scheduler import ReduceLROnPlateau
from models import HarCNN
from preprocessors import ScaleHAR
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import AnalyzedParquetDataset, FittedParquetDataset
from xpdeep.dataset.schema.feature.feature import (
CategoricalFeature,
Metadata,
MultivariateAsynchronousTimeSerie,
)
from xpdeep.dataset.schema.preprocessor import SklearnPreprocessor
from xpdeep.dataset.schema.schema import AnalyzedSchema
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat
from xpdeep.filtering.filter import Filter
from xpdeep.metrics.metric import DictMetrics
from xpdeep.metrics.zoo.multiclass_metrics import MulticlassConfusionMatrix, MulticlassAccuracy, MulticlassF1Score
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from xpdeep.model.zoo.mlp import MLP
from xpdeep.project import Project
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from xpdeep.trainer.trainer import Trainer
torch.random.manual_seed(5)
# ##### Prepare the Dataset #######
# 1. Split and Convert your Raw Data
# --- Train (and validation) data ---
features_dict = {}
split_name = "train"
# One ".txt" file per input channel; sorted() keeps the channel order deterministic.
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
    feature_name = feature_filepath.stem
    features_dict[feature_name] = np.squeeze(
        pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    )

# Stack the channels on axis 1, then swap the last two axes to obtain
# batch_size x num_timestamps x num_channels.
inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
    pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)

# Map the target to their labels
activity_mapping = {
    1: "Walking",
    2: "Walking upstairs",
    3: "Walking downstairs",
    4: "Sitting",
    5: "Standing",
    6: "Laying",
}
targets_mapper = np.vectorize(lambda x: activity_mapping[x])
targets = targets_mapper(targets)  # Map targets to their labels.

train_val_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})

# Split the data into training and validation sets
train_data, val_data = train_test_split(train_val_data, test_size=0.2, random_state=42)
# Explicit copies: a column is added to each split below, and assigning into a
# frame derived from another one can trigger pandas' SettingWithCopyWarning.
train_data = train_data.copy()
val_data = val_data.copy()

# --- Test data (same procedure as for the train split) ---
features_dict = {}
split_name = "test"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
    feature_name = feature_filepath.stem
    features_dict[feature_name] = np.squeeze(
        pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    )

inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
targets = np.squeeze(
    pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
targets = targets_mapper(targets)  # Map targets to their labels.

test_data = pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})

# Add a per-split row index column, numbered from 0 within each split.
train_data["index_xp_deep"] = range(len(train_data["activity"]))
test_data["index_xp_deep"] = range(len(test_data["activity"]))
val_data["index_xp_deep"] = range(len(val_data["activity"]))
# Convert to pyarrow Table format
# preserve_index=False: the pandas index is not stored as an extra column.
train_table = pa.Table.from_pandas(train_data, preserve_index=False)
val_table = pa.Table.from_pandas(val_data, preserve_index=False)
test_table = pa.Table.from_pandas(test_data, preserve_index=False)

# Save each split as ".parquet" file
pq.write_table(train_table, "train.parquet")
pq.write_table(val_table, "val.parquet")
pq.write_table(test_table, "test.parquet")

# 2. Upload your Converted Data
# "api_key" and "api_url" are placeholder values here.
init(api_key="api_key", api_url="api_url")
set_project(Project(id="HarId", name="Har Tutorial"))
directory = upload(
    directory_name="har_uploaded",
    train_set_path="train.parquet",
    test_set_path="test.parquet",
    val_set_path="val.parquet",
)

# 4. Find a schema
# Three features: the row index as metadata, the 128 x 9 time series as input,
# and the activity label as the (one-hot encoded) target.
analyzed_schema = AnalyzedSchema(
    Metadata(name="index_xp_deep"),
    MultivariateAsynchronousTimeSerie(
        channel_names=[
            "body_acc_x",
            "body_acc_y",
            "body_acc_z",
            "body_gyro_x",
            "body_gyro_y",
            "body_gyro_z",
            "total_acc_x",
            "total_acc_y",
            "total_acc_z",
        ],
        name="human_activity",
        size=[128, 9],
        preprocessor=ScaleHAR(input_size=(128, 9)),
    ),
    CategoricalFeature(
        is_target=True,
        name="activity",
        categories=[],
        preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
    ),
)

# Create a dataset from the analyzed schema.
analyzed_train_dataset = AnalyzedParquetDataset(
    split_name="train",
    identifier_name="my_local_dataset",
    path=directory["train_set_path"],
    analyzed_schema=analyzed_schema,
)
print(analyzed_schema)

# 5. Fit the schema
# The schema is fitted on the train split only; the test and validation
# datasets reuse that fitted schema.
fit_train_dataset = analyzed_train_dataset.fit()
fit_test_dataset = FittedParquetDataset(
    split_name="test",
    identifier_name="my_local_dataset",
    path=directory["test_set_path"],
    fitted_schema=fit_train_dataset.fitted_schema,
)
fit_val_dataset = FittedParquetDataset(
    split_name="val",
    identifier_name="my_local_dataset",
    path=directory["val_set_path"],
    fitted_schema=fit_train_dataset.fitted_schema,
)
# ##### Prepare the Model #######

# 1. Create the required torch models
# Sizes are read from the fitted schema; index 0 is skipped
# (presumably the batch dimension — TODO confirm).
input_size = fit_train_dataset.fitted_schema.input_size[1:]
target_size = fit_train_dataset.fitted_schema.target_size[1]
print(f"input_size: {input_size} - target_size: {target_size}")

# The three sub-models are sized to chain: 128 -> [64, 32] -> [target_size].
backbone = HarCNN(
    with_softmax=False,  # No softmax for latent space
    output_size=128,
)
feature_extraction = MLP(
    norm_layer=None,
    flatten_input=True,
    dropout=0.2,
    input_size=128,
    hidden_channels=[64, 32],
)
# Softmax as last activation: the loss used for training expects probabilities.
task_learner = MLP(
    norm_layer=None,
    input_size=32,
    hidden_channels=[target_size],
    last_activation=partial(torch.nn.Softmax, dim=-1),
)

# 2. Explainable Model Specifications
model_specifications = ModelDecisionGraphParameters(
    graph_depth=3,
    discrimination_weight=0.1,
    target_homogeneity_weight=2.0,
    prune_step=11,
    target_homogeneity_pruning_threshold=0.7,
    population_pruning_threshold=0.05,
    balancing_weight=1.0,
)

# 3. Create the Explainable Model
xpdeep_model = XpdeepModel.from_torch(
    fitted_schema=fit_train_dataset.fitted_schema,
    feature_extraction=feature_extraction,
    task_learner=task_learner,
    backbone=backbone,
    decision_graph_parameters=model_specifications,
)

# ##### Train #######

# Metrics to monitor the training.
metrics = DictMetrics(
    multi_class_accuracy=partial(MulticlassAccuracy, num_classes=target_size),
    multi_class_F1_score=partial(MulticlassF1Score, num_classes=target_size),
    confusion_matrix=partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size),
)

# Stop early when the total loss stalls, and lower the learning rate when the
# monitored accuracy stops increasing (hence mode="max").
callbacks = [
    EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5),
    Scheduler(
        pre_scheduler=partial(ReduceLROnPlateau, patience=3, mode="max"),
        step_method="epoch",
        monitoring_metric="global_multi_class_accuracy",
    ),
]

# The optimizer is given as a partial: it is only instantiated later, once the
# model parameters are available.
optimizer = partial(torch.optim.AdamW, lr=0.001)

trainer = Trainer(
    loss=CrossEntropyLossFromProbabilities(reduction="none"),
    optimizer=optimizer,
    callbacks=callbacks,
    start_epoch=0,
    max_epochs=20,
    metrics=metrics,
)

trained_model = trainer.train(
    model=xpdeep_model,
    train_set=fit_train_dataset,
    validation_set=fit_val_dataset,
    batch_size=128,
)

# ##### Explain #######

# 1. Build the Explainer
# Statistics computed on both the targets and the predictions.
statistics = DictStats(
    distribution_target=DistributionStat(on="target"),
    distribution_prediction=DistributionStat(on="prediction"),
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
    description_representativeness=1000,
    quality_metrics=quality_metrics,
    metrics=metrics,
    statistics=statistics,
)

# 2. Model Functioning Explanations
model_explanations = explainer.global_explain(
    trained_model,
    train_set=fit_train_dataset,
    test_set=fit_test_dataset,
    validation_set=fit_val_dataset,
)
visualisation_link = model_explanations.visualisation_link

# 3. Inference and their Causal Explanations
# The filter keeps the first 100 rows of the test set.
my_filter = Filter("testing_filter", fit_test_dataset, row_indexes=list(range(100)))
causal_explanations = explainer.local_explain(trained_model, fit_test_dataset, my_filter)
visualisation_link = causal_explanations.visualisation_link
We can now train the model:
# Fit the explainable model on the train split, using the validation split
# as validation set, with batches of 128 samples.
trained_model = trainer.train(
    model=xpdeep_model,
    train_set=fit_train_dataset,
    validation_set=fit_val_dataset,
    batch_size=128,
)
👀 Full file preview
"""HAR workflow, classification, time series data."""
from functools import partial
from pathlib import Path
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import torch
from torch.optim.lr_scheduler import ReduceLROnPlateau
from models import HarCNN
from preprocessors import ScaleHAR
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import AnalyzedParquetDataset, FittedParquetDataset
from xpdeep.dataset.schema.feature.feature import (
CategoricalFeature,
Metadata,
MultivariateAsynchronousTimeSerie,
)
from xpdeep.dataset.schema.preprocessor import SklearnPreprocessor
from xpdeep.dataset.schema.schema import AnalyzedSchema
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat
from xpdeep.filtering.filter import Filter
from xpdeep.metrics.metric import DictMetrics
from xpdeep.metrics.zoo.multiclass_metrics import MulticlassConfusionMatrix, MulticlassAccuracy, MulticlassF1Score
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from xpdeep.model.zoo.mlp import MLP
from xpdeep.project import Project
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from xpdeep.trainer.trainer import Trainer
torch.random.manual_seed(5)

# ##### Prepare the Dataset #######

# 1. Split and Convert your Raw Data

# Activity ids read from the "y_<split>.txt" files, mapped to readable labels.
activity_mapping = {
    1: "Walking",
    2: "Walking upstairs",
    3: "Walking downstairs",
    4: "Sitting",
    5: "Standing",
    6: "Laying",
}
targets_mapper = np.vectorize(lambda x: activity_mapping[x])


def _load_har_split(split_name):
    """Load one raw HAR split (e.g. "train" or "test") as a DataFrame.

    Every ".txt" file found under "<split_name>/Inertial Signals/" is read as
    one input channel (files are visited in sorted path order), and the
    activity ids are read from "<split_name>/y_<split_name>.txt".

    Returns a DataFrame with a "human_activity" column (nested lists built from
    the stacked channels) and an "activity" column (label strings).
    """
    features_dict = {}
    for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
        features_dict[feature_filepath.stem] = np.squeeze(
            pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
        )
    # Stack the channels on axis 1, then swap the last two axes so the channel axis comes last.
    inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
    targets = np.squeeze(
        pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    )
    targets = targets_mapper(targets)  # Map targets to their labels.
    return pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})


train_val_data = _load_har_split("train")

# Split the data into training and validation sets
train_data, val_data = train_test_split(train_val_data, test_size=0.2, random_state=42)

test_data = _load_har_split("test")

# Add a 0..n-1 row identifier to each split. `assign` returns a new frame, so
# nothing is written into the frames returned by `train_test_split`.
train_data = train_data.assign(index_xp_deep=range(len(train_data)))
test_data = test_data.assign(index_xp_deep=range(len(test_data)))
val_data = val_data.assign(index_xp_deep=range(len(val_data)))

# Convert to pyarrow Table format
train_table = pa.Table.from_pandas(train_data, preserve_index=False)
val_table = pa.Table.from_pandas(val_data, preserve_index=False)
test_table = pa.Table.from_pandas(test_data, preserve_index=False)

# Save each split as ".parquet" file
pq.write_table(train_table, "train.parquet")
pq.write_table(val_table, "val.parquet")
pq.write_table(test_table, "test.parquet")
# 2. Upload your Converted Data
# "api_key" / "api_url" are placeholders: replace them with your own values.
init(api_url="api_url", api_key="api_key")
set_project(Project(name="Har Tutorial", id="HarId"))

# `directory` is indexed below with "train_set_path", "test_set_path" and "val_set_path".
directory = upload(
    val_set_path="val.parquet",
    test_set_path="test.parquet",
    train_set_path="train.parquet",
    directory_name="har_uploaded",
)
# 4. Find a schema
# Channel names generated sensor by sensor: body_acc_x, body_acc_y, ..., total_acc_z.
channel_names = [
    f"{sensor}_{axis}" for sensor in ("body_acc", "body_gyro", "total_acc") for axis in ("x", "y", "z")
]

# One schema entry per parquet column: the row index, the input signal and the target.
index_feature = Metadata(name="index_xp_deep")
signal_feature = MultivariateAsynchronousTimeSerie(
    name="human_activity",
    channel_names=channel_names,
    size=[128, 9],
    preprocessor=ScaleHAR(input_size=(128, 9)),
)
activity_target = CategoricalFeature(
    name="activity",
    is_target=True,
    categories=[],
    preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
)
analyzed_schema = AnalyzedSchema(index_feature, signal_feature, activity_target)

# Create a dataset from the analyzed schema.
analyzed_train_dataset = AnalyzedParquetDataset(
    analyzed_schema=analyzed_schema,
    path=directory["train_set_path"],
    identifier_name="my_local_dataset",
    split_name="train",
)
print(analyzed_schema)
# 5. Fit the schema
fit_train_dataset = analyzed_train_dataset.fit()

# The test and validation datasets reuse the schema fitted on the train dataset.
fit_test_dataset = FittedParquetDataset(
    fitted_schema=fit_train_dataset.fitted_schema,
    path=directory["test_set_path"],
    identifier_name="my_local_dataset",
    split_name="test",
)
fit_val_dataset = FittedParquetDataset(
    fitted_schema=fit_train_dataset.fitted_schema,
    path=directory["val_set_path"],
    identifier_name="my_local_dataset",
    split_name="val",
)
# ##### Prepare the Model #######

# 1. Create the required torch models
# Sizes come from the fitted schema; the first dimension of the input size is dropped.
input_size = fit_train_dataset.fitted_schema.input_size[1:]
target_size = fit_train_dataset.fitted_schema.target_size[1]
print(f"input_size: {input_size} - target_size: {target_size}")

backbone = HarCNN(
    output_size=128,
    with_softmax=False,  # No softmax for latent space
)

# input_size=128 matches the backbone's output_size.
feature_extraction = MLP(
    input_size=128,
    hidden_channels=[64, 32],
    dropout=0.2,
    flatten_input=True,
    norm_layer=None,
)

# input_size=32 matches the last hidden channel of `feature_extraction`.
softmax_factory = partial(torch.nn.Softmax, dim=-1)
task_learner = MLP(
    input_size=32,
    hidden_channels=[target_size],
    last_activation=softmax_factory,
    norm_layer=None,
)
# 2. Explainable Model Specifications
model_specifications = ModelDecisionGraphParameters(
    graph_depth=3,
    prune_step=11,
    balancing_weight=1.0,
    discrimination_weight=0.1,
    target_homogeneity_weight=2.0,
    population_pruning_threshold=0.05,
    target_homogeneity_pruning_threshold=0.7,
)

# 3. Create the Explainable Model
# Assemble the three torch modules and the decision-graph parameters around the fitted schema.
xpdeep_model = XpdeepModel.from_torch(
    backbone=backbone,
    feature_extraction=feature_extraction,
    task_learner=task_learner,
    decision_graph_parameters=model_specifications,
    fitted_schema=fit_train_dataset.fitted_schema,
)
# ##### Train #######

# Metrics to monitor the training.
# Each metric is a partial so that it can be instantiated later with `num_classes` already set.
accuracy_metric = partial(MulticlassAccuracy, num_classes=target_size)
f1_metric = partial(MulticlassF1Score, num_classes=target_size)
confusion_metric = partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size)
metrics = DictMetrics(
    multi_class_accuracy=accuracy_metric,
    multi_class_F1_score=f1_metric,
    confusion_matrix=confusion_metric,
)

# Early stopping watches the total loss; the LR scheduler watches the accuracy (hence mode="max").
early_stopping = EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5)
lr_scheduler = Scheduler(
    pre_scheduler=partial(ReduceLROnPlateau, patience=3, mode="max"),
    step_method="epoch",
    monitoring_metric="global_multi_class_accuracy",
)
callbacks = [early_stopping, lr_scheduler]

# Optimizer is a partial object as pytorch needs to give the model as optimizer parameter.
optimizer = partial(torch.optim.AdamW, lr=0.001)

trainer = Trainer(
    loss=CrossEntropyLossFromProbabilities(reduction="none"),
    optimizer=optimizer,
    metrics=metrics,
    callbacks=callbacks,
    start_epoch=0,
    max_epochs=20,
)

trained_model = trainer.train(
    batch_size=128,
    model=xpdeep_model,
    validation_set=fit_val_dataset,
    train_set=fit_train_dataset,
)
# ##### Explain #######

# 1. Build the Explainer
statistics = DictStats(
    distribution_target=DistributionStat(on="target"),
    distribution_prediction=DistributionStat(on="prediction"),
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
    description_representativeness=1000,
    quality_metrics=quality_metrics,
    metrics=metrics,
    statistics=statistics,
)

# 2. Model Functioning Explanations
model_explanations = explainer.global_explain(
    trained_model,
    validation_set=fit_val_dataset,
    test_set=fit_test_dataset,
    train_set=fit_train_dataset,
)
visualisation_link = model_explanations.visualisation_link

# 3. Inference and their Causal Explanations
# Restrict the local explanations to the test-set rows whose indexes are 0..99.
first_rows = list(range(100))
my_filter = Filter("testing_filter", fit_test_dataset, row_indexes=first_rows)
causal_explanations = explainer.local_explain(trained_model, fit_test_dataset, my_filter)
# NOTE: this rebinds `visualisation_link`, replacing the link of the global explanations.
visualisation_link = causal_explanations.visualisation_link
The training logs are displayed in the console:
Epoch 1/20 - Loss: 1.790: 2%|▏ | 1/46 [00:04<03:22, 4.49s/it]
Epoch 1/20 - Loss: 1.790: 2%|▏ | 1/46 [00:05<03:22, 4.49s/it]
Epoch 1/20 - Loss: 1.576: 4%|▍ | 2/46 [00:06<02:04, 2.83s/it]
Epoch 1/20 - Loss: 1.576: 4%|▍ | 2/46 [00:06<02:04, 2.83s/it]
Once the model is trained, it can be used to compute explanations.
Explain#
Similarly to the Trainer
, explanations are computed with an Explainer
interface.
1. Build the Explainer#
We provide the Explainer
with quality metrics to get insights into the explanation quality. In addition, we compute
histograms along with the explanations to get a detailed distribution of targets and predictions. Finally, we set description_representativeness
to 1000.
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat
# Histograms of the targets and of the predictions, computed along with the explanations.
target_distribution = DistributionStat(on="target")
prediction_distribution = DistributionStat(on="prediction")
statistics = DictStats(
    distribution_target=target_distribution,
    distribution_prediction=prediction_distribution,
)

quality_metrics = [Sensitivity(), Infidelity()]

explainer = Explainer(
    description_representativeness=1000,
    quality_metrics=quality_metrics,
    metrics=metrics,
    statistics=statistics,
)
👀 Full file preview
"""HAR workflow, classification, time series data."""
from functools import partial
from pathlib import Path
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import torch
from torch.optim.lr_scheduler import ReduceLROnPlateau
from models import HarCNN
from preprocessors import ScaleHAR
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import AnalyzedParquetDataset, FittedParquetDataset
from xpdeep.dataset.schema.feature.feature import (
CategoricalFeature,
Metadata,
MultivariateAsynchronousTimeSerie,
)
from xpdeep.dataset.schema.preprocessor import SklearnPreprocessor
from xpdeep.dataset.schema.schema import AnalyzedSchema
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat
from xpdeep.filtering.filter import Filter
from xpdeep.metrics.metric import DictMetrics
from xpdeep.metrics.zoo.multiclass_metrics import MulticlassConfusionMatrix, MulticlassAccuracy, MulticlassF1Score
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from xpdeep.model.zoo.mlp import MLP
from xpdeep.project import Project
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from xpdeep.trainer.trainer import Trainer
torch.random.manual_seed(5)

# ##### Prepare the Dataset #######

# 1. Split and Convert your Raw Data

# Activity ids read from the "y_<split>.txt" files, mapped to readable labels.
activity_mapping = {
    1: "Walking",
    2: "Walking upstairs",
    3: "Walking downstairs",
    4: "Sitting",
    5: "Standing",
    6: "Laying",
}
targets_mapper = np.vectorize(lambda x: activity_mapping[x])


def _load_har_split(split_name):
    """Load one raw HAR split (e.g. "train" or "test") as a DataFrame.

    Every ".txt" file found under "<split_name>/Inertial Signals/" is read as
    one input channel (files are visited in sorted path order), and the
    activity ids are read from "<split_name>/y_<split_name>.txt".

    Returns a DataFrame with a "human_activity" column (nested lists built from
    the stacked channels) and an "activity" column (label strings).
    """
    features_dict = {}
    for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
        features_dict[feature_filepath.stem] = np.squeeze(
            pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
        )
    # Stack the channels on axis 1, then swap the last two axes so the channel axis comes last.
    inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
    targets = np.squeeze(
        pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    )
    targets = targets_mapper(targets)  # Map targets to their labels.
    return pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})


train_val_data = _load_har_split("train")

# Split the data into training and validation sets
train_data, val_data = train_test_split(train_val_data, test_size=0.2, random_state=42)

test_data = _load_har_split("test")

# Add a 0..n-1 row identifier to each split. `assign` returns a new frame, so
# nothing is written into the frames returned by `train_test_split`.
train_data = train_data.assign(index_xp_deep=range(len(train_data)))
test_data = test_data.assign(index_xp_deep=range(len(test_data)))
val_data = val_data.assign(index_xp_deep=range(len(val_data)))

# Convert to pyarrow Table format
train_table = pa.Table.from_pandas(train_data, preserve_index=False)
val_table = pa.Table.from_pandas(val_data, preserve_index=False)
test_table = pa.Table.from_pandas(test_data, preserve_index=False)

# Save each split as ".parquet" file
pq.write_table(train_table, "train.parquet")
pq.write_table(val_table, "val.parquet")
pq.write_table(test_table, "test.parquet")
# 2. Upload your Converted Data
# "api_key" / "api_url" are placeholders: replace them with your own values.
init(api_url="api_url", api_key="api_key")
set_project(Project(name="Har Tutorial", id="HarId"))

# `directory` is indexed below with "train_set_path", "test_set_path" and "val_set_path".
directory = upload(
    val_set_path="val.parquet",
    test_set_path="test.parquet",
    train_set_path="train.parquet",
    directory_name="har_uploaded",
)
# 4. Find a schema
# Channel names generated sensor by sensor: body_acc_x, body_acc_y, ..., total_acc_z.
channel_names = [
    f"{sensor}_{axis}" for sensor in ("body_acc", "body_gyro", "total_acc") for axis in ("x", "y", "z")
]

# One schema entry per parquet column: the row index, the input signal and the target.
index_feature = Metadata(name="index_xp_deep")
signal_feature = MultivariateAsynchronousTimeSerie(
    name="human_activity",
    channel_names=channel_names,
    size=[128, 9],
    preprocessor=ScaleHAR(input_size=(128, 9)),
)
activity_target = CategoricalFeature(
    name="activity",
    is_target=True,
    categories=[],
    preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
)
analyzed_schema = AnalyzedSchema(index_feature, signal_feature, activity_target)

# Create a dataset from the analyzed schema.
analyzed_train_dataset = AnalyzedParquetDataset(
    analyzed_schema=analyzed_schema,
    path=directory["train_set_path"],
    identifier_name="my_local_dataset",
    split_name="train",
)
print(analyzed_schema)
# 5. Fit the schema
fit_train_dataset = analyzed_train_dataset.fit()

# The test and validation datasets reuse the schema fitted on the train dataset.
fit_test_dataset = FittedParquetDataset(
    fitted_schema=fit_train_dataset.fitted_schema,
    path=directory["test_set_path"],
    identifier_name="my_local_dataset",
    split_name="test",
)
fit_val_dataset = FittedParquetDataset(
    fitted_schema=fit_train_dataset.fitted_schema,
    path=directory["val_set_path"],
    identifier_name="my_local_dataset",
    split_name="val",
)
# ##### Prepare the Model #######

# 1. Create the required torch models
# Sizes come from the fitted schema; the first dimension of the input size is dropped.
input_size = fit_train_dataset.fitted_schema.input_size[1:]
target_size = fit_train_dataset.fitted_schema.target_size[1]
print(f"input_size: {input_size} - target_size: {target_size}")

backbone = HarCNN(
    output_size=128,
    with_softmax=False,  # No softmax for latent space
)

# input_size=128 matches the backbone's output_size.
feature_extraction = MLP(
    input_size=128,
    hidden_channels=[64, 32],
    dropout=0.2,
    flatten_input=True,
    norm_layer=None,
)

# input_size=32 matches the last hidden channel of `feature_extraction`.
softmax_factory = partial(torch.nn.Softmax, dim=-1)
task_learner = MLP(
    input_size=32,
    hidden_channels=[target_size],
    last_activation=softmax_factory,
    norm_layer=None,
)
# 2. Explainable Model Specifications
model_specifications = ModelDecisionGraphParameters(
    graph_depth=3,
    prune_step=11,
    balancing_weight=1.0,
    discrimination_weight=0.1,
    target_homogeneity_weight=2.0,
    population_pruning_threshold=0.05,
    target_homogeneity_pruning_threshold=0.7,
)

# 3. Create the Explainable Model
# Assemble the three torch modules and the decision-graph parameters around the fitted schema.
xpdeep_model = XpdeepModel.from_torch(
    backbone=backbone,
    feature_extraction=feature_extraction,
    task_learner=task_learner,
    decision_graph_parameters=model_specifications,
    fitted_schema=fit_train_dataset.fitted_schema,
)
# ##### Train #######

# Metrics to monitor the training.
# Each metric is a partial so that it can be instantiated later with `num_classes` already set.
accuracy_metric = partial(MulticlassAccuracy, num_classes=target_size)
f1_metric = partial(MulticlassF1Score, num_classes=target_size)
confusion_metric = partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size)
metrics = DictMetrics(
    multi_class_accuracy=accuracy_metric,
    multi_class_F1_score=f1_metric,
    confusion_matrix=confusion_metric,
)

# Early stopping watches the total loss; the LR scheduler watches the accuracy (hence mode="max").
early_stopping = EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5)
lr_scheduler = Scheduler(
    pre_scheduler=partial(ReduceLROnPlateau, patience=3, mode="max"),
    step_method="epoch",
    monitoring_metric="global_multi_class_accuracy",
)
callbacks = [early_stopping, lr_scheduler]

# Optimizer is a partial object as pytorch needs to give the model as optimizer parameter.
optimizer = partial(torch.optim.AdamW, lr=0.001)

trainer = Trainer(
    loss=CrossEntropyLossFromProbabilities(reduction="none"),
    optimizer=optimizer,
    metrics=metrics,
    callbacks=callbacks,
    start_epoch=0,
    max_epochs=20,
)

trained_model = trainer.train(
    batch_size=128,
    model=xpdeep_model,
    validation_set=fit_val_dataset,
    train_set=fit_train_dataset,
)
# ##### Explain #######

# 1. Build the Explainer
statistics = DictStats(
    distribution_target=DistributionStat(on="target"),
    distribution_prediction=DistributionStat(on="prediction"),
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
    description_representativeness=1000,
    quality_metrics=quality_metrics,
    metrics=metrics,
    statistics=statistics,
)

# 2. Model Functioning Explanations
model_explanations = explainer.global_explain(
    trained_model,
    validation_set=fit_val_dataset,
    test_set=fit_test_dataset,
    train_set=fit_train_dataset,
)
visualisation_link = model_explanations.visualisation_link

# 3. Inference and their Causal Explanations
# Restrict the local explanations to the test-set rows whose indexes are 0..99.
first_rows = list(range(100))
my_filter = Filter("testing_filter", fit_test_dataset, row_indexes=first_rows)
causal_explanations = explainer.local_explain(trained_model, fit_test_dataset, my_filter)
# NOTE: this rebinds `visualisation_link`, replacing the link of the global explanations.
visualisation_link = causal_explanations.visualisation_link
Tip
Here we reuse metrics
from the training stage for convenience, but they can be adapted to your needs!
2. Model Functioning Explanations#
Model Functioning Explanations are computed with the global_explain
method.
# Compute the model-level explanations from the trained model and the three fitted splits.
model_explanations = explainer.global_explain(
    trained_model,
    validation_set=fit_val_dataset,
    test_set=fit_test_dataset,
    train_set=fit_train_dataset,
)
visualisation_link = model_explanations.visualisation_link
👀 Full file preview
"""HAR workflow, classification, time series data."""
from functools import partial
from pathlib import Path
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import torch
from torch.optim.lr_scheduler import ReduceLROnPlateau
from models import HarCNN
from preprocessors import ScaleHAR
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from xpdeep import init, set_project
from xpdeep.dataset.parquet_dataset import AnalyzedParquetDataset, FittedParquetDataset
from xpdeep.dataset.schema.feature.feature import (
CategoricalFeature,
Metadata,
MultivariateAsynchronousTimeSerie,
)
from xpdeep.dataset.schema.preprocessor import SklearnPreprocessor
from xpdeep.dataset.schema.schema import AnalyzedSchema
from xpdeep.dataset.upload import upload
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat
from xpdeep.filtering.filter import Filter
from xpdeep.metrics.metric import DictMetrics
from xpdeep.metrics.zoo.multiclass_metrics import MulticlassConfusionMatrix, MulticlassAccuracy, MulticlassF1Score
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel
from xpdeep.model.zoo.cross_entropy_loss_from_proba import CrossEntropyLossFromProbabilities
from xpdeep.model.zoo.mlp import MLP
from xpdeep.project import Project
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from xpdeep.trainer.trainer import Trainer
torch.random.manual_seed(5)

# ##### Prepare the Dataset #######

# 1. Split and Convert your Raw Data

# Activity ids read from the "y_<split>.txt" files, mapped to readable labels.
activity_mapping = {
    1: "Walking",
    2: "Walking upstairs",
    3: "Walking downstairs",
    4: "Sitting",
    5: "Standing",
    6: "Laying",
}
targets_mapper = np.vectorize(lambda x: activity_mapping[x])


def _load_har_split(split_name):
    """Load one raw HAR split (e.g. "train" or "test") as a DataFrame.

    Every ".txt" file found under "<split_name>/Inertial Signals/" is read as
    one input channel (files are visited in sorted path order), and the
    activity ids are read from "<split_name>/y_<split_name>.txt".

    Returns a DataFrame with a "human_activity" column (nested lists built from
    the stacked channels) and an "activity" column (label strings).
    """
    features_dict = {}
    for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
        features_dict[feature_filepath.stem] = np.squeeze(
            pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
        )
    # Stack the channels on axis 1, then swap the last two axes so the channel axis comes last.
    inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
    targets = np.squeeze(
        pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    )
    targets = targets_mapper(targets)  # Map targets to their labels.
    return pd.DataFrame.from_dict({"human_activity": inputs.tolist(), "activity": targets})


train_val_data = _load_har_split("train")

# Split the data into training and validation sets
train_data, val_data = train_test_split(train_val_data, test_size=0.2, random_state=42)

test_data = _load_har_split("test")

# Add a 0..n-1 row identifier to each split. `assign` returns a new frame, so
# nothing is written into the frames returned by `train_test_split`.
train_data = train_data.assign(index_xp_deep=range(len(train_data)))
test_data = test_data.assign(index_xp_deep=range(len(test_data)))
val_data = val_data.assign(index_xp_deep=range(len(val_data)))

# Convert to pyarrow Table format
train_table = pa.Table.from_pandas(train_data, preserve_index=False)
val_table = pa.Table.from_pandas(val_data, preserve_index=False)
test_table = pa.Table.from_pandas(test_data, preserve_index=False)

# Save each split as ".parquet" file
pq.write_table(train_table, "train.parquet")
pq.write_table(val_table, "val.parquet")
pq.write_table(test_table, "test.parquet")
# 2. Upload your Converted Data
# "api_key" / "api_url" are placeholders: replace them with your own values.
init(api_url="api_url", api_key="api_key")
set_project(Project(name="Har Tutorial", id="HarId"))

# `directory` is indexed below with "train_set_path", "test_set_path" and "val_set_path".
directory = upload(
    val_set_path="val.parquet",
    test_set_path="test.parquet",
    train_set_path="train.parquet",
    directory_name="har_uploaded",
)
# 4. Find a schema
# Channel names generated sensor by sensor: body_acc_x, body_acc_y, ..., total_acc_z.
channel_names = [
    f"{sensor}_{axis}" for sensor in ("body_acc", "body_gyro", "total_acc") for axis in ("x", "y", "z")
]

# One schema entry per parquet column: the row index, the input signal and the target.
index_feature = Metadata(name="index_xp_deep")
signal_feature = MultivariateAsynchronousTimeSerie(
    name="human_activity",
    channel_names=channel_names,
    size=[128, 9],
    preprocessor=ScaleHAR(input_size=(128, 9)),
)
activity_target = CategoricalFeature(
    name="activity",
    is_target=True,
    categories=[],
    preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
)
analyzed_schema = AnalyzedSchema(index_feature, signal_feature, activity_target)

# Create a dataset from the analyzed schema.
analyzed_train_dataset = AnalyzedParquetDataset(
    analyzed_schema=analyzed_schema,
    path=directory["train_set_path"],
    identifier_name="my_local_dataset",
    split_name="train",
)
print(analyzed_schema)
# 5. Fit the schema
fit_train_dataset = analyzed_train_dataset.fit()

# The test and validation datasets reuse the schema fitted on the train dataset.
fit_test_dataset = FittedParquetDataset(
    fitted_schema=fit_train_dataset.fitted_schema,
    path=directory["test_set_path"],
    identifier_name="my_local_dataset",
    split_name="test",
)
fit_val_dataset = FittedParquetDataset(
    fitted_schema=fit_train_dataset.fitted_schema,
    path=directory["val_set_path"],
    identifier_name="my_local_dataset",
    split_name="val",
)
# ##### Prepare the Model #######

# 1. Create the required torch models
# Sizes come from the fitted schema; the first dimension of the input size is dropped.
input_size = fit_train_dataset.fitted_schema.input_size[1:]
target_size = fit_train_dataset.fitted_schema.target_size[1]
print(f"input_size: {input_size} - target_size: {target_size}")

backbone = HarCNN(
    output_size=128,
    with_softmax=False,  # No softmax for latent space
)

# input_size=128 matches the backbone's output_size.
feature_extraction = MLP(
    input_size=128,
    hidden_channels=[64, 32],
    dropout=0.2,
    flatten_input=True,
    norm_layer=None,
)

# input_size=32 matches the last hidden channel of `feature_extraction`.
softmax_factory = partial(torch.nn.Softmax, dim=-1)
task_learner = MLP(
    input_size=32,
    hidden_channels=[target_size],
    last_activation=softmax_factory,
    norm_layer=None,
)
# 2. Explainable Model Specifications
model_specifications = ModelDecisionGraphParameters(
    graph_depth=3,
    prune_step=11,
    balancing_weight=1.0,
    discrimination_weight=0.1,
    target_homogeneity_weight=2.0,
    population_pruning_threshold=0.05,
    target_homogeneity_pruning_threshold=0.7,
)

# 3. Create the Explainable Model
# Assemble the three torch modules and the decision-graph parameters around the fitted schema.
xpdeep_model = XpdeepModel.from_torch(
    backbone=backbone,
    feature_extraction=feature_extraction,
    task_learner=task_learner,
    decision_graph_parameters=model_specifications,
    fitted_schema=fit_train_dataset.fitted_schema,
)
# ##### Train #######

# Metrics to monitor the training.
# Each metric is a partial so that it can be instantiated later with `num_classes` already set.
accuracy_metric = partial(MulticlassAccuracy, num_classes=target_size)
f1_metric = partial(MulticlassF1Score, num_classes=target_size)
confusion_metric = partial(MulticlassConfusionMatrix, normalize="all", num_classes=target_size)
metrics = DictMetrics(
    multi_class_accuracy=accuracy_metric,
    multi_class_F1_score=f1_metric,
    confusion_matrix=confusion_metric,
)

# Early stopping watches the total loss; the LR scheduler watches the accuracy (hence mode="max").
early_stopping = EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5)
lr_scheduler = Scheduler(
    pre_scheduler=partial(ReduceLROnPlateau, patience=3, mode="max"),
    step_method="epoch",
    monitoring_metric="global_multi_class_accuracy",
)
callbacks = [early_stopping, lr_scheduler]

# Optimizer is a partial object as pytorch needs to give the model as optimizer parameter.
optimizer = partial(torch.optim.AdamW, lr=0.001)

trainer = Trainer(
    loss=CrossEntropyLossFromProbabilities(reduction="none"),
    optimizer=optimizer,
    metrics=metrics,
    callbacks=callbacks,
    start_epoch=0,
    max_epochs=20,
)
trained_model = trainer.train(
model=xpdeep_model,
train_set=fit_train_dataset,
validation_set=fit_val_dataset,