Read the output of a Kubeflow pipeline programmatically

I run pipelines on Kubeflow with a Python command like:

client.create_run_from_pipeline_func(pipeline_function, arguments=params_dict[name], run_name=name)

It creates a job on Kubeflow Pipelines, and I would like to access information about the different steps of the pipeline through a Python API, with something like:

job.get({step_name}).get_custom_properties({property_name})

I can do that by opening the run in the Kubeflow UI and looking at the custom properties of the pipeline step I am interested in, but I would like to automate this process. Is it possible to do this with a Python API?
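A minimal sketch of the `job.get(step_name)` style of lookup described above, assuming the run's Argo workflow manifest has already been fetched and parsed into a dict (the answer below obtains it from `client.get_run(run_id).pipeline_runtime.workflow_manifest`). Note that it reads the Argo node parameters stored in that manifest, not the ML Metadata custom properties shown in the UI. `WorkflowView`, `StepView`, `get_parameter`, and the sample manifest (including the `train` step and its `train-accuracy` output) are hypothetical names chosen for illustration.

```python
import json
from typing import Any, Dict, Optional


class StepView:
    """Read-only view over one node (step) of an Argo workflow manifest."""

    def __init__(self, node: Dict[str, Any]):
        self._node = node

    @property
    def phase(self) -> Optional[str]:
        return self._node.get("phase")

    def get_parameter(self, name: str, direction: str = "outputs") -> Optional[str]:
        # direction is "inputs" or "outputs"; both hold a list of {"name", "value"} dicts
        for param in self._node.get(direction, {}).get("parameters", []):
            if param.get("name") == name:
                return param.get("value")
        return None


class WorkflowView:
    """Look up steps of a parsed workflow manifest by their display name."""

    def __init__(self, manifest: Dict[str, Any]):
        self._nodes = manifest.get("status", {}).get("nodes", {})

    def get(self, step_name: str) -> StepView:
        # status.nodes is keyed by generated node IDs, so match on displayName instead
        for node in self._nodes.values():
            if node.get("displayName") == step_name:
                return StepView(node)
        raise KeyError(step_name)


# Hypothetical, heavily trimmed manifest; a real one carries many more fields
sample_manifest = json.dumps({
    "status": {
        "phase": "Succeeded",
        "nodes": {
            "my-run-abc12": {"displayName": "my-run", "type": "DAG", "phase": "Succeeded"},
            "my-run-abc12-3456": {
                "displayName": "train",
                "type": "Pod",
                "phase": "Succeeded",
                "outputs": {"parameters": [{"name": "train-accuracy", "value": "0.93"}]},
            },
        },
    },
})

job = WorkflowView(json.loads(sample_manifest))
print(job.get("train").get_parameter("train-accuracy"))  # 0.93
```

The lookup matches on `displayName` because the keys of `status.nodes` are generated node IDs. If a step runs more than once (for example inside a loop or with retries), several nodes can share the same display name, and this sketch simply returns whichever it finds first.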



Solution 1:[1]

I used this class to extract the parameters from a Kubeflow run:

import json
from typing import Dict, List, Optional

import kfp
import pandas as pd
from kfp_server_api.models.api_run_detail import ApiRunDetail


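class PipelineResult:
    """Convenience wrapper around the detail of a single Kubeflow Pipelines run."""

    # Name of the pipeline parameter holding the dataset path (specific to the pipeline this was written for)
    DATASET_PATH_NAME = "data-load-features-DATA_SET_PATH"

    def __init__(self, run_description: ApiRunDetail):
        self._run_description = run_description

    @property
    def workflow_manifest(self) -> dict:
        # The run's underlying Argo Workflow, stored as a JSON string
        return json.loads(self._run_description.pipeline_runtime.workflow_manifest)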

    @property
    def status(self) -> str:
        return self.workflow_manifest["status"]["phase"]

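    @property
    def params(self) -> List[Dict]:
        # Collect the input parameters of every node (step) of the workflow,
        # tagging each one with the key (node ID) of the node it belongs to
        params_list = []
        for k, v in self.workflow_manifest["status"].get("nodes", {}).items():
            for params in v.get("inputs", {}).get("parameters", []):
                params_list.append({"node_name": k, **params})
        return params_list

    def get_param(self, param_name: str) -> Optional[str]:
        # Return the value of the first parameter with this name, or None if absent
        for el in self.params:
            if el["name"] == param_name:
                return el.get("value")
        return None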

    @property
    def training_set_path(self):
        return self.get_param(self.DATASET_PATH_NAME)

    @property
    def run_name(self):
        return self.workflow_manifest["metadata"]["annotations"]["pipelines.kubeflow.org/run_name"]

    def as_dict(self):
        return {
            "status": self.status,
            "training_set_path": self.training_set_path,
            "run_name": self.run_name,
        }


# Fetch the 30 most recent runs in the namespace and tabulate them
client = kfp.Client()
api_response = client.list_runs(namespace='...', sort_by='created_at desc', page_size=30)
# api_response.runs can be None when no runs match
runs_descriptions = [client.get_run(run.id) for run in api_response.runs or []]
runs = pd.DataFrame([PipelineResult(el).as_dict() for el in runs_descriptions])
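The snippet above only looks at the 30 most recent runs. A minimal sketch for walking every page instead, assuming the kfp v1 `Client.list_runs` keyword arguments (`page_token`, `page_size`, `sort_by`, `namespace`) and the `runs` and `next_page_token` fields of its response; `iter_all_runs` is a hypothetical helper name. It relies only on duck typing, so any object with a compatible `list_runs` method works.

```python
from typing import Any, Iterator, Optional


def iter_all_runs(client: Any, namespace: Optional[str] = None, page_size: int = 100,
                  sort_by: str = "created_at desc") -> Iterator[Any]:
    """Yield every run, following next_page_token until the server returns none."""
    page_token = ""
    while True:
        response = client.list_runs(page_token=page_token, page_size=page_size,
                                    sort_by=sort_by, namespace=namespace)
        # The runs field may be None rather than an empty list on an empty page
        for run in response.runs or []:
            yield run
        page_token = response.next_page_token
        # An empty or missing token means this was the last page
        if not page_token:
            break
```

For example, `pd.DataFrame([PipelineResult(client.get_run(r.id)).as_dict() for r in iter_all_runs(client, namespace='...')])` would tabulate every run rather than the first 30. Each `get_run` call is a separate request, so this can be slow in namespaces with many runs.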

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Robin Nicole