Get schema analysis job results.
Source code in src/xpdeep/utils/pipelines/schema_fitting.py
@initialized_client_verification
@initialized_project_verification
@retry_on_exception((httpx.RemoteProtocolError, urllib3.exceptions.ProtocolError), max_retries=10)
def get_pipeline_result(pipeline_id: str) -> FittedSchema:
    """Get schema analysis job results.

    Follows the pipeline's server-sent progress events. Events carrying a
    ``console_output`` entry are echoed to stdout, an event carrying an
    ``error`` entry aborts with ``ApiError``, and the first event carrying
    neither is converted into the returned schema.

    Parameters
    ----------
    pipeline_id : str
        Identifier of the pipeline whose progress stream is followed.

    Returns
    -------
    FittedSchema
        Built with ``FittedSchema.from_model`` from the first event that is
        neither console output nor an error.

    Raises
    ------
    ApiError
        If an event carries an ``error`` entry, or if the event stream ends
        without delivering a result.
    KeyboardInterrupt
        Re-raised after a cancellation request has been sent for the pipeline.
    """
    project_id = Project.CURRENT.get().model.id

    with (
        ClientFactory.CURRENT.get()() as client,
        connect_sse(
            client.get_httpx_client(), "GET", f"/{project_id}/pipeline/{pipeline_id}/progress"
        ) as event_source,
    ):
        try:
            for event in event_source.iter_sse():
                # Decode each event once; every branch below inspects the same payload.
                payload = json.loads(event.data)
                if "console_output" in payload:
                    print(payload["console_output"], end="")  # noqa: T201
                elif "error" in payload:
                    raise ApiError(payload["error"])
                else:
                    return FittedSchema.from_model(payload)
        except KeyboardInterrupt:
            # The user interrupted the wait: ask the backend to cancel the pipeline
            # before propagating the interrupt.
            # NOTE(review): this builds a second client that is never entered as a
            # context manager; confirm whether `client` could be reused here instead.
            handle_api_validation_errors(
                cancel_pipeline.sync(project_id, pipeline_id, client=ClientFactory.CURRENT.get()())
            )
            raise

    # The stream ended without an error event or a final result event.
    msg = f"Unexpected error during pipeline `{pipeline_id}` execution"
    raise ApiError(msg)
|