Skip to content

parquet_dataset

Parquet datasets, to be used with raw data under the ".parquet" format.

Classes:

Name Description
BaseParquetDataset

Base Parquet Dataset class.

ParquetDataset

Parquet Dataset class, to be analyzed via the AutoAnalyzer.

AnalyzedParquetDataset

Analyzed Parquet Dataset class to be created from an existing analyzed schema.

FittedParquetDataset

Fitted Parquet Dataset class to be created from an existing fitted schema.

Attributes:

Name Type Description
logger

logger = logging.getLogger(__name__) #

BaseParquetDataset #

Base Parquet Dataset class.

Parameters:

Name Type Description Default

name #

str

Dataset name.

required

path #

str

The dataset parquet file path, which must be fsspec compatible.

required

storage_options #

dict[str, Any]

Optional storage options to stream data from a cloud storage instance.

<class 'dict'>

num_rows #

int | None

Size of the parquet dataset, if None it will be automatically inferred before database insertion which may take time on large files.

None

Methods:

Name Description
get_num_rows

Get the number of rows in the parquet file.

Attributes:

Name Type Description
name str
path str
storage_options dict[str, object]
num_rows int | None

name: str #

path: str #

storage_options: dict[str, object] = field(factory=dict) #

num_rows: int | None = None #

get_num_rows() -> int #

Get the number of rows in the parquet file.

Source code in src/xpdeep/dataset/parquet_dataset.py
def get_num_rows(self) -> int:
    """Get the number of rows in the parquet file."""
    if self.num_rows is not None:
        return self.num_rows

    try:

        def _read_metadata() -> int:
            fs = PyFileSystem(FSSpecHandler(url_to_fs(self.path, **self.storage_options)[0]))
            parquet_file = pq.ParquetFile(self.path, filesystem=fs)
            if not isinstance(parquet_file.metadata.num_rows, int):
                msg = "Number of rows not found in parquet metadata."
                raise TypeError(msg)
            return parquet_file.metadata.num_rows

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(_read_metadata)
            try:
                return future.result(timeout=10)
            except concurrent.futures.TimeoutError as e:
                msg = (
                    f"Parquet size could not be estimated in less than 10s, "
                    f"please set the the number of rows when instantiating your {self.__class__.__name__})"
                )
                raise TimeoutError(msg) from e
    except:
        logger.exception("Failed to automatically estimate parquet size")
        raise

ParquetDataset #

Parquet Dataset class, to be analyzed via the AutoAnalyzer.

Parameters:

Name Type Description Default

name #

str
required

path #

str
required

storage_options #

dict[str, object]

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)

<class 'dict'>

num_rows #

int | None
None

Methods:

Name Description
analyze

Analyze the dataset and create an Analyzed Schema.

analyze(*forced_type: ExplainableFeature | IndexMetadata, target_names: list[str] | None = None) -> AnalyzedParquetDataset #

Analyze the dataset and create an Analyzed Schema.

Parameters:

Name Type Description Default
forced_type #
Feature

Features objects to force custom feature type for specific column names in the Arrow Table.

()
target_names #
list[str] | None

Optional list of column names indicating which columns should be considered targets. Default None.

None

Returns:

Type Description
AnalyzedParquetDataset

The analyzed dataset, a parquet dataset with an analyzed schema attached.

Source code in src/xpdeep/dataset/parquet_dataset.py
@initialized_client_verification
@initialized_project_verification
def analyze(
    self, *forced_type: ExplainableFeature | IndexMetadata, target_names: list[str] | None = None
) -> "AnalyzedParquetDataset":
    """Analyze the dataset and create an Analyzed Schema.

    Parameters
    ----------
    forced_type : Feature
        Features objects to force custom feature type for specific column names in the Arrow Table.
    target_names : list[str] | None
        Optional list of column names indicating which columns should be considered targets. Default None.

    Returns
    -------
    AnalyzedParquetDataset
        The analyzed dataset, a parquet dataset with an analyzed schema attached.
    """
    client_factory = ClientFactory.CURRENT.get()

    forced_types_as_dict = {value.name: (value.to_model().to_dict()) for value in forced_type}

    with client_factory() as client:
        analyzed_schema_pipeline = handle_api_validation_errors(
            launch_pipeline.sync(
                project_id=Project.CURRENT.get().model.id,
                client=client,
                body=AutoAnalyzerPipelineInput(
                    type_="AUTO_ANALYZER",
                    dataset=self._to_read_dataset_option(),
                    forced_type=AutoAnalyzerPipelineInputForcedType.from_dict(forced_types_as_dict),
                    target_names=target_names if target_names is not None else UNSET,
                ),
            )
        )

        analyzed_schema = schema_analysis.get_pipeline_result(analyzed_schema_pipeline.id)

    return AnalyzedParquetDataset(
        name=self.name,
        path=self.path,
        storage_options=self.storage_options,
        analyzed_schema=analyzed_schema,
    )

AnalyzedParquetDataset #

Analyzed Parquet Dataset class to be created from an existing analyzed schema.

Parameters:

Name Type Description Default

name #

str
required

path #

str
required

storage_options #

dict[str, object]

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)

<class 'dict'>

num_rows #

int | None
None

analyzed_schema #

AnalyzedSchema
required

Methods:

Name Description
fit

Create a Fitted Parquet Dataset object.

Attributes:

Name Type Description
analyzed_schema AnalyzedSchema

analyzed_schema: AnalyzedSchema = field(kw_only=True) #

fit() -> FittedParquetDataset #

Create a Fitted Parquet Dataset object.

Source code in src/xpdeep/dataset/parquet_dataset.py
@initialized_client_verification
@initialized_project_verification
def fit(self) -> "FittedParquetDataset":
    """Create a Fitted Parquet Dataset object."""
    client_factory = ClientFactory.CURRENT.get()

    with client_factory() as client:
        fitted_schema_pipeline = launch_pipeline.sync(
            project_id=Project.CURRENT.get().model.id,
            client=client,
            body=FitSchemaPipelineInput(
                type_="FIT_SCHEMA",
                schema=self.analyzed_schema.as_fit_schema_pipeline_input_schema,
                dataset=self._to_read_dataset_option(),
            ),
        )

        pipeline_result = handle_api_validation_errors(fitted_schema_pipeline)
        fitted_schema = schema_fitting.get_pipeline_result(pipeline_result.id)

    return FittedParquetDataset(
        name=self.name, path=self.path, storage_options=self.storage_options, fitted_schema=fitted_schema
    )

FittedParquetDataset #

Fitted Parquet Dataset class to be created from an existing fitted schema.

Parameters:

Name Type Description Default

name #

str
required

path #

str
required

storage_options #

dict[str, object]

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)

<class 'dict'>

num_rows #

int | None
None

fitted_schema #

FittedSchema
required

size #

int | None
None

Methods:

Name Description
to_model

Convert to ParquetDatasetArtifactInsert instance.

artifact_id

Get artifact id if not set yet.

Attributes:

Name Type Description
fitted_schema FittedSchema
size int | None

fitted_schema: FittedSchema = field(kw_only=True) #

size: int | None = None #

to_model() -> ParquetDatasetArtifactInsert #

Convert to ParquetDatasetArtifactInsert instance.

Source code in src/xpdeep/dataset/parquet_dataset.py
def to_model(self) -> ParquetDatasetArtifactInsert:
    """Convert to ParquetDatasetArtifactInsert instance."""
    return ParquetDatasetArtifactInsert(
        project_id=Project.CURRENT.get().model.id,
        hash_=self._build_hash(),
        name=self.name,
        core_version="0.0.0",
        split_name="split_name_should_not_exist_anymore",  # https://gitlab.xpdeep.com/xpdeep/xpdeep-database/-/issues/356
        # TODO(<meziane bellahmer>): https://gitlab.xpdeep.com/xpdeep/xpdeep-database/-/issues/356 remove split name
        size=self.get_num_rows(),
        schema=self.fitted_schema.to_model(),
        type_="PARQUET_DATASET",
        path=self.path,
        storage_options=ParquetDatasetArtifactInsertStorageOptions.from_dict(self.storage_options),
    )

artifact_id() -> str #

Get artifact id if not set yet.

Source code in src/xpdeep/dataset/parquet_dataset.py
def artifact_id(self) -> str:
    """Get artifact id if not set yet."""
    if self._artifact_id is not None:  # If already set, return it.
        return self._artifact_id

    dataset_artifact = self._insert_dataset_artifact()

    self._artifact_id = dataset_artifact.id
    self.fitted_schema = FittedSchema.from_model(dataset_artifact.schema.to_dict())

    return self._artifact_id