
Comments (58)

sayakpaul commented on June 16, 2024

Cool. Now it's clearer to me. I will work on it from here and keep you posted.

sayakpaul commented on June 16, 2024

@deep-diver here's a demo notebook that shows the evaluator component: https://colab.research.google.com/gist/sayakpaul/9a483c09e355dd94e1103d16abf39707/evaluator.ipynb.

sayakpaul commented on June 16, 2024

Understood. I can start working on it tomorrow or the day after. Feel free to suggest any resources/ideas that might be helpful.

sayakpaul commented on June 16, 2024

Yeah, sure. Seems like it'd be a fun one, but I am guessing I will have lots of doubts, so I will send them your way.

sayakpaul commented on June 16, 2024

I see. It's clear now.

deep-diver commented on June 16, 2024

@sayakpaul

Did you test it?

Yes, and it doesn't work yet. That is why I have imported our custom components from the Dual Deployments project.

I am not sure if parameter_values would still be applicable to it.

At this moment we don't need any parameter_values, but we will have to modify this later and leave a note about the current issue.

Also, for reference: in the _create_pipeline() method of the batch prediction notebook, we also need to pass the precompiled pipeline spec path so that we can trigger the model re-training pipeline based on the performance evaluation.

Yup, I forgot to upload the training pipeline spec (sorry :( ). Let me do that quickly.

sayakpaul commented on June 16, 2024

@deep-diver I have been thinking a bit about it.

What I have seen in most cases is that the batch prediction pipeline is kept separate from the training pipeline and triggered on its own. In our case, the trigger could be the availability of new test data (say, at least 100 samples), or we could schedule it.

Therefore, I think we need to think a bit more about how we want to incorporate batch prediction into our workflow.

deep-diver commented on June 16, 2024

@sayakpaul

Yes, I thought so. So, we will build the training pipeline in 02_TFX_Training_Pipeline.ipynb and the batch prediction pipeline in 03_Batch_Prediction_Pipeline.ipynb separately.

And the part that you mentioned (Cloud Scheduler checking if there are enough samples to trigger the batch prediction pipeline) is not covered in 03_Batch_Prediction_Pipeline.ipynb. I think we can make one more notebook for that.

WDYT?

sayakpaul commented on June 16, 2024

Ah I see. You mean to deploy a separate pipeline for batch prediction, right?

deep-diver commented on June 16, 2024

yeap

The second pipeline will trigger the first pipeline based on the evaluation (like accuracy < threshold), and the second pipeline will be triggered by Cloud Scheduler, which periodically checks the number of samples in a certain GCS bucket.
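
To make that concrete, here is a minimal sketch of what the scheduled check could look like, assuming a Cloud Function invoked by Cloud Scheduler; the bucket name, threshold, project, and spec path are placeholders, not final values:

from google.cloud import storage
from kfp.v2.google.client import AIPlatformClient

DATA_BUCKET = "new-data-bucket"   # placeholder: temporary bucket holding new samples
SAMPLE_THRESHOLD = 100            # placeholder: minimum number of samples to trigger
BATCH_PIPELINE_SPEC = "gs://my-bucket/batch_prediction_pipeline.json"  # placeholder

def check_and_trigger(request):
    # Count the objects currently sitting in the temporary bucket.
    client = storage.Client()
    num_samples = sum(1 for _ in client.list_blobs(DATA_BUCKET))
    if num_samples < SAMPLE_THRESHOLD:
        return f"Only {num_samples} samples; not triggering."

    # Enough samples have accumulated: kick off the batch prediction pipeline.
    api_client = AIPlatformClient(project_id="my-project", region="us-central1")
    api_client.create_run_from_job_spec(job_spec_path=BATCH_PIPELINE_SPEC)
    return f"Triggered batch prediction pipeline on {num_samples} samples."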

sayakpaul commented on June 16, 2024

@deep-diver created a notebook: https://colab.research.google.com/gist/sayakpaul/c046543b565fe976880142b3ac417926/03_batch_prediction_pipeline.ipynb.

We need to find a way to support this type annotation while defining the bulk inferer component: Union[str, Sequence[str]]. I am aware that we could simply have passed str, i.e. Parameter[str], to account for string inputs only, but I think we should consider the generic cases too.

I could not find enough documentation around the annotations, so I will open an issue to clarify.
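
In the meantime, a minimal sketch of the str-only fallback mentioned above (the component name and the comma-joining convention here are illustrative assumptions, not the final API):

from absl import logging
from tfx.dsl.component.experimental.annotations import Parameter
from tfx.dsl.component.experimental.decorators import component

@component
def BatchPredictionGen(gcs_source: Parameter[str]):
    # Accept one URI, or several URIs joined by commas in a single string,
    # and split them back into a list inside the component.
    gcs_sources = [uri.strip() for uri in gcs_source.split(",")]
    logging.info("Sources to run batch prediction over: %s", gcs_sources)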

deep-diver commented on June 16, 2024

@sayakpaul

In which parameter do you think we need Union[str, Sequence[str]]?

sayakpaul commented on June 16, 2024

@deep-diver here's the updated copy: https://colab.research.google.com/gist/sayakpaul/cb51da98ef55a712209ba435d100ad12/03_batch_prediction_pipeline.ipynb.

It still errors out because of the wrong GCS source entry. I need to run the components in isolation using the InteractiveContext and figure out what's going on. I will do that either later today or tomorrow.

sayakpaul commented on June 16, 2024

@deep-diver I am not sure why this explicit string engineering is required:

    # Read GCS Source.
    # e.g. gcs_source.uri = 'gs://my-bucket/path/to/filelist-pointer.txt'
    bucketname = gcs_source.uri.split('//')[1:][0].split('/')[0]  # -> 'my-bucket'
    logging.info(f'bucket: {bucketname}')
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucketname)

    # -> 'path/to/filelist-pointer.txt'
    filepath = '/'.join(gcs_source.uri.split('//')[1:][0].split('/')[1:])
    logging.info(f'filepath: {filepath}')
    blob = bucket.blob(filepath)
    # The downloaded payload is itself a GCS path (without the scheme), which
    # gets re-prefixed here. Note that download_as_string() returns bytes, so
    # a .decode('utf-8') is likely needed before formatting.
    gcs_source = f'gs://{blob.download_as_string()}'

If you could add comments on what the string values are after each of the different operations that will be helpful.

sayakpaul commented on June 16, 2024

@deep-diver in the FileListGen component, we are explicitly setting the value:

outpath.value = gcs_source_bucket + '/' + prefix + output_filename

My question then is: why can't we retrieve this value directly inside the pipeline? I'd expect filelist_gen.outputs['outpath'] to already hold the desired path, since we set the value inside the component.

Maybe investigating the outputs and attributes of filelist_gen.outputs['outpath'] would help (a sketch below)? Perhaps you have already tried that?
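
Something along these lines, assuming an InteractiveContext session where filelist_gen has already run (a sketch, not tested):

for artifact in filelist_gen.outputs["outpath"].get():
    print("uri:", artifact.uri)                    # where the artifact payload lives on GCS
    print("value:", artifact.value)                # the value we set inside the component
    print("mlmd record:", artifact.mlmd_artifact)  # the raw MLMD record behind it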

deep-diver commented on June 16, 2024

@sayakpaul

I experimented with your approach for a while yesterday. However, I failed to retrieve the value from filelist_gen.outputs['outpath'], and when investigating filelist_gen.outputs['outpath'] I could not find the value that I had set (gcs_source_bucket + '/' + prefix + output_filename).

So I gave up on that approach, but I saw that I can still get the URI where the Artifact's value is actually stored. I tried downloading the actual value (gcs_source_bucket + '/' + prefix + output_filename) by accessing the GCS object directly, and I finally got the value right.

What confuses me is:

  • why the value property is not set even though the bulk_inferrer component is guaranteed to run after the FileListGen component.

sayakpaul commented on June 16, 2024

Exactly my point, @deep-diver. Maybe we should clarify this with Robert?

deep-diver commented on June 16, 2024

Looks great :)

deep-diver commented on June 16, 2024

Reminder & Missing Part

  • when the batch prediction pipeline gets triggered, the Cloud Function should be able to pass the range of SPANs
  • after the Evaluator component, a successive pipeline_trigger component should trigger the training pipeline, taking the range of SPANs into account

sayakpaul commented on June 16, 2024

I suggest we first ensure that the pipeline runs completely up to the evaluator, and then take it forward. That way it will be easier, I guess, and we will have more confidence in the validity of each of the components. WDYT?

deep-diver commented on June 16, 2024

Sure! I just left this as a reminder while writing the 04_ notebook :)

deep-diver commented on June 16, 2024

Reminder & Thought about the PipelineTrigger component

There are three main jobs this component should do (a rough sketch follows the list):

  1. Move newly collected data to a GCS bucket with an appropriate SPAN number
  2. Remove the data from the temporary GCS bucket (to prevent Cloud Scheduler from running the batch prediction pipeline again)
  3. Run the training pipeline
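
Roughly like this, assuming plain google-cloud-storage calls and placeholder bucket/parameter names (a sketch, not the final component body):

from google.cloud import storage
from kfp.v2.google.client import AIPlatformClient

def run_pipeline_trigger(project_id, region, new_data_bucket,
                         dataset_bucket, next_span, training_spec_path):
    client = storage.Client(project=project_id)
    src_bucket = client.bucket(new_data_bucket)
    dst_bucket = client.bucket(dataset_bucket)

    for blob in client.list_blobs(new_data_bucket):
        # 1. Move newly collected data under the next SPAN number.
        src_bucket.copy_blob(blob, dst_bucket, f"span-{next_span}/{blob.name}")
        # 2. Empty the temporary bucket so Cloud Scheduler won't re-trigger.
        blob.delete()

    # 3. Kick off the training pipeline run on Vertex AI.
    api_client = AIPlatformClient(project_id=project_id, region=region)
    return api_client.create_run_from_job_spec(job_spec_path=training_spec_path)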

sayakpaul commented on June 16, 2024

@deep-diver could you provide a schematic of how you are envisioning the steps mentioned above? If you have made any progress, share that too.

deep-diver commented on June 16, 2024

@sayakpaul sure!

  1. Move newly collected data to a GCS bucket with an appropriate SPAN number

When the Evaluator finds that model performance < threshold, it is assumed that data/concept drift has happened. The TFX Trainer component consumes data from ImportExampleGen to train the model, and early in this project we planned to store drifted data under a different SPAN number.

  • the original CIFAR-10 dataset is stored under something like span-1/train/* and span-1/test/*, and the newly collected dataset should be stored under span-2/train/* and span-2/test/*.

  2. Remove the data from the temporary GCS bucket (to prevent Cloud Scheduler from running the batch prediction pipeline again)

If the batch prediction pipeline is running, it is assumed that the number of samples has exceeded the threshold (let's say 100 samples), and it is Cloud Scheduler's job to periodically check whether the number of samples has exceeded that threshold. If we don't empty the bucket, Cloud Scheduler will decide that the batch prediction pipeline should be run again and again. So, after moving the sample data to the GCS location that ImportExampleGen consumes, the sample data should be removed from the GCS bucket where they were.

  3. Run the training pipeline

Once we have saved the sample data in SPAN-2 and removed them from the GCS bucket where they were, we are OK to run the training pipeline.

please let me know if anything is unclear.

deep-diver commented on June 16, 2024

please note that span-2 is an example, and the span number should be increasing whenever we detect data drift.

sayakpaul commented on June 16, 2024

the original CIFAR-10 dataset is stored under something like span-1/train/* and span-1/test/*, and the newly collected dataset should be stored under span-2/train/* and span-2/test/*.

So, the new data should be split into train and test sets before it goes to the bucket, in their respective locations? What should the split size be? Also, we need to ensure there are enough samples available in the bucket to run the batch inferer on. Otherwise, it can lead to compute wastage.

If the batch prediction pipeline is running, it is assumed that the number of samples has exceeded the threshold (let's say 100 samples).

I thought the threshold was a performance threshold. Specifically, a threshold on the top-1 accuracy, and this is what you had initially specified in the notebooks as well. Currently, we don't have a threshold on how many samples there should be in the GCS bucket before triggering the batch prediction pipeline. I suppose you are referring to this threshold, and I am also assuming that this number needs to be specified in the Cloud Scheduler function?

please note that span-2 is an example, and the span number should be increasing whenever we detect data drift.

How is span tracked?

deep-diver commented on June 16, 2024

So, the new data should be split into train and test sets before it goes to the bucket, in their respective locations? What should the split size be? Also, we need to ensure there are enough samples available in the bucket to run the batch inferer on. Otherwise, it can lead to compute wastage.

It is not decided how many samples are enough and how much data should go into the test set.

I thought the threshold was a performance threshold. Specifically, a threshold on the top-1 accuracy, and this is what you had initially specified in the notebooks as well. Currently, we don't have a threshold on how many samples there should be in the GCS bucket before triggering the batch prediction pipeline. I suppose you are referring to this threshold, and I am also assuming that this number needs to be specified in the Cloud Scheduler function?

There are two thresholds: one for the Evaluator, and the other for Cloud Scheduler & Cloud Function to determine whether there is enough data to trigger the batch prediction pipeline. Cloud Scheduler & Cloud Function are completely out of scope for the 03_Batch_Prediction_Pipeline notebook; they are handled in the 04_Cloud_Scheduler_Trigger notebook. We can dynamically set the required number of samples as a parameter to the Cloud Function.

How is span tracked?

I don't know exactly how SPAN is tracked, but we can specify the range in a parameter to the Cloud Function that is then passed to the training pipeline. This project is only for demonstration purposes, so we won't put too much data (maybe 100 samples) into SPAN-2.

However, in a real-world data drift situation, we would probably need much, much more data. In that case, we probably don't need to account for SPAN-1, only SPAN-2, and we don't need to specify the range since the latest SPAN number will be selected automatically.

  • if we want more sophisticated control (like the range of the two most recent SPANs), then we should probably calculate the range by looking up the GCS bucket.

If you mean how to decide which SPAN number should be assigned to the newly collected data, then it has to be calculated by looking up the GCS bucket to see how many SPANs there are (for example, as sketched below).
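
For example, something like this could compute the next SPAN number by listing the bucket (a sketch assuming the span-N/... layout described above):

import re
from google.cloud import storage

def get_next_span_number(bucket_name: str) -> int:
    client = storage.Client()
    spans = set()
    for blob in client.list_blobs(bucket_name):
        match = re.match(r"span-(\d+)/", blob.name)
        if match:
            spans.add(int(match.group(1)))
    # If span-1 and span-2 already exist, the newly collected data goes to span-3.
    return max(spans) + 1 if spans else 1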

sayakpaul commented on June 16, 2024

I see.

I think training on top of the already available data would be promising and approachable. I guess for that we need to specify a range of spans? This is also because training on 100 examples seems a bit too unrealistic to me.

It is not decided how many samples are enough and how much data should go into the test set.

Let's cap it at 100 and take an 80:20 split? 80 samples will go to the existing training set and the rest will go to the existing test set (see the sketch below). What say?
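
Sketched with a hypothetical list of file paths, the split would be as simple as:

import random

def split_samples(filepaths, train_fraction=0.8, seed=42):
    files = list(filepaths)
    random.Random(seed).shuffle(files)   # deterministic shuffle for reproducibility
    cut = int(len(files) * train_fraction)
    return files[:cut], files[cut:]      # (goes to train set, goes to test set)

# With 100 newly collected samples: 80 go to train, 20 go to test.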

deep-diver commented on June 16, 2024

Or we could collect more data. Please let me know how much data sounds realistic at a minimum.

sayakpaul commented on June 16, 2024

Actually adding new examples to the existing training set and then training the model from scratch is a perfectly valid approach. This is mainly because we wouldn't want our model to forget about the recent past but would also want it to adapt to the changes of the present.

Let me know if that makes sense. If it does, then there are some points here where your inputs are needed.

deep-diver commented on June 16, 2024

@sayakpaul

Yup, that makes sense. However, here are two possible demonstrations that we can show:

  1. Adding more examples to the existing dataset. In this case, we can demonstrate GCS's data versioning feature. However, it is hard to go back and forth to find in which version data/concept drift happened.

    • This is the case when the new examples resemble the existing dataset (similar distribution), I guess.
  2. Adding more examples in a separate span. In this case, we can demonstrate ImportExampleGen's input data pattern feature (span numbers). It is more manageable to have datasets from different distributions in different spans.

Let me know what you think about this.

sayakpaul commented on June 16, 2024

If we can specify span ranges, then I think we can kill two birds with one stone, can't we?

We get all the previous training examples from the earlier span as well as the latest ones. We will also be able to track which span contains the drifted data, because the recency of the spans would reflect that.

deep-diver commented on June 16, 2024

@sayakpaul

Sure thing indeed. So, store the new examples under the succeeding span number, then use two spans for the training pipeline.

sayakpaul commented on June 16, 2024

Yes exactly. Same for the evaluation. Does that sound good?

deep-diver commented on June 16, 2024

Yes sounds good :)

I will work on the Cloud Scheduler / Cloud Function while you work on the last TFX custom component if you are OK with it.

deep-diver commented on June 16, 2024

Yeah, it is good to have you, since discussion makes our ideas more concrete and robust!

sayakpaul commented on June 16, 2024

@deep-diver I was referring to this blog: https://cloud.google.com/blog/topics/developers-practitioners/lets-get-it-started-triggering-ml-pipeline-runs.

It seems like it's best if we monitor the results of the batch prediction pipeline and then use its output to reuse the training and deployment pipeline.

So, the idea is to use a Cloud Function to monitor the batch prediction pipeline; it should tell us whether triggering is necessary and, if so, where to find the data, and trigger the training pipeline accordingly. To do this, we could probably maintain a bucket where decisive results from the batch prediction pipeline will be dumped (as .jsons). Here's how a sample result might look:

{"trigger_training": "True", "new_span": "3"}

Note that these are different from the predictions of the batch prediction job itself.

The deployed Cloud Function would read this and then take the decision accordingly.
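
A sketch of that Cloud Function, assuming it is wired to a GCS trigger on the decisions bucket; the project, region, spec path, and parameter key are placeholders:

import json

from google.cloud import storage
from kfp.v2.google.client import AIPlatformClient

def on_decision_uploaded(event, context):
    # `event` describes the newly finalized GCS object holding the decision JSON.
    client = storage.Client()
    blob = client.bucket(event["bucket"]).blob(event["name"])
    decision = json.loads(blob.download_as_string())

    if decision.get("trigger_training") == "True":
        api_client = AIPlatformClient(project_id="my-project", region="us-central1")
        api_client.create_run_from_job_spec(
            job_spec_path="gs://my-bucket/training_pipeline.json",
            # Hypothetical parameter name; must match the training pipeline spec.
            parameter_values={"span": decision["new_span"]},
        )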

deep-diver commented on June 16, 2024

@sayakpaul

It is possible, but I wonder why it is the best option, given that the decision is already made at the end of the batch prediction pipeline and we can embed a triggering component in the pipeline.

It sounds to me like we would be wasting extra resources on an extra Cloud Function and bucket. If your concern is tracking the history, the span number can be tracked by ImportExampleGen already.

I am open to any better options, so please let me know your thoughts.

sayakpaul commented on June 16, 2024

So we create a component on top of create_job_from_spec(); the component would load a pipeline spec, and we would also provide the data span to it. Is this how you are envisioning it?

deep-diver commented on June 16, 2024

Yes,

actually we have to use create_run_from_job_spec() in any case, since that is the only way to create a run for Vertex AI Pipelines from TFX (please correct me if I am wrong).

We could create a Cloud Function (with create_run_from_job_spec() in it), but what it does is essentially the same thing, I guess? It's a matter of whether we want to create the pipeline job directly from the batch prediction pipeline or indirectly from a Cloud Function (please correct me if I am wrong as well).

sayakpaul commented on June 16, 2024

@deep-diver here is how I am envisioning the final component:

import json

from google.cloud import storage

from kfp.v2.google.client import AIPlatformClient
from tfx.dsl.component.experimental.annotations import Parameter
from tfx.dsl.component.experimental.decorators import component

from absl import logging


@component
def PipelineTrigger(
    pipeline_spec_path: Parameter[str],
    parameter_values: Parameter[str],
    project_id: Parameter[str],
    region: Parameter[str],
):
    # Check if the pipeline spec exists.
    storage_client = storage.Client()

    path_parts = pipeline_spec_path.replace("gs://", "").split("/")
    bucket_name = path_parts[0]
    blob_name = "/".join(path_parts[1:])

    bucket = storage_client.bucket(bucket_name)
    blob = storage.Blob(bucket=bucket, name=blob_name)

    if not blob.exists(storage_client):
        raise ValueError(f"{pipeline_spec_path} does not exist.")

    # Read the parameter values.
    # Should essentially be the span.
    parameter_values = json.loads(parameter_values)

    # Initialize Vertex AI API client and submit for pipeline execution.
    api_client = AIPlatformClient(project_id=project_id, region=region)

    response = api_client.create_run_from_job_spec(
        job_spec_path=pipeline_spec_path,
        parameter_values=parameter_values,
        enable_caching=True,
    )

    logging.info(response)

We need to decide how the parameter_values should be crafted.
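
For instance, the upstream component could serialize them like this (the key name is a hypothetical placeholder that would have to match a runtime parameter declared by the training pipeline):

import json

# Crafted upstream and handed over as Parameter[str]:
parameter_values = json.dumps({"span-number": "2"})

# Decoded back inside PipelineTrigger before submission:
assert json.loads(parameter_values) == {"span-number": "2"}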

deep-diver commented on June 16, 2024

I am a bit confused: who writes the JSON file? I thought it's PipelineTrigger's job.

sayakpaul commented on June 16, 2024

Which JSON are you referring to?

I am referring to the JSON pipeline spec created here: https://github.com/deep-diver/Continuous-Adaptation-for-Machine-Learning-System-to-Data-Changes/blob/main/notebooks/02_TFX_Training_Pipeline.ipynb.

deep-diver commented on June 16, 2024

The JSON where the range of spans is recorded.

It looks like you get the range of spans in parameter_values by looking up that JSON file.

sayakpaul commented on June 16, 2024

Yup. I agree that the PerformanceEvaluator component or the BatchPredictor component should yield that JSON file for us. I will incorporate those changes, but I imagine our training pipeline (https://github.com/deep-diver/Continuous-Adaptation-for-Machine-Learning-System-to-Data-Changes/blob/main/notebooks/02_TFX_Training_Pipeline.ipynb) needs to be adjusted first, and we need to discuss in which format it would expect the span-related information.

WDYT?

deep-diver commented on June 16, 2024

Yes, yes. Let me look into it, but give me a few hours.

deep-diver commented on June 16, 2024

I was looking at this code snippet from the link:

# In cases where files have zero-padding, the width modifier in SPAN spec is
# required so TFX can correctly substitute spec with zero-padded span number.
input = proto.Input(splits=[
    proto.Input.Split(name='train', pattern='span-{SPAN}/train/*'),
    proto.Input.Split(name='eval', pattern='span-{SPAN}/eval/*')
])

# Specify the span number to be processed here using StaticRange.
range = proto.RangeConfig(
    static_range=proto.StaticRange(
        start_span_number=1, end_span_number=1))

# After substitution, the train and eval split patterns will be
# 'input_dir/span-01/train/*' and 'input_dir/span-01/eval/*', respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

I thought I could specify start_span_number and end_span_number as 1 and 2 respectively, but I got an error. I checked the internal implementation, shown below, from here:

    if range_config:
      if range_config.HasField('static_range'):
        start_span_number = range_config.static_range.start_span_number
        end_span_number = range_config.static_range.end_span_number
        if start_span_number != end_span_number:
          raise ValueError(
              'For ExampleGen, start and end span numbers for RangeConfig.StaticRange must be equal.'
          )

It looks like only a single span number is allowed for ExampleGen for now. In this case, there are two directions from this point.

  1. only use the most recent SPAN for retraining, which is the default behaviour (but we still need to pass the SPAN number if we want to retrain the model with a past data version); a sketch of this option follows below
  2. propose ranging behaviour for ExampleGen to the TFX team.

please let me know your thoughts.
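
For direction 1, here is a sketch of pinning ImportExampleGen to a single SPAN via the equal start/end numbers the current implementation allows (span_number and DATA_ROOT are placeholders; the test/ split name follows our bucket layout):

from tfx import v1 as tfx

DATA_ROOT = "gs://my-bucket/data"  # placeholder
span_number = 2  # e.g. computed by looking up the GCS bucket

input_config = tfx.proto.Input(splits=[
    tfx.proto.Input.Split(name="train", pattern="span-{SPAN}/train/*"),
    tfx.proto.Input.Split(name="eval", pattern="span-{SPAN}/test/*"),
])
range_config = tfx.proto.RangeConfig(
    static_range=tfx.proto.StaticRange(
        start_span_number=span_number, end_span_number=span_number))

example_gen = tfx.components.ImportExampleGen(
    input_base=DATA_ROOT, input_config=input_config, range_config=range_config)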

sayakpaul commented on June 16, 2024

Hmm.

only use the most recent SPAN for retraining, which is the default behaviour (but we still need to pass the SPAN number if we want to retrain the model with a past data version)

I think being able to add the newly available labeled examples to the existing training (and validation) set is still a valid choice. We can both think of a couple of approaches for accomplishing that, but I am personally not sure which would be the best one. Maybe we should check with Robert again to find out how best to accomplish this in a TFX pipeline?

propose ranging behaviour for ExampleGen to TFX team.

Sounds good to me.

deep-diver commented on June 16, 2024

Yeah discussing with Robert sounds good to me!

sayakpaul commented on June 16, 2024

@deep-diver

From the TFX training notebook:

pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
    model=trainer.outputs['model'],
    custom_config={
        tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY: True,
        tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY: region,
        tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY: 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-5:latest',
        tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY: SERVING_JOB_SPEC
    }
).with_id('pusher')

Did you test it?

I demoed a sample pipeline trigger component in #3 (comment). However, I am not sure if parameter_values would still be applicable to it. My question is: what should be the inputs to this component? Let's say we have the pipeline JSON spec already serialized with the SPAN stuff. Since we are already calculating the spans with glob pattern matching, I suspect we won't need any separate parameter values.

Also, for reference: in the _create_pipeline() method of the batch prediction notebook, we also need to pass the precompiled pipeline spec path so that we can trigger the model re-training pipeline based on the performance evaluation.

deep-diver commented on June 16, 2024

@sayakpaul

The JSON spec for the training pipeline is uploaded to {PIPELINE_ROOT}/{PIPELINE_DEFINITION_FILE}, which is equal to gs://cifar10-experimental-csp/pipeline_root/continuous-adaptation-for-data-changes/continuous-adaptation-for-data-changes_pipeline.json

sayakpaul commented on June 16, 2024

@deep-diver thank you.

Here's how I have coded them:

Re-trainer component

from google.cloud import storage

from kfp.v2.google.client import AIPlatformClient
from tfx.dsl.component.experimental.annotations import Parameter
from tfx.dsl.component.experimental.decorators import component

from absl import logging


@component
def PipelineTrigger(
    pipeline_spec_path: Parameter[str],
    project_id: Parameter[str],
    region: Parameter[str],
):
    # Check if the pipeline spec exists.
    storage_client = storage.Client()

    path_parts = pipeline_spec_path.replace("gs://", "").split("/")
    bucket_name = path_parts[0]
    blob_name = "/".join(path_parts[1:])

    bucket = storage_client.bucket(bucket_name)
    blob = storage.Blob(bucket=bucket, name=blob_name)

    if not blob.exists(storage_client):
        raise ValueError(f"{pipeline_spec_path} does not exist.")

    # Initialize Vertex AI API client and submit for pipeline execution.
    api_client = AIPlatformClient(project_id=project_id, region=region)

    response = api_client.create_run_from_job_spec(
        job_spec_path=pipeline_spec_path,
        enable_caching=True,
    )

    logging.info(response)

Modified _create_pipeline() method in the batch prediction notebook

def _create_pipeline(
    pipeline_name: str,
    pipeline_root: str,
    data_gcs_bucket: str,
    data_gcs_prefix: str,
    batch_job_gcs: str,
    job_display_name: str,
    model_resource_name: str,
    project_id: str,
    region: str,
    threshold: float
) -> Pipeline:

    # Generate a file list for batch predictions.
    # More details on the structure of this file here:
    # https://bit.ly/3BzfHVu.
    filelist_gen = FileListGen(
        project=project_id,
        gcs_source_bucket=data_gcs_bucket,
        gcs_source_prefix=data_gcs_prefix,
    ).with_id("filelist_gen")

    # Submit a batch prediction job.
    batch_pred_component = BatchPredictionGen(
        project=project_id,
        location=region,
        job_display_name=job_display_name,
        model_resource_name=model_resource_name,
        gcs_source=filelist_gen.outputs["outpath"],
        gcs_destination=f"gs://{batch_job_gcs}/results/",
        accelerator_count=0,
        accelerator_type=None,
    ).with_id("bulk_inferer_vertex")
    batch_pred_component.add_upstream_node(filelist_gen)

    # Evaluate the performance of the predictions. 
    # In a real-world project, this evaluation takes place
    # separately, typically with the help of domain experts. 
    final_gcs_destination = f"gs://{batch_job_gcs}/results/"
    evaluator = PerformanceEvaluator(
        gcs_destination=f'gs://{final_gcs_destination.split("/")[2]}',
        local_directory=final_gcs_destination.split("/")[-2],
        threshold=threshold
    ).with_id("batch_prediction_evaluator")
    evaluator.add_upstream_node(batch_pred_component)
    components = [filelist_gen, batch_pred_component, evaluator]

    # If the evaluation score is below the threshold, trigger re-training.
    re_train = evaluator.outputs["trigger_pipeline"].get()[0].get_string_custom_property("result")
    if re_train:
        # First, fetch the pipeline spec path.
        pipeline_spec_path = os.path.join(pipeline_root, PIPELINE_DEFINITION_FILE)

        # Then submit a training job to Vertex AI.
        trainer = PipelineTrigger(
            pipeline_spec_path=pipeline_spec_path, project_id=project_id, region=region
        )
        components.append(trainer)

    return Pipeline(
        pipeline_name=pipeline_name, pipeline_root=pipeline_root, components=components
    )

Would you be able to run it and let me know how it goes? Since you already have the metadata of the previous runs, I thought this might speed things up a bit. Let me know what you think.

We still need to add the span-related comments to the 02 notebook.

deep-diver commented on June 16, 2024

Sure! Did you include this code in the batch prediction pipeline notebook?

deep-diver commented on June 16, 2024

never mind! I will do this :)

sayakpaul commented on June 16, 2024

Thank you! But please do let me know if the components error out. I will then sync with you and generate the files necessary for running the pipeline.

deep-diver commented on June 16, 2024

@sayakpaul

I don't see the code that compresses the new list of images into TFRecords and copies them over to GCS where ImportExampleGen will look them up.

Could you also share a code snippet to build TFRecords from a raw list of images, compatible with the cifar10 dataset from TensorFlow Datasets?

sayakpaul commented on June 16, 2024

@deep-diver I am working on that as we speak.

copy them over to GCS where ImportExampleGen will look them up

Where should the generated TFRecords be copied over to? What will be the location?

sayakpaul commented on June 16, 2024

@deep-diver here's a Colab Notebook that shows what you wanted:

https://colab.research.google.com/gist/sayakpaul/95eb115487c9d78eadb4827dad11ad7d/scratchpad.ipynb

Let me know if anything is unclear.
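
In essence, the notebook writes the images out as TFRecords roughly like this (a sketch; I'm assuming the 'image' (encoded bytes) and 'label' (int64) feature keys, which we should double-check against the schema our training pipeline expects):

import tensorflow as tf

def to_example(image_path: str, label: int) -> tf.train.Example:
    # Read the already-encoded image bytes (PNG/JPEG) straight from disk.
    image_bytes = tf.io.read_file(image_path).numpy()
    return tf.train.Example(features=tf.train.Features(feature={
        "image": tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_bytes])),
        "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
    }))

def write_tfrecord(image_paths, labels, output_path):
    with tf.io.TFRecordWriter(output_path) as writer:
        for path, label in zip(image_paths, labels):
            writer.write(to_example(path, label).SerializeToString())

# e.g. write_tfrecord(new_image_paths, new_labels, "new_data-train.tfrecord")
# and then copy the file to gs://<bucket>/span-2/train/ for ImportExampleGen.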
