flytekit's Introduction

Hi there, I'm Yicheng 👋

I'm passionate about contributing to Open Source.

flytekit's People

Contributors

akhurana001, alekhyasasi, asottile, bnsblue, byronhsu, cosmicbboy, dependabot[bot], derwiki, eapolinario, enghabu, fg91, flixr, future-outlier, honnix, jeevb, katrogan, kumare3, matthewphsmith, mayitbeegh, migueltol22, pingsutw, samhita-alla, slai, smritisatyanv, snyk-bot, sonjaer, th0114nd, wild-endeavor, yicheng-lu-llll, yindia

flytekit's Issues

flytekit metrics

Current Workflow Metrics Exploration in Flyte

1. Propeller sends events

Flyte Propeller dispatches various events, which include a reportedAt timestamp. These events encompass WorkflowEvent, NodeEvent, and TaskEvent, each containing relevant timing information.

Refer to the related code here.

2. Flyte Admin stores the time info in DB

Flyte Admin processes the received events and persists them in the database as executions.

Check the corresponding code here.

3. Console Metrics Fetching

Flyte Console retrieves execution metrics through the GetExecutionMetrics function. This gRPC function returns a workflow span, which is the root of a tree in which each span can hold a list of child spans. The span types are workflow, task, node, and operation (see here).

A workflow span can contain multiple node spans. A node span can have multiple task spans when there's a subworkflow. Task spans, however, should not have child spans. Flyte Console processes these spans recursively and shows them in the UI.
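The recursive handling of the span tree can be sketched as follows. This is only an illustration in Python, not Flyte Console's actual code; the `Span` dataclass and the `flatten` helper are hypothetical stand-ins for the flyteidl Span message and the console's traversal.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class Span:
    """Hypothetical stand-in for the flyteidl Span message."""
    kind: str      # "workflow", "node", "task", or "operation"
    name: str
    start: float   # seconds
    end: float
    spans: List["Span"] = field(default_factory=list)


def flatten(span: Span, depth: int = 0) -> List[Tuple[int, str, str, float]]:
    """Walk the tree depth-first and return (depth, kind, name, duration) rows."""
    rows = [(depth, span.kind, span.name, span.end - span.start)]
    for child in span.spans:
        rows.extend(flatten(child, depth + 1))
    return rows


# A workflow span containing one node span, which contains one task span.
root = Span("workflow", "wf", 0.0, 10.0, [
    Span("node", "n0", 1.0, 9.0, [
        Span("task", "t0", 2.0, 8.0),
    ]),
])

for depth, kind, name, duration in flatten(root):
    print("  " * depth + f"{kind} {name}: {duration:.1f}s")
```

Because the traversal only relies on the `spans` list, any new span type added as a child would be visited without changing the walk itself.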

See related code here.

Flytekit Metrics Exploration Proposal

1. Association with Tasks

Flytekit metrics should be associated with a task. On a timeline graph, all Flytekit metrics should fall within a task's start and end times. Therefore, a TaskExecutionIdentifier is required to associate Flytekit metrics with their task.
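The timeline constraint can be expressed as a simple containment check. The sketch below is an assumption about how such a check might look; `within_task` is a hypothetical helper and the timestamps are made up for illustration.

```python
from datetime import datetime, timedelta


def within_task(task_start, task_end, span_start, span_end) -> bool:
    """Return True if the span lies entirely inside the task's time window."""
    return task_start <= span_start <= span_end <= task_end


task_start = datetime(2023, 1, 1, 12, 0, 0)
task_end = task_start + timedelta(seconds=30)

# A Flytekit span that starts 5 s into the task and lasts 10 s.
ok = within_task(task_start, task_end,
                 task_start + timedelta(seconds=5),
                 task_start + timedelta(seconds=15))

# A span that ends after the task ended cannot belong to it.
bad = within_task(task_start, task_end,
                  task_start + timedelta(seconds=25),
                  task_start + timedelta(seconds=35))

print(ok, bad)
```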

2. Add Flytekit Spans

Given that spans are organized as a tree and Flyte Console processes spans recursively, a new type of span, the Flytekit span, is needed. We can append all Flytekit spans to the corresponding task spans as their children. This approach minimizes code changes in Flyte Console.

This process can be broken down as follows:

  1. During each task run in k8s, Flytekit collects metrics and obtains the TaskExecutionIdentifier (how to do this is still a TODO). These metrics are then converted into Flytekit spans and sent to Flyte Admin. One possible place to access the TaskExecutionIdentifier is the invocation of the load_task function in entrypoint.py, but this needs further investigation. Also, refer to the definition of TaskExecutionIdentifier in Flytekit here.

  2. Flyte Admin saves these Flytekit spans directly to the db without additional processing.

  3. When Flyte Console calls GetExecutionMetrics, it still receives a workflow span, so the interface of this call does not change. This time, however, the span tree also includes Flytekit spans. This allows Flytekit metrics to be displayed in the UI with minimal changes to Flyte Console.
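Step 1 of the process above could look roughly like the following on the Flytekit side. This is a sketch under stated assumptions: `SpanRecorder` is a hypothetical helper, not an existing Flytekit API, and a plain string stands in for the TaskExecutionIdentifier.

```python
import time
from contextlib import contextmanager
from typing import Dict, List


class SpanRecorder:
    """Hypothetical helper: records named timing spans for one task execution."""

    def __init__(self, task_execution_id: str):
        # In Flytekit this would be a TaskExecutionIdentifier; a string stands in here.
        self.task_execution_id = task_execution_id
        self.spans: List[Dict] = []

    @contextmanager
    def span(self, flytekit_id: str):
        start = time.time()
        try:
            yield
        finally:
            # Record the span even if the timed block raises.
            self.spans.append(
                {"flytekit_id": flytekit_id, "start_time": start, "end_time": time.time()}
            )


recorder = SpanRecorder("project/domain/exec/n0/0")
with recorder.span("download s3 data"):
    time.sleep(0.01)  # placeholder for real work
with recorder.span("run user code"):
    time.sleep(0.01)

# recorder.spans would then be sent to Flyte Admin together with the identifier.
for s in recorder.spans:
    print(s["flytekit_id"], round(s["end_time"] - s["start_time"], 3))
```

A context manager keeps the instrumentation local to the code being timed, and the `finally` clause means a failing step still produces a span.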

Pseudo code in Flytekit (send metrics to Flyte Admin):

grpc_client.create_flytekit_spans(all_flytekit_metrics, task_execution_identifier)

Pseudo code in Flyte Admin (store the received Flytekit spans in the database, associated with the TaskExecutionIdentifier):

func createFlytekitSpans(allFlytekitMetrics Metrics, taskExecutionIdentifier TaskExecutionIdentifier) {
    db.Write(allFlytekitMetrics, taskExecutionIdentifier)
}

Pseudo code for the concatenateSpans function (as additional code for GetExecutionMetrics, so that it still returns the workflow span, the root of the tree, but this time containing Flytekit spans):

func concatenateSpans(root *Span) *Span {
    // Iterate over the tree. If a task span is found,
    // use its TaskExecutionIdentifier to load all Flytekit spans
    // and append them to the task span as its children.
    return root
}
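The pseudo code above can be made concrete with a small Python sketch. Spans are modeled as plain dicts and an in-memory dict stands in for the database lookup; both are assumptions made for illustration, and `concatenate_spans` is a hypothetical name rather than Flyte Admin code.

```python
from typing import Dict, List


def concatenate_spans(span: Dict, flytekit_spans_by_task: Dict[str, List[Dict]]) -> Dict:
    """Attach stored Flytekit spans to every task span in the tree, in place."""
    if "task_id" in span:
        # Task spans are leaves: attach their Flytekit spans as children.
        extra = flytekit_spans_by_task.get(span["task_id"], [])
        span.setdefault("spans", []).extend(extra)
    else:
        # Workflow and node spans: keep descending.
        for child in span.get("spans", []):
            concatenate_spans(child, flytekit_spans_by_task)
    return span


# Stand-in for the Flytekit spans persisted by Flyte Admin, keyed by task id.
stored = {"t0": [{"flytekit_id": "download s3 data", "start_time": 2, "end_time": 3}]}

tree = {"workflow_id": "wf", "spans": [{"node_id": "n0", "spans": [{"task_id": "t0"}]}]}

result = concatenate_spans(tree, stored)
task = result["spans"][0]["spans"][0]
print(task["spans"][0]["flytekit_id"])
```

The function returns the same root it was given, which matches the idea that GetExecutionMetrics keeps returning a single workflow span.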

In Flyte Console: almost no change. Flyte Console already walks the span tree recursively, so it picks up the new timing information automatically.

Code in flyteidl:

// Span represents a duration trace of Flyte execution. The id field denotes a Flyte execution entity or an operation
// which uniquely identifies the Span. The spans attribute allows this Span to be further broken down into more
// precise definitions.
message Span {
    // start_time defines the instance this span began.
    google.protobuf.Timestamp start_time = 1;

    // end_time defines the instance this span completed.
    google.protobuf.Timestamp end_time = 2;

    oneof id {
        // workflow_id is the id of the workflow execution this Span represents.
        flyteidl.core.WorkflowExecutionIdentifier workflow_id = 3;

        // node_id is the id of the node execution this Span represents.
        flyteidl.core.NodeExecutionIdentifier node_id = 4;

        // task_id is the id of the task execution this Span represents.
        flyteidl.core.TaskExecutionIdentifier task_id = 5;

        // operation_id is the id of a unique operation that this Span represents.
        string operation_id = 6;

        // Proposed addition: flytekit_id can be the name of the executing code, like "download s3 data".
        // Field number 8 is used because 7 is already taken by spans.
        string flytekit_id = 8;
    }

    // spans defines a collection of Spans that breakdown this execution.
    repeated Span spans = 7;
}

Also, a gRPC method is needed for creating Flytekit spans:

rpc CreateFlytekitSpan(CreateFlytekitSpanRequest) returns (CreateFlytekitSpanResponse);

flytekit metrics note

Current framework:
flyteorg/flyte#3272

propeller:
https://github.com/flyteorg/flytepropeller/blob/f4cadb0062320086096306c8d6e5a1e412090ef8/events/admin_eventsink.go#LL43C1-L43C1

func (s *adminEventSink) Sink(ctx context.Context, message proto.Message) error {

flyteidl:
https://github.com/flyteorg/flyteidl/pull/367/files

  // Fetches runtime metrics for a :ref:`ref_flyteidl.admin.Execution`.
  rpc GetExecutionMetrics (flyteidl.admin.WorkflowExecutionGetMetricsRequest) returns (flyteidl.admin.WorkflowExecutionGetMetricsResponse) {
    option (google.api.http) = {
      get: "/api/v1/metrics/executions/{id.project}/{id.domain}/{id.name}"
    };
    // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
    //   description: "Retrieve metrics from an existing workflow execution."
    // };
  };
}
// WorkflowExecutionGetMetricsRequest represents a request to retrieve metrics for the specified workflow execution.
message WorkflowExecutionGetMetricsRequest {
    // id defines the workflow execution to query for.
    core.WorkflowExecutionIdentifier id = 1;

    // depth defines the number of Flyte entity levels to traverse when breaking down execution details.
    int32 depth = 2;
}
// WorkflowExecutionGetMetricsResponse represents the response containing metrics for the specified workflow execution.
message WorkflowExecutionGetMetricsResponse {
    // Span defines the top-level breakdown of the workflows execution. More precise information is nested in a
    // hierarchical structure using Flyte entity references.
    core.Span span = 1;
}
// Span represents a duration trace of Flyte execution. The id field denotes a Flyte execution entity or an operation
// which uniquely identifies the Span. The spans attribute allows this Span to be further broken down into more
// precise definitions.
message Span {
    // start_time defines the instance this span began.
    google.protobuf.Timestamp start_time = 1;

    // end_time defines the instance this span completed.
    google.protobuf.Timestamp end_time = 2;

    oneof id {
        // workflow_id is the id of the workflow execution this Span represents.
        flyteidl.core.WorkflowExecutionIdentifier workflow_id = 3;

        // node_id is the id of the node execution this Span represents.
        flyteidl.core.NodeExecutionIdentifier node_id = 4;

        // task_id is the id of the task execution this Span represents.
        flyteidl.core.TaskExecutionIdentifier task_id = 5;

        // operation_id is the id of a unique operation that this Span represents.
        string operation_id = 6;
    }

    // spans defines a collection of Spans that breakdown this execution.
    repeated Span spans = 7;
}

flyteadmin:
https://github.com/flyteorg/flyteadmin/pull/524/files
Receives events and stores their reportedAt timestamps.

func (m *AdminService) GetExecutionMetrics(
	ctx context.Context, request *admin.WorkflowExecutionGetMetricsRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) {
	defer m.interceptPanic(ctx, request)
	if request == nil {
		return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
	}
	var response *admin.WorkflowExecutionGetMetricsResponse
	var err error
	m.Metrics.executionEndpointMetrics.getMetrics.Time(func() {
		response, err = m.MetricsManager.GetExecutionMetrics(ctx, *request)
	})
	if err != nil {
		return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.getMetrics)
	}
	m.Metrics.executionEndpointMetrics.getMetrics.Success()
	return response, nil
}

https://github.com/flyteorg/flyteadmin/blob/master/pkg/manager/impl/metrics_manager.go

// GetExecutionMetrics returns a Span hierarchically breaking down the workflow execution into a collection of
// Categorical and Reference Spans.
func (m *MetricsManager) GetExecutionMetrics(ctx context.Context,
	request admin.WorkflowExecutionGetMetricsRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) {

	// retrieve workflow execution
	executionRequest := admin.WorkflowExecutionGetRequest{Id: request.Id}
	execution, err := m.executionManager.GetExecution(ctx, executionRequest)
	if err != nil {
		return nil, err
	}

	span, err := m.parseExecution(ctx, execution, int(request.Depth))
	if err != nil {
		return nil, err
	}

	return &admin.WorkflowExecutionGetMetricsResponse{Span: span}, nil
}

flyteconsole:
https://github.com/flyteorg/flyteconsole/pull/747/files

export const fetchExecutionMetrics = async (
  id: WorkflowExecutionIdentifier,
  depth: number,
  apiContext: APIContextValue,
) => {
  const { getExecutionMetrics } = apiContext;
  const metrics = await getExecutionMetrics(id, {
    params: {
      depth,
    },
  });
  return metrics;
};
