parquet_dataset

Parquet datasets, for use with raw data stored in the ".parquet" format.

BaseParquetDataset #

Base Parquet Dataset class.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `split_name` | `str` | A split name, for example `"train"`; only used by the XpViz visualization platform. | *required* |
| `identifier_name` | `str` | A key to group each dataset; only used by the XpViz visualization platform. | *required* |
| `path` | `str` | The directory artifact path. | *required* |
| `storage_options` | `dict[str, object]` | Optional storage options to stream data from a cloud storage instance. | `{}` |

split_name: str #

identifier_name: str #

path: str #

storage_options: dict[str, object] = field(factory=dict) #

ParquetDataset #

Parquet Dataset class.
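`ParquetDataset` carries the fields documented on `BaseParquetDataset` above. A minimal construction sketch, assuming the class is importable from `xpdeep.dataset.parquet_dataset` (the module shown in the source listings below) and using placeholder path and storage options:

```python
from xpdeep.dataset.parquet_dataset import ParquetDataset  # import path assumed

# Placeholder values; "path" may point at local or cloud storage.
train_set = ParquetDataset(
    split_name="train",                   # shown in the XpViz visualization
    identifier_name="my_dataset",         # groups the dataset's splits in XpViz
    path="s3://my-bucket/train.parquet",  # illustrative artifact path
    storage_options={"anon": False},      # illustrative; defaults to {}
)
```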

analyze(*forced_type: Feature, target_names: list[str] | None = None) -> AnalyzedParquetDataset #

Analyze the dataset and create an Analyzed Schema.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `forced_type` | `Feature` | Feature objects that force a custom feature type for specific column names in the Arrow Table. | `()` |
| `target_names` | `list[str] \| None` | Optional list of column names indicating which columns should be considered targets. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `AnalyzedParquetDataset` | The analyzed dataset: a parquet dataset with an analyzed schema attached. |

Source code in `src/xpdeep/dataset/parquet_dataset.py`:

```python
@initialized_client_verification
@initialized_project_verification
def analyze(self, *forced_type: Feature, target_names: list[str] | None = None) -> "AnalyzedParquetDataset":
    """Analyze the dataset and create an Analyzed Schema.

    Parameters
    ----------
    forced_type: Feature
        Features objects to force custom feature type for specific column names in the Arrow Table.
    target_names: list[str] | None
        Optional list of column names indicating which columns should be considered targets. Default None.

    Returns
    -------
    AnalyzedParquetDataset
        The analyzed dataset, a parquet dataset with an analyzed schema attached.
    """
    client_factory = ClientFactory.CURRENT.get()

    forced_type = {
        value.name: (base64.encodebytes(NumpyMsgpackEncoder().encode(value.as_exposed)).decode("utf-8"))
        for value in forced_type
    }

    with client_factory() as client:
        response = create_analyzed_schema.sync_detailed(
            Project.CURRENT.get().model.id,
            body=ParquetDatasetAnalyzeRequestBody(
                parquet_dataset=self.as_request_body,
                forced_type=ParquetDatasetAnalyzeRequestBodyForcedTypeType0.from_dict(forced_type)
                if len(forced_type) > 0
                else None,
                target_names=target_names,
            ),
            client=client,
        )

        return AnalyzedParquetDataset(self, AnalyzedSchema.from_bytes(response.content))
```
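A hedged usage sketch for `analyze`, continuing the `train_set` example above. It assumes an initialized client and project (the two decorators on `analyze` enforce this), and `categorical_feature` is a hypothetical, pre-built `Feature` object:

```python
# Requires an initialized client and project (see the decorators above).
analyzed = train_set.analyze(target_names=["label"])

# Forced feature types are passed positionally as Feature objects;
# "categorical_feature" is a hypothetical, pre-built Feature instance.
# analyzed = train_set.analyze(categorical_feature, target_names=["label"])
```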

AnalyzedParquetDataset(parquet_dataset: ParquetDataset, analyzed_schema: AnalyzedSchema) #

Analyzed Parquet Dataset class.

Source code in `src/xpdeep/dataset/parquet_dataset.py`:

```python
def __init__(self, parquet_dataset: ParquetDataset, analyzed_schema: AnalyzedSchema) -> None:
    super().__init__(**attr_asdict(parquet_dataset))
    self.analyzed_schema = analyzed_schema
```

analyzed_schema: AnalyzedSchema #

fit() -> FittedParquetDataset #

Create a Fitted Parquet Dataset object.

Source code in `src/xpdeep/dataset/parquet_dataset.py`:

```python
@initialized_client_verification
@initialized_project_verification
def fit(self) -> "FittedParquetDataset":
    """Create a Fitted Parquet Dataset object."""
    for column in self.analyzed_schema.as_exposed.columns:
        if type(column) is ExposedBaseFeature:
            message = "Base features are not accepted for fitting schemas."
            raise TypeError(message) from None

    client_factory = ClientFactory.CURRENT.get()

    with client_factory() as client:
        response = create_fitted_schema.sync_detailed(
            project_id=Project.CURRENT.get().model.id,
            client=client,
            body=FitSchemaRequestBody(
                self.as_request_body,
                base64.encodebytes(NumpyMsgpackEncoder().encode(self.analyzed_schema.as_exposed)).decode("utf-8"),
            ),
        )

    return FittedParquetDataset(
        self.split_name, self.identifier_name, self.path, fitted_schema=FittedSchema.from_bytes(response.content)
    )
```
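Continuing the sketch above, `analyze` and `fit` chain naturally. Note from the source that `fit` raises `TypeError` if the analyzed schema still contains base features:

```python
# Chain analysis and fitting; raises TypeError if the analyzed schema
# still contains base (unresolved) features.
fitted = train_set.analyze(target_names=["label"]).fit()
print(fitted.fitted_schema)  # the FittedSchema attached by fit()
```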

FittedParquetDataset #

Fitted Parquet Dataset class.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `fitted_schema` | `FittedSchema` | The fitted schema attached to the dataset. | *required* |

fitted_schema: FittedSchema = field(kw_only=True) #

artifact_id: str #

Get the artifact id.
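A `FittedParquetDataset` can also be built directly when a `FittedSchema` is already at hand, mirroring the return statement at the end of `fit` above. In this sketch the import path is assumed, `fitted_schema` is keyword-only, and `existing_schema` stands in for a `FittedSchema` instance:

```python
from xpdeep.dataset.parquet_dataset import FittedParquetDataset  # import path assumed

# Mirrors the constructor call at the end of fit(); fitted_schema is
# keyword-only. "existing_schema" is an assumed FittedSchema instance.
fitted = FittedParquetDataset(
    "train",
    "my_dataset",
    "s3://my-bucket/train.parquet",
    fitted_schema=existing_schema,
)
print(fitted.artifact_id)  # the dataset's artifact id
```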