Get schema analysis job results.
Source code in src/xpdeep/utils/jobs/schema_fitting.py
| @initialized_client_verification
@initialized_project_verification
def get_job_result(job_id: UUID) -> FittedSchema:
    """Retrieve the outcome of a schema analysis job as a ``FittedSchema``.

    Job progress is first reported through ``logging.print_job_progress``.
    Once that call returns, the job is fetched from the current project and
    its ``schema`` result entry (base64 text) is decoded and deserialized.

    Args:
        job_id: Identifier of the job to read the results from.

    Returns:
        The ``FittedSchema`` rebuilt from the job's ``schema`` result entry.

    Raises:
        KeyboardInterrupt: Re-raised after a cancellation request has been
            issued for the job when progress reporting is interrupted.
        ApiError: If the job status is ``JobStatus.ERROR``, if the job has no
            results, or if the job type is not ``JobType.FIT_SCHEMA``.
    """
    try:
        logging.print_job_progress(job_id)
    except KeyboardInterrupt:
        # Interrupted by the user: request cancellation of the job, then let
        # the interrupt propagate to the caller.
        project_id = Project.CURRENT.get().model.id
        api_client = ClientFactory.CURRENT.get()()
        cancellation = cancel_job.sync(project_id, job_id, client=api_client)
        handle_api_validation_errors(cancellation)
        raise

    project_id = Project.CURRENT.get().model.id
    api_client = ClientFactory.CURRENT.get()()
    job = handle_api_validation_errors(
        get_one_job.sync(project_id, job_id, client=api_client),
    )

    # The checks below run in a fixed order: remote failure first, then
    # missing results, then job type.
    if job.status == JobStatus.ERROR:
        failure = cast(JobModelResultsType0, job.results)
        raise ApiError(f"{failure['error_class']} : {failure['error_detail']}")

    if job.results is None:
        raise ApiError("Job failed")

    if job.type_ != JobType.FIT_SCHEMA:
        raise ApiError(f"Unexpected Job type {job.type_}.")

    # The schema is stored as base64 text; turn it back into raw bytes before
    # handing it to the deserializer.
    encoded_schema = job.results["schema"].encode("utf-8")
    return FittedSchema.from_bytes(base64.decodebytes(encoded_schema))
|