Comments (7)
The default accessModes for big data passing seems to be "ReadWriteMany", which is not supported by all storage classes. You can get around this with:
os.environ["DEFAULT_ACCESSMODES"] = "ReadWriteOnce"
from kfp-tekton.
I did my homework and found the following: the set pipeline_workspaces will be filled only when the task has a param.
from kfp-tekton.
Hi, DEFAULT_STORAGE_CLASS only sets the storage class for big data passing. Are you trying to update the basic artifact storage class / S3 endpoint to something different?
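(Like the access-mode override above, this is a compile-time environment variable; the class name below is only an example:)

import os
os.environ["DEFAULT_STORAGE_CLASS"] = "gp3-csi"  # example value; use a class your cluster provides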
from kfp-tekton.
For basic artifact passing that doesn't require a volume mount, you can update the s3 settings over here. https://github.com/kubeflow/kfp-tekton/blob/master/guides/kfp-admin-guide.md#customize-s3-endpoint-for-kfp-tekton-artifacts
from kfp-tekton.
The following pipeline will look for the gp3-csi volume.
I am compiling with:
os.environ["DEFAULT_STORAGE_CLASS"] = self.api.get_default_storage()
result = client.create_run_from_pipeline_func(pipeline_func=pipeline)
"""Test pipeline to exercise various data flow mechanisms."""
import kfp
"""Producer"""
def send_file(
file_size_bytes: int,
outgoingfile: kfp.components.OutputPath(),
):
import os
import zipfile
def create_large_file(file_path, size_in_bytes):
with open(file_path, 'wb') as f:
f.write(os.urandom(size_in_bytes))
def zip_file(input_file_path, output_zip_path):
with zipfile.ZipFile(output_zip_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
zipf.write(input_file_path, os.path.basename(input_file_path))
print("starting creating the file...")
file_path = "/tmp/large_file.txt"
create_large_file(file_path, file_size_bytes)
zip_file(file_path, outgoingfile)
print("done")
"""Consumer"""
def receive_file(
incomingfile: kfp.components.InputPath(),
saveartifact: kfp.components.OutputPath(),
):
import os
import shutil
print("reading %s, size is %s" % (incomingfile, os.path.getsize(incomingfile)))
with open(incomingfile, "rb") as f:
b = f.read(1)
print("read byte: %s" % b)
f.close()
print("copying in %s to out %s" % (incomingfile, saveartifact))
shutil.copyfile(incomingfile, saveartifact)
def test_uploaded_artifact(previous_step: kfp.components.InputPath(), file_size_bytes: int, mlpipeline_minio_artifact_secret: str):
from minio import Minio
import base64
import json
print(previous_step)
name_data = previous_step.split('/')
object_name = 'artifacts/' + name_data[4] + '/receive-file/saveartifact.tgz'
mlpipeline_minio_artifact_secret = json.loads(mlpipeline_minio_artifact_secret)["data"]
def inner_decode(my_str):
return base64.b64decode(my_str).decode("utf-8")
host = inner_decode(mlpipeline_minio_artifact_secret["host"])
port = inner_decode(mlpipeline_minio_artifact_secret["port"])
access_key = inner_decode(mlpipeline_minio_artifact_secret["accesskey"])
secret_key = inner_decode(mlpipeline_minio_artifact_secret["secretkey"])
secure = inner_decode(mlpipeline_minio_artifact_secret["secure"])
secure = secure.lower() == 'true'
print(host, port, access_key, secret_key, secure)
client = Minio(
f'{host}:{port}',
access_key=access_key,
secret_key=secret_key,
secure=secure
)
data = client.get_object('mlpipeline', object_name)
with open('my-testfile', 'wb') as file_data:
for d in data.stream(32 * 1024):
file_data.write(d)
bytes_written = file_data.tell()
print(file_size_bytes, bytes_written)
diff = round((bytes_written / file_size_bytes) - 1, 3)
print(diff)
# if not matching, the test will fail
assert diff == 0
"""Build the producer component"""
send_file_op = kfp.components.create_component_from_func(
send_file,
base_image="registry.access.redhat.com/ubi8/python-38",
)
"""Build the consumer component"""
receive_file_op = kfp.components.create_component_from_func(
receive_file,
base_image="registry.access.redhat.com/ubi8/python-38",
)
test_uploaded_artifact_op = kfp.components.create_component_from_func(
test_uploaded_artifact,
base_image="registry.access.redhat.com/ubi8/python-38",
packages_to_install=['minio']
)
"""Wire up the pipeline"""
@kfp.dsl.pipeline(
name="Test Data Passing Pipeline 1",
)
def wire_up_pipeline(mlpipeline_minio_artifact_secret):
import json
file_size_mb = 20
file_size_bytes = file_size_mb * 1024 * 1024
send_file_task = send_file_op(file_size_bytes)
receive_file_task = receive_file_op(
send_file_task.output,
).add_pod_annotation(name='artifact_outputs', value=json.dumps(['saveartifact']))
test_uploaded_artifact_op(receive_file_task.output, file_size_bytes, mlpipeline_minio_artifact_secret)
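For completeness, the run itself was created roughly as sketched below; the TektonClient host and the secret argument are placeholders, since the snippet above elides how the client is built:

from kfp_tekton import TektonClient

client = TektonClient(host="http://localhost:8080")  # placeholder host
client.create_run_from_pipeline_func(
    pipeline_func=wire_up_pipeline,
    arguments={"mlpipeline_minio_artifact_secret": "..."},  # JSON-serialized secret; placeholder
)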
AWS Error:
Generated from ebs.csi.aws.com_aws-ebs-csi-driver-controller-89bbb6b69-p2wdt_b9b496c4-31ba-4b30-a6a4-a8979b562c24 (13 times in the last 19 minutes):
failed to provision volume with StorageClass "gp3-csi": rpc error: code = InvalidArgument desc = Volume capabilities MULTI_NODE_MULTI_WRITER not supported. Only AccessModes[ReadWriteOnce] supported.
from kfp-tekton.
The default accessModes for big data passing seems to be "ReadWriteMany", which is not supported by all storage classes. You can get around this with:
os.environ["DEFAULT_ACCESSMODES"] = "ReadWriteOnce"
Yes, set DEFAULT_ACCESSMODES to an access mode that your storage can support. Then make sure the affinity assistant or the latest coscheduler is enabled for the best experience.
https://github.com/tektoncd/pipeline/releases/tag/v0.51.0
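After recompiling, one quick sanity check (a sketch; the YAML file name is a placeholder) is to confirm which access mode the compiler actually emitted:

# grep the compiled PipelineRun YAML for the access mode that was applied
with open("my_pipeline.yaml") as f:
    compiled = f.read()
print("ReadWriteMany:", "ReadWriteMany" in compiled)
print("ReadWriteOnce:", "ReadWriteOnce" in compiled)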
from kfp-tekton.
Wrong test data
from kfp-tekton.