Skip to content

schema_analysis

Schema analysis pipeline utilities.

Functions:

Name Description
get_pipeline_result

Get schema analysis job results.

get_pipeline_result(pipeline_id: str) -> AnalyzedSchema #

Get schema analysis job results.

Source code in src/xpdeep/utils/pipelines/schema_analysis.py
@initialized_client_verification
@initialized_project_verification
@retry_on_exception((httpx.RemoteProtocolError, urllib3.exceptions.ProtocolError), max_retries=10)
def get_pipeline_result(pipeline_id: str) -> AnalyzedSchema:
    """Get schema analysis job results.

    Open a server-sent-events stream on the progress endpoint of pipeline
    ``pipeline_id`` in the current project and consume events until one of
    them carries the final result.

    Each event payload is decoded from JSON and handled as follows:

    * a payload containing ``"console_output"`` is echoed to stdout and the
      stream keeps being consumed;
    * a payload containing ``"error"`` aborts with an ``ApiError``;
    * any other payload is treated as the result and converted with
      ``AnalyzedSchema.from_model``.

    Parameters
    ----------
    pipeline_id : str
        Identifier of the schema analysis pipeline to follow.

    Returns
    -------
    AnalyzedSchema
        The schema built from the first event that is neither console output
        nor an error.

    Raises
    ------
    ApiError
        If an event reports an error, or if the stream ends without
        delivering a result.
    KeyboardInterrupt
        Re-raised after a cancellation request has been sent for the pipeline.
    """
    with (
        ClientFactory.CURRENT.get()() as client,
        connect_sse(
            client.get_httpx_client(), "GET", f"/{Project.CURRENT.get().model.id}/pipeline/{pipeline_id}/progress"
        ) as event_source,
    ):
        try:
            for event in event_source.iter_sse():
                # Decode the payload once per event instead of once per check.
                payload = json.loads(event.data)
                if "console_output" in payload:
                    print(payload["console_output"], end="")  # noqa: T201
                elif "error" in payload:
                    raise ApiError(payload["error"])
                else:
                    return AnalyzedSchema.from_model(payload)
        except KeyboardInterrupt:
            # Ask the backend to cancel the running pipeline before propagating the interruption.
            # NOTE(review): this builds a second client that is never closed; confirm whether the
            # already-open `client` can be reused here.
            handle_api_validation_errors(
                cancel_pipeline.sync(Project.CURRENT.get().model.id, pipeline_id, client=ClientFactory.CURRENT.get()())
            )
            raise

        # The stream was exhausted without ever yielding a result payload.
        msg = f"Unexpected error during pipeline `{pipeline_id}` execution"
        raise ApiError(msg)