
parquet_dataset

Parquet datasets, to be used with raw data in the ".parquet" format.

Classes:

Name Description
BaseParquetDataset

Base Parquet Dataset class.

ParquetDataset

Parquet Dataset class, to be analyzed via the AutoAnalyzer.

AnalyzedParquetDataset

Analyzed Parquet Dataset class to be created from an existing analyzed schema.

FittedParquetDataset

Fitted Parquet Dataset class to be created from an existing fitted schema.

Attributes:

Name Type Description
logger

logger = logging.getLogger(__name__) #

BaseParquetDataset #

Base Parquet Dataset class.

Parameters:

Name Type Description Default

name #

str

Dataset name.

required

path #

str

The dataset parquet file path, which must be fsspec compatible.

required

storage_options #

dict[str, Any]

Optional storage options to stream data from a cloud storage instance.

{} (empty dict)

num_rows #

int | None

Size of the parquet dataset. If None, it will be automatically inferred before database insertion, which may take time on large files.

None

Methods:

Name Description
get_num_rows

Get the number of rows in the parquet file.

Attributes:

Name Type Description
name str
path str
storage_options dict[str, object]
num_rows int | None

name: str #

path: str #

storage_options: dict[str, object] = field(factory=dict) #

num_rows: int | None = None #

get_num_rows() -> int #

Get the number of rows in the parquet file.

Source code in src/xpdeep/dataset/parquet_dataset.py
def get_num_rows(self) -> int:
    """Get the number of rows in the parquet file."""
    if self.num_rows is not None:
        return self.num_rows

    try:

        def _read_metadata() -> int:
            fs = PyFileSystem(FSSpecHandler(url_to_fs(self.path, **self.storage_options)[0]))
            parquet_file = pq.ParquetFile(self.path, filesystem=fs)
            if not isinstance(parquet_file.metadata.num_rows, int):
                msg = "Number of rows not found in parquet metadata."
                raise TypeError(msg)
            return parquet_file.metadata.num_rows

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(_read_metadata)
            try:
                return future.result(timeout=10)
            except concurrent.futures.TimeoutError as e:
                msg = (
                    f"Parquet size could not be estimated in less than 10s, "
                    f"please set the the number of rows when instantiating your {self.__class__.__name__})"
                )
                raise TimeoutError(msg) from e
    except:
        logger.exception("Failed to automatically estimate parquet size")
        raise
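
A minimal usage sketch, assuming the classes are importable from xpdeep.dataset.parquet_dataset (matching the source path above) and using the ParquetDataset subclass documented below; the bucket, credentials, and row count are placeholders.

from xpdeep.dataset.parquet_dataset import ParquetDataset  # assumed import path

# Remote parquet file streamed through fsspec; credentials are placeholders.
dataset = ParquetDataset(
    name="sales",
    path="s3://my-bucket/sales.parquet",
    storage_options={"key": "ACCESS_KEY", "secret": "SECRET_KEY"},
)

# Reads the row count from the parquet metadata (subject to the 10s timeout).
print(dataset.get_num_rows())

# Providing num_rows up front skips the metadata read entirely.
dataset = ParquetDataset(
    name="sales",
    path="s3://my-bucket/sales.parquet",
    storage_options={"key": "ACCESS_KEY", "secret": "SECRET_KEY"},
    num_rows=1_000_000,
)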

ParquetDataset #

Parquet Dataset class, to be analyzed via the AutoAnalyzer.

Parameters:

Name Type Description Default

name #

str
required

path #

str
required

storage_options #

dict[str, object]

Optional storage options to stream data from a cloud storage instance.

{} (empty dict)

num_rows #

int | None
None

Methods:

Name Description
analyze

Analyze the dataset and create an Analyzed Schema.

analyze(*forced_type: ExplainableFeature | IndexMetadata, target_names: list[str] | None = None) -> AnalyzedParquetDataset #

Analyze the dataset and create an Analyzed Schema.

Parameters:

Name Type Description Default
forced_type #
ExplainableFeature | IndexMetadata

Feature objects used to force a custom feature type for specific column names in the Arrow Table.

()
target_names #
list[str] | None

Optional list of column names indicating which columns should be considered targets. Default None.

None

Returns:

Type Description
AnalyzedParquetDataset

The analyzed dataset, a parquet dataset with an analyzed schema attached.

Source code in src/xpdeep/dataset/parquet_dataset.py
@initialized_client_verification
@initialized_project_verification
def analyze(
    self, *forced_type: ExplainableFeature | IndexMetadata, target_names: list[str] | None = None
) -> "AnalyzedParquetDataset":
    """Analyze the dataset and create an Analyzed Schema.

    Parameters
    ----------
    forced_type : ExplainableFeature | IndexMetadata
        Feature objects used to force a custom feature type for specific column names in the Arrow Table.
    target_names : list[str] | None
        Optional list of column names indicating which columns should be considered targets. Default None.

    Returns
    -------
    AnalyzedParquetDataset
        The analyzed dataset, a parquet dataset with an analyzed schema attached.
    """
    client_factory = ClientFactory.CURRENT.get()

    forced_types_as_dict = {value.name: (value.to_model().to_dict()) for value in forced_type}

    with client_factory() as client:
        analyzed_schema_pipeline = handle_api_validation_errors(
            launch_pipeline.sync(
                project_id=Project.CURRENT.get().model.id,
                client=client,
                body=AutoAnalyzerPipelineInput(
                    type_="AUTO_ANALYZER",
                    dataset=self._to_read_dataset_option(),
                    forced_type=AutoAnalyzerPipelineInputForcedType.from_dict(forced_types_as_dict),
                    target_names=target_names if target_names is not None else UNSET,
                ),
            )
        )

        analyzed_schema = schema_analysis.get_pipeline_result(analyzed_schema_pipeline.id)

    return AnalyzedParquetDataset(
        name=self.name,
        path=self.path,
        storage_options=self.storage_options,
        analyzed_schema=analyzed_schema,
    )
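
A hedged sketch of the analysis step, continuing from the ParquetDataset example above. It assumes an initialized client and project (required by the decorators), that IndexMetadata is importable from the package (its exact module is not shown on this page), and that "row_id" and "label" are placeholder column names.

# Force "row_id" to be treated as an index column and mark "label" as target.
# The IndexMetadata(name=...) form is taken from the to_model warning below.
analyzed_dataset = dataset.analyze(
    IndexMetadata(name="row_id"),
    target_names=["label"],
)
print(analyzed_dataset.analyzed_schema)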

AnalyzedParquetDataset #

Analyzed Parquet Dataset class to be created from an existing analyzed schema.

Parameters:

Name Type Description Default

name #

str
required

path #

str
required

storage_options #

dict[str, object]

Optional storage options to stream data from a cloud storage instance.

{} (empty dict)

num_rows #

int | None
None

analyzed_schema #

AnalyzedSchema
required

Methods:

Name Description
fit

Create a Fitted Parquet Dataset object.

Attributes:

Name Type Description
analyzed_schema AnalyzedSchema

analyzed_schema: AnalyzedSchema = field(kw_only=True) #

fit() -> FittedParquetDataset #

Create a Fitted Parquet Dataset object.

Source code in src/xpdeep/dataset/parquet_dataset.py
@initialized_client_verification
@initialized_project_verification
def fit(self) -> "FittedParquetDataset":
    """Create a Fitted Parquet Dataset object."""
    client_factory = ClientFactory.CURRENT.get()

    with client_factory() as client:
        fitted_schema_pipeline = launch_pipeline.sync(
            project_id=Project.CURRENT.get().model.id,
            client=client,
            body=FitSchemaPipelineInput(
                type_="FIT_SCHEMA",
                schema=self.analyzed_schema.as_fit_schema_pipeline_input_schema,
                dataset=self._to_read_dataset_option(),
            ),
        )

        pipeline_result = handle_api_validation_errors(fitted_schema_pipeline)
        fitted_schema = schema_fitting.get_pipeline_result(pipeline_result.id)

    return FittedParquetDataset(
        name=self.name, path=self.path, storage_options=self.storage_options, fitted_schema=fitted_schema
    )
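
A sketch of the fitting step, continuing from the analyze() example; the AnalyzedParquetDataset import path is assumed to match the source location above.

# Fit the analyzed schema, producing a FittedParquetDataset.
fitted_dataset = analyzed_dataset.fit()
print(fitted_dataset.fitted_schema)

# An AnalyzedParquetDataset can also be rebuilt from an existing analyzed schema.
from xpdeep.dataset.parquet_dataset import AnalyzedParquetDataset  # assumed import path

rebuilt = AnalyzedParquetDataset(
    name="sales",
    path="s3://my-bucket/sales.parquet",
    analyzed_schema=analyzed_dataset.analyzed_schema,
)
fitted_again = rebuilt.fit()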

FittedParquetDataset #

Fitted Parquet Dataset class to be created from an existing fitted schema.

A fitted parquet dataset can be saved remotely using the save method, or implicitly by passing it as a parameter when computing an explanation. Once it has been saved remotely, it can no longer be updated.

Parameters:

Name Type Description Default

name #

str
required

path #

str
required

storage_options #

dict[str, object]

Optional storage options to stream data from a cloud storage instance.

{} (empty dict)

num_rows #

int | None
None

fitted_schema #

FittedSchema
required

size #

int | None
None

Methods:

Name Description
__setattr__

Set attribute.

stable_hash

Build a hash of the fitted parquet dataset.

get_last_parquet_modification_datetime_utc

Get the datetime of the last modification of a file or object. Assume timezone UTC.

to_model

Convert to ParquetDatasetArtifactInsert instance.

save

Save the Fitted Parquet Dataset remotely.

load_all

List all datasets of the current project.

get_by_id

Get fitted dataset by its ID.

get_by_name

Get fitted dataset by its name.

delete

Delete the current object remotely.

load_computed_statistics

Load all computed statistics on this dataset.

Attributes:

Name Type Description
fitted_schema FittedSchema
size int | None
id str

Get artifact id if the object exists remotely.

fitted_schema: FittedSchema = field(kw_only=True) #

size: int | None = None #

id: str #

Get artifact id if the object exists remotely.

__setattr__(attr: str, value: object) -> None #

Set attribute.

Source code in src/xpdeep/dataset/parquet_dataset.py
@initialized_client_verification
@initialized_project_verification
def __setattr__(self, attr: str, value: object) -> None:
    """Set attribute."""
    try:
        if hasattr(self, attr) and self.id:
            message = (
                "Updating FittedParquetDataset instance attributes is not allowed since the "
                "object is saved remotely. Consider creating a new one."
            )
            raise AttributeError(message)
    except NotSavedError:
        pass

    object.__setattr__(self, attr, value)
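
A short sketch of the immutability rule enforced above, continuing from the fit() example: attributes can be changed until the dataset has been saved remotely, after which any assignment raises AttributeError.

# Before a remote save, attribute updates go through normally.
fitted_dataset.name = "sales_fitted"

fitted_dataset.save()  # see save() below

# Once the object exists remotely (it has an id), updates are rejected.
try:
    fitted_dataset.name = "sales_fitted_v2"
except AttributeError as err:
    print(err)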

stable_hash() -> str #

Build a hash of the fitted parquet dataset.

Source code in src/xpdeep/dataset/parquet_dataset.py
def stable_hash(self) -> str:
    """Build a hash of the fitted parquet dataset."""
    return str(
        sha256(
            f"{self.fitted_schema.stable_hash()}_{self.name}_{self.size}_{self.path}_{self.storage_options}".encode()
        ).hexdigest()
    )

get_last_parquet_modification_datetime_utc() -> datetime | None #

Get the datetime of the last modification of a file or object. Assume timezone UTC.

Returns:

Type Description
datetime | None

Datetime of the last modification in UTC or None if not found.

Raises:

Type Description
FileNotFoundError

If the file or object does not exist.

Source code in src/xpdeep/dataset/parquet_dataset.py
def get_last_parquet_modification_datetime_utc(self) -> datetime | None:
    """
    Get the datetime of the last modification of a file or object. Assume timezone UTC.

    Returns
    -------
    datetime | None
        Datetime of the last modification in UTC or None if not found.

    Raises
    ------
    FileNotFoundError
        If the file or object does not exist.
    """
    with fsspec.open(self.path, **self.storage_options) as file:
        try:
            info = file.info()
            modified = info.get("LastModified")
            if modified is None:
                modified = info.get("mtime")

        except AttributeError:
            # On local file systems, "info" is not accessed the same way.
            modified = file.fs.info(self.path).get("mtime")
        except FileNotFoundError as e:
            msg = "The parquet file does not exist. Please create your file before inserting it in the database."
            raise FileNotFoundError(msg) from e

    if isinstance(modified, float):
        modified_dt = datetime.fromtimestamp(modified, tz=UTC)
    elif modified is not None:
        modified_dt = modified.astimezone(tz=UTC)
    else:
        modified_dt = None

    return modified_dt

to_model() -> ParquetDatasetArtifactInsert #

Convert to ParquetDatasetArtifactInsert instance.

Source code in src/xpdeep/dataset/parquet_dataset.py
def to_model(self) -> ParquetDatasetArtifactInsert:
    """Convert to ParquetDatasetArtifactInsert instance."""
    if not any(isinstance(feature, IndexMetadata) for feature in self.fitted_schema.columns):
        logger.warning(
            f"No index metadata found in dataset schema: {self.name}. Some functionalities will be limited."  # noqa:G004
            f"The Auto Analyzer recognizes columns named '__index_level_0__' or 'index_xp_deep' as IndexMetadata."
            f"You can also manually add IndexMetadata(name='my_custom_index_column') to your schema."
        )

    return ParquetDatasetArtifactInsert(
        project_id=Project.CURRENT.get().model.id,
        hash_=self.stable_hash(),
        name=self.name,
        core_version="0.0.0",
        size=self.get_num_rows(),
        schema=self.fitted_schema.to_model(),
        type_="PARQUET_DATASET",
        path=self.path,
        storage_options=ParquetDatasetArtifactInsertStorageOptions.from_dict(self.storage_options),
        last_file_modification=self.get_last_parquet_modification_datetime_utc(),
    )

save(*, force: bool = False) -> FittedParquetDataset #

Save the Fitted Parquet Dataset remotely.

Source code in src/xpdeep/dataset/parquet_dataset.py
@initialized_client_verification
@initialized_project_verification
def save(self, *, force: bool = False) -> "FittedParquetDataset":
    """Save the Fitted Parquet Dataset remotely."""
    if self._database_id is not None:
        try:
            next(iter(FittedParquetDataset._load_select_ones(dataset_id=self._database_id)))
            msg = "Your dataset was already saved."
            logger.warning(msg)
        except StopIteration:
            msg = (
                "Your FittedParquetDataset has an id but was not found remotely, "
                "ignoring current id and saving it again."
            )
            logger.warning(msg)
            self._set_database_id(None)
        else:
            return self
    try:  # Try to insert the dataset in DB
        parquet_dataset_artifact_select_one = handle_api_validation_errors(
            insert_parquet_dataset_artifact.sync(
                project_id=Project.CURRENT.get().model.id,
                client=ClientFactory.CURRENT.get()(),
                body=self.to_model(),
            ),
        )
    except UnexpectedStatus as err:
        # Parquet dataset artifact with the same name already exists in DB.
        if not force:
            remote_dataset = FittedParquetDataset.get_by_name(self.name)
            if remote_dataset.stable_hash() != self.stable_hash():
                message = (
                    f"The dataset: {self.name} already exists in database with a different content. "
                    f"Update `name` or use `force=True` to create a new remote object with a different name."
                )
                raise DuplicatedRemoteObjectError(message) from err
            # The remote dataset is the same as the current dataset
            self._set_database_id(remote_dataset.id)

        timestamp = datetime.now(tz=UTC)
        logger.warning(
            f"The dataset: {self.name} already exists in database. "  # noqa:G004
            f"Saving dataset under name : {self.name}_{timestamp}"
        )
        self.name += f"_{timestamp}"
        parquet_dataset_artifact_select_one = handle_api_validation_errors(
            insert_parquet_dataset_artifact.sync(
                project_id=Project.CURRENT.get().model.id,
                client=ClientFactory.CURRENT.get()(),
                body=self.to_model(),
            ),
        )

    # Mandatory in order to upgrade IDs of schema's features, required to build criteria.
    # Use object to bypass custom __setattr__
    self._set_database_id(parquet_dataset_artifact_select_one.id)
    object.__setattr__(  # noqa : PLC2801
        self, "fitted_schema", FittedSchema.from_model(parquet_dataset_artifact_select_one.schema.to_dict())
    )

    return self
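
A hedged sketch of the save workflow, continuing from the examples above; the FittedParquetDataset import path is assumed as before, DuplicatedRemoteObjectError is defined elsewhere in the package (not shown on this page), and the second path is a placeholder.

# Persist the fitted dataset; the returned object carries its artifact id.
fitted_dataset = fitted_dataset.save()
print(fitted_dataset.id)

# Saving a different dataset under an already-used name raises
# DuplicatedRemoteObjectError unless force=True, in which case it is stored
# under "<name>_<timestamp>".
other = FittedParquetDataset(
    name=fitted_dataset.name,
    path="s3://my-bucket/sales_v2.parquet",
    fitted_schema=fitted_dataset.fitted_schema,
)
other.save(force=True)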

load_all() -> list[FittedParquetDataset] #

List all datasets of the current project.

Source code in src/xpdeep/dataset/parquet_dataset.py
@classmethod
@initialized_client_verification
@initialized_project_verification
def load_all(cls) -> list["FittedParquetDataset"]:
    """List all datasets of the current project."""
    return cls._load()

get_by_id(dataset_id: str) -> FittedParquetDataset #

Get fitted dataset by its ID.

Parameters:

Name Type Description Default
dataset_id #
str

The ID of the dataset to retrieve.

required
Source code in src/xpdeep/dataset/parquet_dataset.py
@classmethod
@initialized_client_verification
@initialized_project_verification
def get_by_id(cls, dataset_id: str) -> "FittedParquetDataset":
    """Get fitted dataset by its ID.

    Parameters
    ----------
    dataset_id : str
        The ID of the dataset to retrieve.
    """
    try:
        return next(iter(cls._load(dataset_id=dataset_id)))
    except StopIteration as err:
        message = f"No fitted dataset found for dataset ID: {dataset_id}"
        raise NotSavedError(message) from err

get_by_name(dataset_name: str) -> FittedParquetDataset #

Get fitted dataset by its name.

Parameters:

Name Type Description Default
dataset_name #
str

The name of the dataset to retrieve.

required
Source code in src/xpdeep/dataset/parquet_dataset.py
@classmethod
@initialized_client_verification
@initialized_project_verification
def get_by_name(cls, dataset_name: str) -> "FittedParquetDataset":
    """Get fitted dataset by its name.

    Parameters
    ----------
    dataset_name : str
        The name of the dataset to retrieve.
    """
    try:
        return next(iter(cls._load(dataset_name=dataset_name)))
    except StopIteration as err:
        message = f"No fitted dataset found for dataset name: {dataset_name}"
        raise NotSavedError(message) from err
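
A sketch of the retrieval helpers, assuming the datasets above were saved in the current project; both getters raise NotSavedError (defined elsewhere in the package) when nothing matches.

# List every fitted dataset saved in the current project.
all_datasets = FittedParquetDataset.load_all()

# Fetch a single dataset by name, then confirm the id round-trips.
by_name = FittedParquetDataset.get_by_name("sales_fitted")
same = FittedParquetDataset.get_by_id(by_name.id)
assert same.id == by_name.id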

delete() -> None #

Delete the current object remotely.

Source code in src/xpdeep/dataset/parquet_dataset.py
@initialized_client_verification
@initialized_project_verification
def delete(self) -> None:
    """Delete the current object remotely."""
    try:
        with ClientFactory.CURRENT.get()() as client:
            delete_one_dataset_artifact.sync(
                project_id=Project.CURRENT.get().model.id,
                dataset_artifact_id=self.id,
                client=client,
            )
    except NotSavedError:
        message = "The current object does not exist remotely."
        warnings.warn(
            message,
            category=UserWarning,
            stacklevel=2,
        )

load_computed_statistics() -> list[ExplanationStatisticSelect] #

Load all computed statistics on this dataset.

Source code in src/xpdeep/dataset/parquet_dataset.py
@initialized_client_verification
@initialized_project_verification
def load_computed_statistics(self) -> list[ExplanationStatisticSelect]:
    """Load all computed statistics on this dataset."""
    with ClientFactory.CURRENT.get()() as client:
        return _get_all_objects_from_paginated_route(
            select_explanation_statistics.sync,
            project_id=Project.CURRENT.get().model.id,
            client=client,
            dataset_artifact_id=self.id,
        )
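
Finally, a sketch combining the remaining helpers, continuing from the examples above: loading the explanation statistics computed on the dataset and deleting the remote artifact (deleting an object that was never saved only emits a UserWarning).

# Explanation statistics computed on this dataset, if any.
stats = fitted_dataset.load_computed_statistics()
print(f"{len(stats)} statistic(s) computed on {fitted_dataset.name}")

# Remove the remote artifact.
fitted_dataset.delete()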