Skip to content

schema_analysis

Schema analysis job utilities.

Functions:

| Name | Description |
| --- | --- |
| `get_job_result` | Get schema analysis job results. |

`get_job_result(job_id: UUID) -> AnalyzedSchema`

Get schema analysis job results.

Source code in src/xpdeep/utils/jobs/schema_analysis.py
@initialized_client_verification
@initialized_project_verification
def get_job_result(job_id: UUID) -> AnalyzedSchema:
    """Get schema analysis job results.

    Progress is reported through ``logging.print_job_progress`` (presumably
    until the job finishes -- confirm against that helper). The job is then
    fetched from the API for the current project and its result is decoded
    into an ``AnalyzedSchema``.

    Parameters
    ----------
    job_id
        Identifier of the schema analysis job to read.

    Returns
    -------
    AnalyzedSchema
        Schema rebuilt from the base64-encoded ``"schema"`` entry of the job
        results.

    Raises
    ------
    ApiError
        If the job status is ``JobStatus.ERROR``, if the job carries no
        results, or if the job type is not
        ``JobType.ANALYZE_PARQUET_DATASET``.
    KeyboardInterrupt
        Re-raised after a cancellation request has been sent for the job.
    """
    try:
        logging.print_job_progress(job_id)
    except KeyboardInterrupt:
        # Ask the backend to cancel the job before propagating the interruption.
        handle_api_validation_errors(
            cancel_job.sync(Project.CURRENT.get().model.id, job_id, client=ClientFactory.CURRENT.get()())
        )
        raise

    ongoing_job = handle_api_validation_errors(
        get_one_job.sync(Project.CURRENT.get().model.id, job_id, client=ClientFactory.CURRENT.get()()),
    )

    if ongoing_job.status == JobStatus.ERROR:
        if ongoing_job.results is None:
            # No error payload was attached: report a generic failure rather
            # than crashing with a TypeError when subscripting ``None`` below.
            message = "Job failed"
            raise ApiError(message)

        # ``cast`` only informs the type checker; it performs no runtime check.
        remote_error = cast(JobModelResultsType0, ongoing_job.results)
        error_message = f"{remote_error['error_class']} : {remote_error['error_detail']}"
        raise ApiError(error_message)

    if ongoing_job.results is None:
        message = "Job failed"
        raise ApiError(message)

    if ongoing_job.type_ != JobType.ANALYZE_PARQUET_DATASET:
        message = f"Unexpected Job type {ongoing_job.type_}."
        raise ApiError(message)

    # The schema is transported as base64 text: encode it back to bytes, decode
    # the base64 layer, then let ``AnalyzedSchema`` parse the raw payload.
    return AnalyzedSchema.from_bytes(base64.decodebytes(ongoing_job.results["schema"].encode(encoding="utf-8")))