
Comments (7)

HumairAK commented on July 30, 2024

The default access mode for big data passing seems to be "ReadWriteMany", which is not supported by all storage classes. You can work around this with:

os.environ["DEFAULT_ACCESSMODES"] = "ReadWriteOnce"

https://github.com/kubeflow/kfp-tekton/blob/master/sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py#L484
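
For context, a minimal sketch of applying this before compilation (TektonCompiler usage per the kfp_tekton SDK; my_pipeline is a placeholder):

import os

# must be set before compiling; the compiler reads it in _data_passing_rewriter.py
os.environ["DEFAULT_ACCESSMODES"] = "ReadWriteOnce"

from kfp_tekton.compiler import TektonCompiler

TektonCompiler().compile(my_pipeline, "pipeline.yaml")  # my_pipeline: placeholder pipeline func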


diegolovison commented on July 30, 2024

I did my homework and found the following: the set pipeline_workspaces is only populated when the task has a param.


Tomcli commented on July 30, 2024

Hi, DEFAULT_STORAGE_CLASS only sets the storage class for big data passing. Are you trying to update the basic artifact storage class / S3 endpoint to something different?


Tomcli commented on July 30, 2024

For basic artifact passing that doesn't require a volume mount, you can update the S3 settings here: https://github.com/kubeflow/kfp-tekton/blob/master/guides/kfp-admin-guide.md#customize-s3-endpoint-for-kfp-tekton-artifacts


diegolovison commented on July 30, 2024

The following pipeline will try to provision a volume with the gp3-csi storage class.

I am compiling with:

os.environ["DEFAULT_STORAGE_CLASS"] = self.api.get_default_storage()
result = client.create_run_from_pipeline_func(pipeline_func=pipeline)

The full pipeline file:
"""Test pipeline to exercise various data flow mechanisms."""
import kfp


"""Producer"""


def send_file(
    file_size_bytes: int,
    outgoingfile: kfp.components.OutputPath(),
):
    import os
    import zipfile

    def create_large_file(file_path, size_in_bytes):
        with open(file_path, 'wb') as f:
            f.write(os.urandom(size_in_bytes))

    def zip_file(input_file_path, output_zip_path):
        with zipfile.ZipFile(output_zip_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
            zipf.write(input_file_path, os.path.basename(input_file_path))

    print("starting creating the file...")
    file_path = "/tmp/large_file.txt"
    create_large_file(file_path, file_size_bytes)
    zip_file(file_path, outgoingfile)
    print("done")


"""Consumer"""


def receive_file(
    incomingfile: kfp.components.InputPath(),
    saveartifact: kfp.components.OutputPath(),
):
    import os
    import shutil

    print("reading %s, size is %s" % (incomingfile, os.path.getsize(incomingfile)))

    with open(incomingfile, "rb") as f:
        b = f.read(1)
        print("read byte: %s" % b)

    print("copying in %s to out %s" % (incomingfile, saveartifact))
    shutil.copyfile(incomingfile, saveartifact)


def test_uploaded_artifact(previous_step: kfp.components.InputPath(), file_size_bytes: int, mlpipeline_minio_artifact_secret: str):
    from minio import Minio
    import base64
    import json

    print(previous_step)
    name_data = previous_step.split('/')
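    # assumption: the 5th path segment (index 4) of the incoming path carries
    # the run name that kfp-tekton uses in the artifact object key below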
    object_name = 'artifacts/' + name_data[4] + '/receive-file/saveartifact.tgz'

    mlpipeline_minio_artifact_secret = json.loads(mlpipeline_minio_artifact_secret)["data"]
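    # each value in a Kubernetes Secret's data map is base64-encoded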

    def inner_decode(my_str):
        return base64.b64decode(my_str).decode("utf-8")

    host = inner_decode(mlpipeline_minio_artifact_secret["host"])
    port = inner_decode(mlpipeline_minio_artifact_secret["port"])
    access_key = inner_decode(mlpipeline_minio_artifact_secret["accesskey"])
    secret_key = inner_decode(mlpipeline_minio_artifact_secret["secretkey"])
    secure = inner_decode(mlpipeline_minio_artifact_secret["secure"])
    secure = secure.lower() == 'true'
    print(host, port, access_key, secret_key, secure)
    client = Minio(
        f'{host}:{port}',
        access_key=access_key,
        secret_key=secret_key,
        secure=secure
    )

    data = client.get_object('mlpipeline', object_name)
    with open('my-testfile', 'wb') as file_data:
        for d in data.stream(32 * 1024):
            file_data.write(d)
        bytes_written = file_data.tell()

    print(file_size_bytes, bytes_written)
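    # os.urandom output is effectively incompressible, so the stored artifact
    # should match the raw input size to within the 3-decimal rounding below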
    diff = round((bytes_written / file_size_bytes) - 1, 3)
    print(diff)
    # if not matching, the test will fail
    assert diff == 0


"""Build the producer component"""
send_file_op = kfp.components.create_component_from_func(
    send_file,
    base_image="registry.access.redhat.com/ubi8/python-38",
)

"""Build the consumer component"""
receive_file_op = kfp.components.create_component_from_func(
    receive_file,
    base_image="registry.access.redhat.com/ubi8/python-38",
)

test_uploaded_artifact_op = kfp.components.create_component_from_func(
    test_uploaded_artifact,
    base_image="registry.access.redhat.com/ubi8/python-38",
    packages_to_install=['minio']
)

"""Wire up the pipeline"""


@kfp.dsl.pipeline(
    name="Test Data Passing Pipeline 1",
)
def wire_up_pipeline(mlpipeline_minio_artifact_secret: str):
    import json

    file_size_mb = 20
    file_size_bytes = file_size_mb * 1024 * 1024

    send_file_task = send_file_op(file_size_bytes)

    receive_file_task = receive_file_op(
        send_file_task.output,
    ).add_pod_annotation(name='artifact_outputs', value=json.dumps(['saveartifact']))

    test_uploaded_artifact_op(receive_file_task.output, file_size_bytes, mlpipeline_minio_artifact_secret)
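
A hedged sketch of how such a run could be launched; the Secret name and file name are assumptions (the test expects the full Secret JSON, whose data map holds host, port, accesskey, secretkey, and secure):

# e.g.: kubectl get secret mlpipeline-minio-artifact -n kubeflow -o json > secret.json
secret_json = open("secret.json").read()  # hypothetical dump of the artifact Secret
client.create_run_from_pipeline_func(
    pipeline_func=wire_up_pipeline,
    arguments={"mlpipeline_minio_artifact_secret": secret_json},
)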

AWS Error:

Generated from ebs.csi.aws.com_aws-ebs-csi-driver-controller-89bbb6b69-p2wdt_b9b496c4-31ba-4b30-a6a4-a8979b562c24, 13 times in the last 19 minutes:

failed to provision volume with StorageClass "gp3-csi": rpc error: code = InvalidArgument desc = Volume capabilities MULTI_NODE_MULTI_WRITER not supported. Only AccessModes[ReadWriteOnce] supported.


Tomcli commented on July 30, 2024

> The default access mode for big data passing seems to be "ReadWriteMany", which is not supported by all storage classes. You can work around this with:
>
> os.environ["DEFAULT_ACCESSMODES"] = "ReadWriteOnce"
>
> https://github.com/kubeflow/kfp-tekton/blob/master/sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py#L484

Yes, set DEFAULT_ACCESSMODES to a mode that your storage class supports. Then make sure the affinity assistant or the latest coscheduler is enabled for the best experience.
https://github.com/tektoncd/pipeline/releases/tag/v0.51.0
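
For example, a minimal sketch (the storage class name is illustrative; both variables are read by the compiler at compile time):

import os
os.environ["DEFAULT_STORAGE_CLASS"] = "gp3-csi"      # illustrative RWO-capable class
os.environ["DEFAULT_ACCESSMODES"] = "ReadWriteOnce"  # EBS-backed classes only support RWO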


diegolovison commented on July 30, 2024

Wrong test data

