Current Workflow Metrics Exploration in Flyte
1. Propeller sends events
Flyte Propeller dispatches events that include a `reportedAt` timestamp. These events are `WorkflowEvent`, `NodeEvent`, and `TaskEvent`, each of which carries the relevant timing information.
Refer to the related code here.
2. Flyte Admin stores the time info in DB
Flyte Admin processes the received events and persists their timing information in the database as part of the corresponding execution records.
Check the corresponding code here.
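The event-to-record flow in steps 1 and 2 can be pictured with a toy model. The Python sketch below is a hypothetical simplification, not Flyte Admin's actual logic. `Event`, `ExecutionRecord`, `apply_events`, and the string entity ids are illustrative names that do not come from Flyte. Real events carry far more fields, and only the phase and `reported_at` are modeled here. The toy sorts events by `reported_at` so that delivery order does not affect the resulting start and end times.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional

# Hypothetical, heavily simplified event: only the fields needed for timing.
@dataclass
class Event:
    entity_id: str          # workflow/node/task execution id, simplified to a string
    phase: str              # e.g. "RUNNING", "SUCCEEDED", "FAILED"
    reported_at: datetime   # when the event was reported

TERMINAL_PHASES = {"SUCCEEDED", "FAILED", "ABORTED", "TIMED_OUT"}

# Hypothetical stand-in for the timing part of a persisted execution record.
@dataclass
class ExecutionRecord:
    started_at: Optional[datetime] = None
    ended_at: Optional[datetime] = None

def apply_events(events: List[Event]) -> Dict[str, ExecutionRecord]:
    """Fold events into one record per entity, processing them in reported_at order."""
    records: Dict[str, ExecutionRecord] = {}
    for ev in sorted(events, key=lambda e: e.reported_at):
        rec = records.setdefault(ev.entity_id, ExecutionRecord())
        if ev.phase == "RUNNING" and rec.started_at is None:
            rec.started_at = ev.reported_at   # first RUNNING event marks the start
        elif ev.phase in TERMINAL_PHASES:
            rec.ended_at = ev.reported_at     # terminal event marks the end

    return records

t0 = datetime(2023, 1, 1, tzinfo=timezone.utc)
records = apply_events([
    Event("wf-1/n0", "SUCCEEDED", t0 + timedelta(seconds=5)),  # delivered before RUNNING
    Event("wf-1/n0", "RUNNING", t0),
])
print(records["wf-1/n0"].ended_at - records["wf-1/n0"].started_at)  # 0:00:05
```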
3. Console Metrics Fetching
Flyte Console retrieves execution metrics through the `GetExecutionMetrics` gRPC endpoint. The endpoint returns a workflow span, which is the root of a tree in which each span can have a list of child spans. The span types are workflow, task, node, and operation (see here).
A workflow span can contain multiple node spans. A node span can have multiple task spans when there's a subworkflow. Task spans, however, should not have child spans. Flyte Console processes these spans recursively and shows them in the UI.
See related code here.
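The tree shape and its recursive traversal can be made concrete with a small Python sketch. The `Span` dataclass here is a hypothetical, simplified stand-in for the flyteidl `Span` message: a `kind` string and a `name` replace the `oneof id`. `walk` is an illustrative depth-first traversal in the spirit of what Flyte Console does. It is not Console's actual code.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Iterator, List, Tuple

@dataclass
class Span:
    kind: str            # "workflow", "node", "task", or "operation"
    name: str            # simplified stand-in for the span's identifier
    start_time: datetime
    end_time: datetime
    spans: List["Span"] = field(default_factory=list)  # child spans

def walk(span: Span, depth: int = 0) -> Iterator[Tuple[int, Span]]:
    """Pre-order depth-first traversal: yields (depth, span) for every span in the tree."""
    yield depth, span
    for child in span.spans:
        yield from walk(child, depth + 1)

t0 = datetime(2023, 1, 1)
task = Span("task", "t-0", t0 + timedelta(seconds=2), t0 + timedelta(seconds=8))
node = Span("node", "n0", t0 + timedelta(seconds=1), t0 + timedelta(seconds=9), [task])
root = Span("workflow", "wf", t0, t0 + timedelta(seconds=10), [node])

for depth, s in walk(root):
    print("  " * depth + f"{s.kind} {s.name}: {(s.end_time - s.start_time).total_seconds()}s")
# workflow wf: 10.0s
#   node n0: 8.0s
#     task t-0: 6.0s
```

Because the traversal only follows `spans`, any new span type appended to that list is visited without further changes. The proposal below relies on this property.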
Flytekit Metrics Exploration Proposal
1. Association with Tasks
Flytekit metrics should be associated with a task: on a timeline graph, every Flytekit metric should fall within its task's start and end times. Therefore, a `TaskExecutionIdentifier` is required to associate Flytekit metrics with their task.
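The constraint can be stated as a simple check. This Python sketch is illustrative only. The `TaskExecutionIdentifier` dataclass is a simplified, hypothetical mirror of the real message, which nests identifier messages for the task and node execution and carries a retry attempt. `FlytekitMetric` and `fits_in_task` are hypothetical names.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass(frozen=True)
class TaskExecutionIdentifier:
    # Simplified stand-in: the real message nests Identifier / NodeExecutionIdentifier.
    task_id: str
    node_execution_id: str
    retry_attempt: int

@dataclass
class FlytekitMetric:
    name: str                                   # e.g. "download s3 data"
    start_time: datetime
    end_time: datetime
    task_execution_id: TaskExecutionIdentifier  # ties the metric to one task attempt

def fits_in_task(metric: FlytekitMetric, task_start: datetime, task_end: datetime) -> bool:
    """True if the metric lies entirely within the task's [start, end] window."""
    return task_start <= metric.start_time <= metric.end_time <= task_end

tid = TaskExecutionIdentifier("my_task", "exec-1/n0", 0)
task_start = datetime(2023, 1, 1)
task_end = task_start + timedelta(seconds=10)
metric = FlytekitMetric("download s3 data",
                        task_start + timedelta(seconds=1),
                        task_start + timedelta(seconds=4), tid)
print(fits_in_task(metric, task_start, task_end))  # True
```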
2. Add Flytekit Spans
Since spans are organized as a tree and Flyte Console processes them recursively, a new span type, the Flytekit span, is needed. All Flytekit spans can be appended as children of the corresponding task spans. This approach minimizes code changes in Flyte Console.
This process can be broken down as follows:
- During each task run in k8s, Flytekit collects metrics and obtains the `TaskExecutionIdentifier` in some way (TODO). These metrics are then converted into Flytekit spans and sent to Flyte Admin. One possible way to access the `TaskExecutionIdentifier` is during the invocation of the `load_task` function in `entrypoint.py`, but this needs further investigation. Also, refer to the definition of `TaskExecutionIdentifier` in Flytekit here.
- Flyte Admin saves these Flytekit spans directly to the DB without additional processing.
- When Flyte Console calls `GetExecutionMetrics`, it still receives a workflow span, so the interface of that call does not change and it needs no gRPC changes. This time, however, the span tree also includes Flytekit spans. This minor change allows Flytekit metrics to be displayed in the UI with minimal changes to Flyte Console.
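The Flytekit side of the first step could look roughly like the Python sketch below. It times named code sections and turns each one into a span record. `FlytekitSpan`, `SpanRecorder`, and `timed` are hypothetical names, not existing Flytekit APIs. The task execution id is simply passed in as a string, because how Flytekit obtains it is still an open question above.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Iterator, List

@dataclass
class FlytekitSpan:
    flytekit_id: str       # name of the timed code section, e.g. "download s3 data"
    start_time: datetime
    end_time: datetime

@dataclass
class SpanRecorder:
    task_execution_id: str  # hypothetical: obtaining this is still a TODO
    spans: List[FlytekitSpan] = field(default_factory=list)

    @contextmanager
    def timed(self, flytekit_id: str) -> Iterator[None]:
        """Record a span covering the body of the `with` block."""
        start = datetime.now(timezone.utc)
        try:
            yield
        finally:
            # Record the span even if the timed code raises.
            self.spans.append(FlytekitSpan(flytekit_id, start, datetime.now(timezone.utc)))

recorder = SpanRecorder(task_execution_id="exec-1/n0/my_task/0")
with recorder.timed("download s3 data"):
    time.sleep(0.01)  # placeholder for real work
# recorder.spans would then be sent to Flyte Admin together with the task execution id.
```

Collecting spans locally and sending them in one call at the end of the task keeps the number of round trips to Flyte Admin at one per task. This is a design choice of the sketch, not something the proposal fixes.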
Pseudo code in Flytekit (send the metrics to Flyte Admin):
```python
grpc_client.create_flytekit_spans(all_flytekit_spans, task_execution_identifier)
```
Pseudo code in Flyte Admin (store the received Flytekit spans in the DB, associated with the `TaskExecutionIdentifier`):
```go
func createFlytekitSpans(flytekitSpans []*Span, taskExecutionIdentifier *TaskExecutionIdentifier) error {
	// Persist the spans as received, keyed by the task execution they belong to.
	return db.Write(flytekitSpans, taskExecutionIdentifier)
}
```
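The storage step amounts to "append spans under a key, look them up by the same key". The Python sketch below shows this with an in-memory dictionary standing in for the DB. `FlytekitSpanStore` and its methods are hypothetical. The identifier is a frozen, simplified mirror of `TaskExecutionIdentifier`, which makes it hashable. Because the retry attempt is part of the key, spans from different attempts of the same task stay separate.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from typing import DefaultDict, List

@dataclass(frozen=True)  # frozen -> hashable, so it can be used as a dict key
class TaskExecutionIdentifier:
    task_id: str
    node_execution_id: str
    retry_attempt: int

@dataclass
class FlytekitSpan:
    flytekit_id: str
    start_time: datetime
    end_time: datetime

class FlytekitSpanStore:
    """In-memory stand-in for the DB table: stores spans as received, no processing."""

    def __init__(self) -> None:
        self._spans: DefaultDict[TaskExecutionIdentifier, List[FlytekitSpan]] = defaultdict(list)

    def create_flytekit_spans(self, task_execution_id: TaskExecutionIdentifier,
                              spans: List[FlytekitSpan]) -> None:
        self._spans[task_execution_id].extend(spans)

    def get_flytekit_spans(self, task_execution_id: TaskExecutionIdentifier) -> List[FlytekitSpan]:
        # .get avoids inserting empty entries for unknown ids.
        return list(self._spans.get(task_execution_id, []))
```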
Pseudo code for the `concatenateSpans` function (additional code in `GetExecutionMetrics`, so that it still returns the workflow span as the root of the tree, but this time the tree contains Flytekit spans):
```go
func concatenateSpans(span *Span) {
	// On a task span, fetch its Flytekit spans by TaskExecutionIdentifier (getFlytekitSpans: hypothetical DB helper) and append them as children.
	if taskID := span.GetTaskId(); taskID != nil {
		span.Spans = append(span.Spans, getFlytekitSpans(taskID)...)
	}
	for _, child := range span.Spans {
		concatenateSpans(child)
	}
}
```
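A runnable Python analogue of the concatenation step is shown below. It is a sketch under simplifying assumptions. `Span` is a hypothetical dataclass in which task spans carry a plain string `task_execution_id`, and a dict stands in for the DB lookup. It attaches stored Flytekit spans to every task span in place and returns the same root.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional

@dataclass
class Span:
    kind: str                                  # "workflow", "node", "task", "operation", or "flytekit"
    name: str
    start_time: datetime
    end_time: datetime
    task_execution_id: Optional[str] = None    # set only on task spans (simplified)
    spans: List["Span"] = field(default_factory=list)

def concatenate_spans(span: Span, flytekit_spans: Dict[str, List[Span]]) -> Span:
    """Attach stored Flytekit spans as children of every task span, in place."""
    if span.kind == "task" and span.task_execution_id is not None:
        span.spans.extend(flytekit_spans.get(span.task_execution_id, []))
        return span  # task spans have no other children, so stop descending here
    for child in span.spans:
        concatenate_spans(child, flytekit_spans)
    return span

t0 = datetime(2023, 1, 1)
task = Span("task", "t-0", t0, t0 + timedelta(seconds=5), task_execution_id="exec-1/n0/t/0")
root = Span("workflow", "wf", t0, t0 + timedelta(seconds=6),
            spans=[Span("node", "n0", t0, t0 + timedelta(seconds=6), spans=[task])])
stored = {"exec-1/n0/t/0": [Span("flytekit", "download s3 data",
                                 t0 + timedelta(seconds=1), t0 + timedelta(seconds=3))]}
concatenate_spans(root, stored)
print(task.spans[0].name)  # download s3 data
```

Since the function mutates the tree, it should run on a freshly built tree for each `GetExecutionMetrics` request. Running it twice on the same tree would append the Flytekit spans twice.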
In Flyte Console: almost no change is needed. Flyte Console already walks the span tree recursively, so it will pick up the Flytekit spans along with the rest of the timing information.
Code in flyteidl:
```protobuf
// Span represents a duration trace of Flyte execution. The id field denotes a Flyte execution entity or an operation
// which uniquely identifies the Span. The spans attribute allows this Span to be further broken down into more
// precise definitions.
message Span {
    // start_time defines the instant this span began.
    google.protobuf.Timestamp start_time = 1;

    // end_time defines the instant this span completed.
    google.protobuf.Timestamp end_time = 2;

    oneof id {
        // workflow_id is the id of the workflow execution this Span represents.
        flyteidl.core.WorkflowExecutionIdentifier workflow_id = 3;

        // node_id is the id of the node execution this Span represents.
        flyteidl.core.NodeExecutionIdentifier node_id = 4;

        // task_id is the id of the task execution this Span represents.
        flyteidl.core.TaskExecutionIdentifier task_id = 5;

        // operation_id is the id of a unique operation that this Span represents.
        string operation_id = 6;

        // New: flytekit_id names the executing Flytekit code this Span represents, e.g. "download s3 data".
        // It needs its own field number because 7 is already used by `spans`.
        string flytekit_id = 8;
    }

    // spans defines a collection of Spans that break down this execution.
    repeated Span spans = 7;
}
```
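Putting `flytekit_id` inside the `oneof id` means that a Flytekit span is identified only by its name, and that a consumer can tell span kinds apart by checking which member of the oneof is set. Generated protobuf Python messages expose this through `WhichOneof("id")`. The sketch below mimics that behavior with a plain, hypothetical dataclass, so it runs without flyteidl installed. Unlike real protobuf, which clears the other members when one is set, this mirror raises an error if more than one member is set.

```python
from dataclasses import dataclass, fields
from typing import Optional

@dataclass
class SpanId:
    """Hypothetical mirror of Span's `oneof id`: at most one member should be set."""
    workflow_id: Optional[str] = None
    node_id: Optional[str] = None
    task_id: Optional[str] = None
    operation_id: Optional[str] = None
    flytekit_id: Optional[str] = None

    def which_oneof(self) -> Optional[str]:
        """Return the name of the set member, or None if none is set."""
        set_fields = [f.name for f in fields(self) if getattr(self, f.name) is not None]
        if len(set_fields) > 1:
            raise ValueError(f"oneof 'id' has multiple members set: {set_fields}")
        return set_fields[0] if set_fields else None

print(SpanId(flytekit_id="download s3 data").which_oneof())  # flytekit_id
```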
Also, a new gRPC endpoint for creating Flytekit spans:
```protobuf
rpc CreateFlytekitSpan(CreateFlytekitSpanRequest) returns (CreateFlytekitSpanResponse);
```