Skip to content

parquet_dataset

Parquet datasets, to be used with raw data under the ".parquet" format.

Classes:

Name Description
BaseParquetDataset

Base Parquet Dataset class.

ParquetDataset

Parquet Dataset class, to be analyzed via the AutoAnalyzer.

AnalyzedParquetDataset

Analyzed Parquet Dataset class to be created from an existing analyzed schema.

FittedParquetDataset

Fitted Parquet Dataset class to be created from an existing fitted schema.

BaseParquetDataset #

Base Parquet Dataset class.

Parameters:

Name Type Description Default

name #

str

Dataset name.

required

path #

str

The dataset parquet file path, which must be fsspec compatible.

required

storage_options #

dict[str, object]

Optional storage options to stream data from a cloud storage instance.

field(factory=dict) (optional; defaults to an empty dict)

Attributes:

Name Type Description
name str
path str
storage_options dict[str, object]

name: str #

path: str #

storage_options: dict[str, object] = field(factory=dict) #

ParquetDataset #

Parquet Dataset class, to be analyzed via the AutoAnalyzer.

Methods:

Name Description
analyze

Analyze the dataset and create an Analyzed Schema.

analyze(*forced_type: BaseFeature | ExplainableFeature, target_names: list[str] | None = None) -> AnalyzedParquetDataset #

Analyze the dataset and create an Analyzed Schema.

Parameters:

Name Type Description Default
forced_type #
BaseFeature | ExplainableFeature

Feature objects to force custom feature types for specific column names in the Arrow Table.

()
target_names #
list[str] | None

Optional list of column names indicating which columns should be considered targets. Default None.

None

Returns:

Type Description
AnalyzedParquetDataset

The analyzed dataset, a parquet dataset with an analyzed schema attached.

Source code in src/xpdeep/dataset/parquet_dataset.py
@initialized_client_verification
@initialized_project_verification
def analyze(
    self, *forced_type: BaseFeature | ExplainableFeature, target_names: list[str] | None = None
) -> "AnalyzedParquetDataset":
    """Analyze the dataset and create an Analyzed Schema.

    Parameters
    ----------
    forced_type : BaseFeature | ExplainableFeature
        Feature objects to force custom feature types for specific column names in the Arrow Table.
    target_names : list[str] | None
        Optional list of column names indicating which columns should be considered targets. Default None.

    Returns
    -------
    AnalyzedParquetDataset
        The analyzed dataset, a parquet dataset with an analyzed schema attached.
    """
    client_factory = ClientFactory.CURRENT.get()

    # Serialize each forced feature into a base64-encoded msgpack payload, keyed by
    # column name. ExplainableFeature payloads are exported without their augmentation
    # (with_augmentation=False); augmentations are re-attached to the analyzed schema
    # after the server round-trip (see the loop further below).
    forced_types_as_dict = {
        value.name: (
            base64.encodebytes(NumpyMsgpackEncoder().encode(value.as_exposed(with_augmentation=False))).decode(
                "utf-8"
            )
        )
        if isinstance(value, ExplainableFeature)
        else (base64.encodebytes(NumpyMsgpackEncoder().encode(value.as_exposed())).decode("utf-8"))
        for value in forced_type
    }

    with client_factory() as client:
        # Submit the schema-analysis job for this dataset within the current project.
        # NOTE(review): `self.__as_request_body` is a name-mangled private attribute,
        # while fit() uses `self._as_request_body` — verify the difference is intentional.
        analyzed_schema_job = handle_api_validation_errors(
            create_analyzed_schema.sync(
                Project.CURRENT.get().model.id,
                body=ParquetDatasetAnalyzeRequestBody(
                    parquet_dataset=self.__as_request_body,
                    forced_type=ParquetDatasetAnalyzeRequestBodyForcedTypeType0.from_dict(forced_types_as_dict)
                    if len(forced_types_as_dict) > 0
                    else None,  # omit the field entirely when no feature types are forced
                    target_names=target_names,
                ),
                client=client,
            ),
        )

        # Retrieve the analyzed schema produced by the job.
        analyzed_schema = get_schema_analysis_job_result(analyzed_schema_job.id)

        # Augmentations were stripped before upload (with_augmentation=False above),
        # so copy each forced feature's augmentation back onto the matching column
        # of the schema returned by the server.
        for feature in forced_type:
            if (
                isinstance(feature, ExplainableFeature)
                and feature.feature_augmentation is not None
                and feature.name in [feature_.name for feature_ in analyzed_schema.columns]
            ):
                cast(
                    "ExplainableFeature", analyzed_schema[feature.name]
                ).feature_augmentation = feature.feature_augmentation

        # Same dataset coordinates as this instance, now paired with the analyzed schema.
        return AnalyzedParquetDataset(
            name=self.name,
            path=self.path,
            storage_options=self.storage_options,
            analyzed_schema=analyzed_schema,
        )

AnalyzedParquetDataset #

Analyzed Parquet Dataset class to be created from an existing analyzed schema.

Parameters:

Name Type Description Default

analyzed_schema #

AnalyzedSchema
required

Methods:

Name Description
fit

Create a Fitted Parquet Dataset object.

Attributes:

Name Type Description
analyzed_schema AnalyzedSchema

analyzed_schema: AnalyzedSchema = field(kw_only=True) #

fit() -> FittedParquetDataset #

Create a Fitted Parquet Dataset object.

Source code in src/xpdeep/dataset/parquet_dataset.py
@initialized_client_verification
@initialized_project_verification
def fit(self) -> "FittedParquetDataset":
    """Create a Fitted Parquet Dataset object.

    Returns
    -------
    FittedParquetDataset
        A parquet dataset carrying the fitted schema produced by the server.

    Raises
    ------
    TypeError
        If the analyzed schema still contains plain base features.
    """
    # Exact type check (not isinstance) so only un-specialized ExposedBaseFeature
    # columns are rejected; subclasses remain acceptable for fitting.
    for column in self.analyzed_schema.as_exposed.columns:
        if type(column) is ExposedBaseFeature:
            message = "Base features are not accepted for fitting schemas."
            raise TypeError(message) from None

    client_factory = ClientFactory.CURRENT.get()

    with client_factory() as client:
        # Submit the schema-fitting job: the request carries the dataset location
        # plus the analyzed schema serialized as base64-encoded msgpack.
        fitted_schema_job = handle_api_validation_errors(
            create_fitted_schema.sync(
                project_id=Project.CURRENT.get().model.id,
                client=client,
                body=FitSchemaRequestBody(
                    self._as_request_body,
                    base64.encodebytes(NumpyMsgpackEncoder().encode(self.analyzed_schema.as_exposed)).decode(
                        "utf-8"
                    ),
                ),
            ),
        )
    # Retrieve the fitted schema produced by the job.
    fitted_schema = get_schema_fitting_job_result(fitted_schema_job.id)

    def keep_augmentation() -> None:
        """Inject augmentation from analyzed schema features in new generated fitted schema features."""
        # Drop plain BaseFeature columns up front; only the ExplainableFeature
        # instances checked below can carry an augmentation worth preserving.
        features = [feature for feature in self.analyzed_schema.columns if not isinstance(feature, BaseFeature)]
        for feature in features:
            if (
                isinstance(feature, ExplainableFeature)
                and feature.feature_augmentation is not None
                and feature.name in [feature_.name for feature_ in fitted_schema.columns]
            ):
                cast(
                    "ExplainableFeature", fitted_schema[feature.name]
                ).feature_augmentation = feature.feature_augmentation

    # Restore augmentations from the local analyzed schema onto the freshly
    # fitted schema before building the result.
    keep_augmentation()

    return FittedParquetDataset(
        name=self.name, path=self.path, storage_options=self.storage_options, fitted_schema=fitted_schema
    )

FittedParquetDataset #

Fitted Parquet Dataset class to be created from an existing fitted schema.

Parameters:

Name Type Description Default

fitted_schema #

FittedSchema
required

Attributes:

Name Type Description
fitted_schema FittedSchema
artifact_id str

Get artifact id.

fitted_schema: FittedSchema = field(kw_only=True) #

artifact_id: str #

Get artifact id.