Skip to content

parquet_dataset

Parquet datasets, to be used with raw data under the ".parquet" format.

Classes:

Name Description
BaseParquetDataset

Base Parquet Dataset class.

ParquetDataset

Parquet Dataset class, to be analyzed via the AutoAnalyzer.

AnalyzedParquetDataset

Analyzed Parquet Dataset class to be created from an existing analyzed schema.

FittedParquetDataset

Fitted Parquet Dataset class to be created from an existing fitted schema.

BaseParquetDataset #

Base Parquet Dataset class.

Parameters:

Name Type Description Default

name #

str

Dataset name.

required

path #

str

The dataset parquet file path, which must be fsspec compatible.

required

storage_options #

dict[str, Any]

Optional storage options to stream data from a cloud storage instance.

required

Attributes:

Name Type Description
name str
path str
storage_options dict[str, object]

name: str #

path: str #

storage_options: dict[str, object] = field(factory=dict) #

ParquetDataset #

Parquet Dataset class, to be analyzed via the AutoAnalyzer.

Parameters:

Name Type Description Default

name #

str
required

path #

str
required

Methods:

Name Description
analyze

Analyze the dataset and create an Analyzed Schema.

analyze(*forced_type: ExplainableFeature | IndexMetadata, target_names: list[str] | None = None) -> AnalyzedParquetDataset #

Analyze the dataset and create an Analyzed Schema.

Parameters:

Name Type Description Default
forced_type #
Feature

Features objects to force custom feature type for specific column names in the Arrow Table.

()
target_names #
list[str] | None

Optional list of column names indicating which columns should be considered targets. Default None.

None

Returns:

Type Description
AnalyzedParquetDataset

The analyzed dataset, a parquet dataset with an analyzed schema attached.

Source code in src/xpdeep/dataset/parquet_dataset.py
@initialized_client_verification
@initialized_project_verification
def analyze(
    self, *forced_type: ExplainableFeature | IndexMetadata, target_names: list[str] | None = None
) -> "AnalyzedParquetDataset":
    """Analyze the dataset and create an Analyzed Schema.

    Parameters
    ----------
    forced_type : Feature
        Features objects to force custom feature type for specific column names in the Arrow Table.
    target_names : list[str] | None
        Optional list of column names indicating which columns should be considered targets. Default None.

    Returns
    -------
    AnalyzedParquetDataset
        The analyzed dataset, a parquet dataset with an analyzed schema attached.
    """
    client_factory = ClientFactory.CURRENT.get()

    forced_types_as_dict = {value.name: (value.to_model().to_dict()) for value in forced_type}

    with client_factory() as client:
        analyzed_schema_pipeline = handle_api_validation_errors(
            launch_pipeline.sync(
                project_id=Project.CURRENT.get().model.id,
                client=client,
                body=AutoAnalyzerPipelineInput(
                    type_="AUTO_ANALYZER",
                    dataset=self._to_read_dataset_option(),
                    forced_type=AutoAnalyzerPipelineInputForcedType.from_dict(forced_types_as_dict),
                    target_names=target_names if target_names is not None else UNSET,
                ),
            )
        )

        analyzed_schema = schema_analysis.get_pipeline_result(analyzed_schema_pipeline.id)

    return AnalyzedParquetDataset(
        name=self.name,
        path=self.path,
        storage_options=self.storage_options,
        analyzed_schema=analyzed_schema,
    )

AnalyzedParquetDataset #

Analyzed Parquet Dataset class to be created from an existing analyzed schema.

Parameters:

Name Type Description Default

name #

str
required

path #

str
required

analyzed_schema #

AnalyzedSchema
required

Methods:

Name Description
fit

Create a Fitted Parquet Dataset object.

Attributes:

Name Type Description
analyzed_schema AnalyzedSchema

analyzed_schema: AnalyzedSchema = field(kw_only=True) #

fit() -> FittedParquetDataset #

Create a Fitted Parquet Dataset object.

Source code in src/xpdeep/dataset/parquet_dataset.py
@initialized_client_verification
@initialized_project_verification
def fit(self) -> "FittedParquetDataset":
    """Create a Fitted Parquet Dataset object."""
    client_factory = ClientFactory.CURRENT.get()

    with client_factory() as client:
        fitted_schema_pipeline = launch_pipeline.sync(
            project_id=Project.CURRENT.get().model.id,
            client=client,
            body=FitSchemaPipelineInput(
                type_="FIT_SCHEMA",
                schema=self.analyzed_schema._as_fit_schema_pipeline_input_schema,  # noqa : SLF001
                dataset=self._to_read_dataset_option(),
            ),
        )

        pipeline_result = handle_api_validation_errors(fitted_schema_pipeline)
        fitted_schema = schema_fitting.get_pipeline_result(pipeline_result.id)

    return FittedParquetDataset(
        name=self.name, path=self.path, storage_options=self.storage_options, fitted_schema=fitted_schema
    )

FittedParquetDataset #

Fitted Parquet Dataset class to be created from an existing fitted schema.

Parameters:

Name Type Description Default

name #

str
required

path #

str
required

fitted_schema #

FittedSchema
required

Methods:

Name Description
to_model

Convert to ParquetDatasetArtifactInsert instance.

artifact_id

Get artifact id if not set yet.

Attributes:

Name Type Description
fitted_schema FittedSchema

fitted_schema: FittedSchema = field(kw_only=True) #

to_model() -> ParquetDatasetArtifactInsert #

Convert to ParquetDatasetArtifactInsert instance.

Source code in src/xpdeep/dataset/parquet_dataset.py
def to_model(self) -> ParquetDatasetArtifactInsert:
    """Convert to ParquetDatasetArtifactInsert instance."""
    return ParquetDatasetArtifactInsert(
        project_id=Project.CURRENT.get().model.id,
        hash_=self._build_hash(),
        name=self.name,
        core_version="0.0.0",
        split_name="split_name_should_not_exist_anymore",  # https://gitlab.xpdeep.com/xpdeep/xpdeep-database/-/issues/356
        # TODO(<meziane bellahmer>): https://gitlab.xpdeep.com/xpdeep/xpdeep-database/-/issues/356 remove split name
        size=1048576,
        schema=self.fitted_schema.to_model(),
        type_="PARQUET_DATASET",
        path=self.path,
        storage_options=ParquetDatasetArtifactInsertStorageOptions.from_dict(self.storage_options),
    )

artifact_id() -> str #

Get artifact id if not set yet.

Source code in src/xpdeep/dataset/parquet_dataset.py
def artifact_id(self) -> str:
    """Get artifact id if not set yet."""
    if self._artifact_id is not None:  # If already set, return it.
        return self._artifact_id

    dataset_artifact = self._insert_dataset_artifact()

    self._artifact_id = dataset_artifact.id
    self.fitted_schema = FittedSchema.from_model(dataset_artifact.schema.to_dict())

    return self._artifact_id