Kubeflow Pipelines v2 is giving Permission Denied on OutputPath

In Kubeflow Pipelines v2, running on EKS with a default install, I'm getting a "permission denied" error when a step tries to create the directory for its OutputPath parameter.

It ran correctly in KFP v1.

time="2022-04-26T21:53:30.710Z" level=info msg="capturing logs" argo=true
I0426 21:53:30.745547      18 launcher.go:144] PipelineRoot defaults to "minio://mlpipeline/v2/artifacts".
I0426 21:53:30.745908      18 cache.go:120] Connecting to cache endpoint 10.100.244.104:8887
I0426 21:53:30.854201      18 launcher.go:193] enable caching
F0426 21:53:30.979055      18 main.go:50] Failed to execute component: failed to create directory "/tmp/outputs/output_context_path" for output parameter "output_context_path": mkdir /tmp/outputs/output_context_path: permission denied
time="2022-04-26T21:53:30.980Z" level=info msg="/tmp/outputs/output_context_path/data -> /var/run/argo/outputs/artifacts/tmp/outputs/output_context_path/data.tgz" argo=true
time="2022-04-26T21:53:30.981Z" level=info msg="Taring /tmp/outputs/output_context_path/data"
Error: failed to tarball the output /tmp/outputs/output_context_path/data to /var/run/argo/outputs/artifacts/tmp/outputs/output_context_path/data.tgz: stat /tmp/outputs/output_context_path/data: permission denied
failed to tarball the output /tmp/outputs/output_context_path/data to /var/run/argo/outputs/artifacts/tmp/outputs/output_context_path/data.tgz: stat /tmp/outputs/output_context_path/data: permission denied
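
The failing mkdir in that trace comes from the kfp-launcher (main.go), not from my code, so one thing I still need to rule out is the container user. A throwaway diagnostic component along these lines (not part of my pipeline; it only reuses the base image from the failing step) should show who the step runs as and whether it can create that exact directory:

from kfp.v2.dsl import component

# Hypothetical diagnostic (not in the original pipeline); the base image is the
# one used by the failing step.
@component(
    base_image="public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/codeserver-python:v1.5.0"
)
def check_tmp_permissions():
    import os

    print("running as uid:", os.getuid(), "gid:", os.getgid())
    tmp_stat = os.stat("/tmp")
    print("/tmp mode:", oct(tmp_stat.st_mode), "owner uid:", tmp_stat.st_uid)
    print("can write /tmp:", os.access("/tmp", os.W_OK))

    # Attempt the exact mkdir the launcher failed on.
    try:
        os.makedirs("/tmp/outputs/output_context_path", exist_ok=True)
        print("mkdir succeeded")
    except PermissionError as err:
        print("mkdir failed:", err)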

The code that produces this is here:

import kfp
from kfp.v2.dsl import component, Artifact, Input, InputPath, Output, OutputPath, Dataset, Model
from typing import NamedTuple

def same_step_000_afc67b36914c4108b47e8b4bb316869d_fn(
    input_context_path: InputPath(str),
    output_context_path: OutputPath(str),
    run_info: str = "gAR9lC4=",
    metadata_url: str = "",
):
    from base64 import urlsafe_b64encode, urlsafe_b64decode
    from pathlib import Path
    import datetime
    import requests
    import tempfile
    import dill
    import os

    input_context = None
    with Path(input_context_path).open("rb") as reader:
        input_context = reader.read()

    # Helper function for posting metadata to mlflow.
    def post_metadata(json):
        if metadata_url == "":
            return

        try:
            req = requests.post(metadata_url, json=json)
            req.raise_for_status()
        except requests.exceptions.HTTPError as err:
            print(f"Error posting metadata: {err}")

    # Move to writable directory as user might want to do file IO.
    # TODO: won't persist across steps, might need support in SDK?
    os.chdir(tempfile.mkdtemp())

    # Load information about the current experiment run:
    run_info = dill.loads(urlsafe_b64decode(run_info))

    # Post session context to mlflow.
    if len(input_context) > 0:
        input_context_str = urlsafe_b64encode(input_context)
        post_metadata({
            "experiment_id": run_info["experiment_id"],
            "run_id": run_info["run_id"],
            "step_id": "same_step_000",
            "metadata_type": "input",
            "metadata_value": input_context_str,
            "metadata_time": datetime.datetime.now().isoformat(),
        })

    # User code for step, which we run in its own execution frame.
    user_code = f"""
import dill

# Load session context into global namespace:
if { len(input_context) } > 0:
    dill.load_session("{ input_context_path }")

{dill.loads(urlsafe_b64decode("gASVGAAAAAAAAACMFHByaW50KCJIZWxsbyB3b3JsZCIplC4="))}

# Remove anything from the global namespace that cannot be serialised.
# TODO: this will include things like pandas dataframes, needs sdk support?
_bad_keys = []
_all_keys = list(globals().keys())
for k in _all_keys:
    try:
        dill.dumps(globals()[k])
    except TypeError:
        _bad_keys.append(k)

for k in _bad_keys:
    del globals()[k]

# Save new session context to disk for the next component:
dill.dump_session("{output_context_path}")
"""

    # Runs the user code in a new execution frame. Context from the previous
    # component in the run is loaded into the session dynamically, and we run
    # with a single globals() namespace to simulate top-level execution.
    exec(user_code, globals(), globals())

    # Post new session context to mlflow:
    with Path(output_context_path).open("rb") as reader:
        context = urlsafe_b64encode(reader.read())
        post_metadata({
            "experiment_id": run_info["experiment_id"],
            "run_id": run_info["run_id"],
            "step_id": "same_step_000",
            "metadata_type": "output",
            "metadata_value": context,
            "metadata_time": datetime.datetime.now().isoformat(),
        })
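
For completeness, the function above gets wrapped into a v2 component roughly like this (a reconstruction, since the wrapping isn't shown here; the base image and extra packages are the ones that appear in the generated YAML further down):

from kfp.v2.dsl import component

# Reconstruction of the component wrapping; base_image and packages_to_install
# are taken from the generated YAML in EDIT TWO below.
same_step_000 = component(
    same_step_000_afc67b36914c4108b47e8b4bb316869d_fn,
    base_image="public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/codeserver-python:v1.5.0",
    packages_to_install=["dill", "requests"],
)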

Environment

  • How did you deploy Kubeflow Pipelines (KFP)? From manifests
  • KFP version: 1.8.1
  • KFP SDK version: 1.8.12

I SUSPECT this is because I'm relying on Kubeflow's native behaviour of writing output files to a local temp directory, and my theory is that KFP v2 no longer auto-creates that directory. Do I need to create a dedicated bucket for this purpose with KFP v2 on AWS?
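
If a dedicated bucket is in fact required, I assume the fix would look something like pointing pipeline_root at S3 instead of the default minio://mlpipeline/v2/artifacts shown in the log. A sketch only (the bucket name is a placeholder):

from kfp import dsl

# Sketch: overriding the v2 pipeline root with an S3 bucket.
# "my-kfp-artifacts" is a placeholder bucket name.
@dsl.pipeline(
    name="root_pipeline_compilation",
    pipeline_root="s3://my-kfp-artifacts/v2/artifacts",
)
def root_pipeline_compilation(context: str = "", metadata_url: str = ""):
    # ... wire up run_info_fn and same_step_000 here ...
    pass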


EDIT TWO: here's the generated YAML. Line 317 is the one that worries me: it appears to be inserting the literal string output_context_path where I'd expect a variable. Is that substituted at runtime?

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: root-pipeline-compilation-
  annotations:
    pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
    pipelines.kubeflow.org/pipeline_compilation_time: '2022-04-29T18:04:24.336927'
    pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "", "name": "context",
      "optional": true, "type": "String"}, {"default": "", "name": "metadata_url",
      "optional": true, "type": "String"}, {"default": "", "name": "pipeline-root"},
      {"default": "pipeline/root_pipeline_compilation", "name": "pipeline-name"}],
      "name": "root_pipeline_compilation"}'
    pipelines.kubeflow.org/v2_pipeline: "true"
  labels:
    pipelines.kubeflow.org/v2_pipeline: "true"
    pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
spec:
  entrypoint: root-pipeline-compilation
  templates:
  - name: root-pipeline-compilation
    inputs:
      parameters:
      - {name: metadata_url}
      - {name: pipeline-name}
      - {name: pipeline-root}
    dag:
      tasks:
      - name: run-info-fn
        template: run-info-fn
        arguments:
          parameters:
          - {name: pipeline-name, value: '{{inputs.parameters.pipeline-name}}'}
          - {name: pipeline-root, value: '{{inputs.parameters.pipeline-root}}'}
      - name: same-step-000-d5554cccadc4445f91f51849eb5f2de6-fn
        template: same-step-000-d5554cccadc4445f91f51849eb5f2de6-fn
        dependencies: [run-info-fn]
        arguments:
          parameters:
          - {name: metadata_url, value: '{{inputs.parameters.metadata_url}}'}
          - {name: pipeline-name, value: '{{inputs.parameters.pipeline-name}}'}
          - {name: pipeline-root, value: '{{inputs.parameters.pipeline-root}}'}
          - {name: run-info-fn-run_info, value: '{{tasks.run-info-fn.outputs.parameters.run-info-fn-run_info}}'}
  - name: run-info-fn
    container:
      args:
      - sh
      - -c
      - |2

        if ! [ -x "$(command -v pip)" ]; then
            python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
        fi

        PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet     --no-warn-script-location 'kfp' 'dill' 'kfp==1.8.12' && "$0" "$@"
      - sh
      - -ec
      - |
        program_path=$(mktemp -d)
        printf "%s" "$0" > "$program_path/ephemeral_component.py"
        python3 -m kfp.v2.components.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"
      - |2+

        import kfp
        from kfp.v2 import dsl
        from kfp.v2.dsl import *
        from typing import *

        def run_info_fn(
            run_id: str,
        ) -> NamedTuple("RunInfoOutput", [("run_info", str),]):
            from base64 import urlsafe_b64encode
            from collections import namedtuple
            import datetime
            import base64
            import dill
            import kfp

            client = kfp.Client(host="http://ml-pipeline:8888")
            run_info = client.get_run(run_id=run_id)

            run_info_dict = {
                "run_id": run_info.run.id,
                "name": run_info.run.name,
                "created_at": run_info.run.created_at.isoformat(),
                "pipeline_id": run_info.run.pipeline_spec.pipeline_id,
            }

            # Track kubernetes resources associated wth the run.
            for r in run_info.run.resource_references:
                run_info_dict[f"{r.key.type.lower()}_id"] = r.key.id

            # Base64-encoded as value is visible in kubeflow ui.
            output = urlsafe_b64encode(dill.dumps(run_info_dict))

            return namedtuple("RunInfoOutput", ["run_info"])(
                str(output, encoding="ascii")
            )

      - --executor_input
      - '{{$}}'
      - --function_to_execute
      - run_info_fn
      command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST),
        --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO),
        --container_image, $(KFP_V2_IMAGE), --task_name, run-info-fn, --pipeline_name,
        '{{inputs.parameters.pipeline-name}}', --run_id, $(KFP_RUN_ID), --run_resource,
        workflows.argoproj.io/$(WORKFLOW_ID), --namespace, $(KFP_NAMESPACE), --pod_name,
        $(KFP_POD_NAME), --pod_uid, $(KFP_POD_UID), --pipeline_root, '{{inputs.parameters.pipeline-root}}',
        --enable_caching, $(ENABLE_CACHING), --, 'run_id={{workflow.uid}}', --]
      env:
      - name: KFP_POD_NAME
        valueFrom:
          fieldRef: {fieldPath: metadata.name}
      - name: KFP_POD_UID
        valueFrom:
          fieldRef: {fieldPath: metadata.uid}
      - name: KFP_NAMESPACE
        valueFrom:
          fieldRef: {fieldPath: metadata.namespace}
      - name: WORKFLOW_ID
        valueFrom:
          fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'}
      - name: KFP_RUN_ID
        valueFrom:
          fieldRef: {fieldPath: 'metadata.labels[''pipeline/runid'']'}
      - name: ENABLE_CACHING
        valueFrom:
          fieldRef: {fieldPath: 'metadata.labels[''pipelines.kubeflow.org/enable_caching'']'}
      - {name: KFP_V2_IMAGE, value: 'python:3.7'}
      - {name: KFP_V2_RUNTIME_INFO, value: '{"inputParameters": {"run_id": {"type":
          "STRING"}}, "inputArtifacts": {}, "outputParameters": {"run_info": {"type":
          "STRING", "path": "/tmp/outputs/run_info/data"}}, "outputArtifacts": {}}'}
      envFrom:
      - configMapRef: {name: metadata-grpc-configmap, optional: true}
      image: python:3.7
      volumeMounts:
      - {mountPath: /kfp-launcher, name: kfp-launcher}
    inputs:
      parameters:
      - {name: pipeline-name}
      - {name: pipeline-root}
    outputs:
      parameters:
      - name: run-info-fn-run_info
        valueFrom: {path: /tmp/outputs/run_info/data}
      artifacts:
      - {name: run-info-fn-run_info, path: /tmp/outputs/run_info/data}
    metadata:
      annotations:
        pipelines.kubeflow.org/v2_component: "true"
        pipelines.kubeflow.org/component_ref: '{}'
        pipelines.kubeflow.org/arguments.parameters: '{"run_id": "{{workflow.uid}}"}'
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/v2_component: "true"
        pipelines.kubeflow.org/enable_caching: "true"
    initContainers:
    - command: [launcher, --copy, /kfp-launcher/launch]
      image: gcr.io/ml-pipeline/kfp-launcher:1.8.7
      name: kfp-launcher
      mirrorVolumeMounts: true
    volumes:
    - {name: kfp-launcher}
  - name: same-step-000-d5554cccadc4445f91f51849eb5f2de6-fn
    container:
      args:
      - sh
      - -c
      - |2

        if ! [ -x "$(command -v pip)" ]; then
            python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
        fi

        PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet     --no-warn-script-location 'dill' 'requests' 'kfp==1.8.12' && "$0" "$@"
      - sh
      - -ec
      - |
        program_path=$(mktemp -d)
        printf "%s" "$0" > "$program_path/ephemeral_component.py"
        python3 -m kfp.v2.components.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"
      - |2+

        import kfp
        from kfp.v2 import dsl
        from kfp.v2.dsl import *
        from typing import *

        def same_step_000_d5554cccadc4445f91f51849eb5f2de6_fn(
            input_context_path: InputPath(str),
            output_context_path: OutputPath(str),
            run_info: str = "gAR9lC4=",
            metadata_url: str = "",
        ):
            from base64 import urlsafe_b64encode, urlsafe_b64decode
            from pathlib import Path
            import datetime
            import requests
            import tempfile
            import dill
            import os

            input_context = None
            with Path(input_context_path).open("rb") as reader:
                input_context = reader.read()

            # Helper function for posting metadata to mlflow.
            def post_metadata(json):
                if metadata_url == "":
                    return

                try:
                    req = requests.post(metadata_url, json=json)
                    req.raise_for_status()
                except requests.exceptions.HTTPError as err:
                    print(f"Error posting metadata: {err}")

            # Move to writable directory as user might want to do file IO.
            # TODO: won't persist across steps, might need support in SDK?
            os.chdir(tempfile.mkdtemp())

            # Load information about the current experiment run:
            run_info = dill.loads(urlsafe_b64decode(run_info))

            # Post session context to mlflow.
            if len(input_context) > 0:
                    input_context_str = urlsafe_b64encode(input_context)
                    post_metadata({
                        "experiment_id": run_info["experiment_id"],
                        "run_id": run_info["run_id"],
                        "step_id": "same_step_000",
                        "metadata_type": "input",
                        "metadata_value": input_context_str,
                        "metadata_time": datetime.datetime.now().isoformat(),
                    })

            # User code for step, which we run in its own execution frame.
            user_code = f"""
        import dill

        # Load session context into global namespace:
        if { len(input_context) } > 0:
            dill.load_session("{ input_context_path }")

        {dill.loads(urlsafe_b64decode("gASVGAAAAAAAAACMFHByaW50KCJIZWxsbyB3b3JsZCIplC4="))}

        # Remove anything from the global namespace that cannot be serialised.
        # TODO: this will include things like pandas dataframes, needs sdk support?
        _bad_keys = []
        _all_keys = list(globals().keys())
        for k in _all_keys:
            try:
                dill.dumps(globals()[k])
            except TypeError:
                _bad_keys.append(k)

        for k in _bad_keys:
            del globals()[k]

        # Save new session context to disk for the next component:
        dill.dump_session("{output_context_path}")
        """

            # Runs the user code in a new execution frame. Context from the previous
            # component in the run is loaded into the session dynamically, and we run
            # with a single globals() namespace to simulate top-level execution.
            exec(user_code, globals(), globals())

            # Post new session context to mlflow:
            with Path(output_context_path).open("rb") as reader:
                context = urlsafe_b64encode(reader.read())
                post_metadata({
                    "experiment_id": run_info["experiment_id"],
                    "run_id": run_info["run_id"],
                    "step_id": "same_step_000",
                    "metadata_type": "output",
                    "metadata_value": context,
                    "metadata_time": datetime.datetime.now().isoformat(),
                })

      - --executor_input
      - '{{$}}'
      - --function_to_execute
      - same_step_000_d5554cccadc4445f91f51849eb5f2de6_fn
      command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST),
        --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO),
        --container_image, $(KFP_V2_IMAGE), --task_name, same-step-000-d5554cccadc4445f91f51849eb5f2de6-fn,
        --pipeline_name, '{{inputs.parameters.pipeline-name}}', --run_id, $(KFP_RUN_ID),
        --run_resource, workflows.argoproj.io/$(WORKFLOW_ID), --namespace, $(KFP_NAMESPACE),
        --pod_name, $(KFP_POD_NAME), --pod_uid, $(KFP_POD_UID), --pipeline_root, '{{inputs.parameters.pipeline-root}}',
        --enable_caching, $(ENABLE_CACHING), --, input_context_path=, 'metadata_url={{inputs.parameters.metadata_url}}',
        'run_info={{inputs.parameters.run-info-fn-run_info}}', --]
      env:
      - name: KFP_POD_NAME
        valueFrom:
          fieldRef: {fieldPath: metadata.name}
      - name: KFP_POD_UID
        valueFrom:
          fieldRef: {fieldPath: metadata.uid}
      - name: KFP_NAMESPACE
        valueFrom:
          fieldRef: {fieldPath: metadata.namespace}
      - name: WORKFLOW_ID
        valueFrom:
          fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'}
      - name: KFP_RUN_ID
        valueFrom:
          fieldRef: {fieldPath: 'metadata.labels[''pipeline/runid'']'}
      - name: ENABLE_CACHING
        valueFrom:
          fieldRef: {fieldPath: 'metadata.labels[''pipelines.kubeflow.org/enable_caching'']'}
      - {name: KFP_V2_IMAGE, value: 'public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/codeserver-python:v1.5.0'}
      - {name: KFP_V2_RUNTIME_INFO, value: '{"inputParameters": {"input_context_path":
          {"type": "STRING"}, "metadata_url": {"type": "STRING"}, "run_info": {"type":
          "STRING"}}, "inputArtifacts": {}, "outputParameters": {"output_context_path":
          {"type": "STRING", "path": "/tmp/outputs/output_context_path/data"}}, "outputArtifacts":
          {}}'}
      envFrom:
      - configMapRef: {name: metadata-grpc-configmap, optional: true}
      image: public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/codeserver-python:v1.5.0
      volumeMounts:
      - {mountPath: /kfp-launcher, name: kfp-launcher}
    inputs:
      parameters:
      - {name: metadata_url}
      - {name: pipeline-name}
      - {name: pipeline-root}
      - {name: run-info-fn-run_info}
    outputs:
      artifacts:
      - {name: same-step-000-d5554cccadc4445f91f51849eb5f2de6-fn-output_context_path,
        path: /tmp/outputs/output_context_path/data}
    metadata:
      annotations:
        pipelines.kubeflow.org/v2_component: "true"
        pipelines.kubeflow.org/component_ref: '{}'
        pipelines.kubeflow.org/arguments.parameters: '{"input_context_path": "", "metadata_url":
          "{{inputs.parameters.metadata_url}}", "run_info": "{{inputs.parameters.run-info-fn-run_info}}"}'
        pipelines.kubeflow.org/max_cache_staleness: P0D
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/v2_component: "true"
        pipelines.kubeflow.org/enable_caching: "true"
    initContainers:
    - command: [launcher, --copy, /kfp-launcher/launch]
      image: gcr.io/ml-pipeline/kfp-launcher:1.8.7
      name: kfp-launcher
      mirrorVolumeMounts: true
    volumes:
    - {name: kfp-launcher}
  arguments:
    parameters:
    - {name: context, value: ''}
    - {name: metadata_url, value: ''}
    - {name: pipeline-root, value: ''}
    - {name: pipeline-name, value: pipeline/root_pipeline_compilation}
  serviceAccountName: pipeline-runner

It's DEFINITELY a regression: compiling the same pipeline with each of the two compiler flags gives different YAML. The first works, the second doesn't.
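
For reference, this is roughly how the two YAMLs being compared are produced (a sketch; the pipeline function is the one from the snippet above, and the output paths are arbitrary):

from kfp import compiler, dsl

# The "two compiler flags": plain v1 compilation vs. v2-compatible mode.
# The v1 output runs fine; the v2-compatible output hits the permission error.
compiler.Compiler(mode=dsl.PipelineExecutionMode.V1_LEGACY).compile(
    pipeline_func=root_pipeline_compilation,
    package_path="pipeline_v1.yaml",
)
compiler.Compiler(mode=dsl.PipelineExecutionMode.V2_COMPATIBLE).compile(
    pipeline_func=root_pipeline_compilation,
    package_path="pipeline_v2_compat.yaml",
)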



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
