Stated-Workflows

Overview

Stated Workflows is a collection of functions for a lightweight and scalable event-driven workflow engine built on the Stated template engine. This README assumes some familiarity with the Stated REPL. If you don't have Stated, get it now. The key benefits of Stated Workflows are:

  • Easy - Stated Workflows are easier to express and comprehend than other workflow languages
  • Testable - Every Stated Workflow can be tested from a REPL and behaves locally exactly as it does in a Stated Workflow cluster.
  • Interactive - As you can see from the examples in this README, you can interact directly with workflows from the REPL
  • Transparent - Stated Workflows takes a "What You See Is What You Get" approach to workflows. Stated-Workflows is the only workflow engine with a JSON-centric approach to data and durability.
  • Highly Available - Stated Workflows can be run in an easy-to-scale, k8s-friendly cluster for scaling, durability, and high availability

Getting Started

Installation

To install the stated-workflows package, use yarn or npm. Open your terminal and run one of the following commands:

Using Yarn:

yarn global add stated-workflows

Using Npm:

npm install -g stated-workflows

Verify that you have Node.js 19 or higher:

node -v | grep -Eo 'v([0-9]+)\.' | grep -E 'v19|v[2-9][0-9]' && echo "Node is 19 or higher" || echo "Node is below 19"
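
The same check can be expressed in JavaScript, which may be easier to read than the grep pipeline. This is only a sketch: the helper name isNodeAtLeast is hypothetical and is not part of stated-workflows.

```javascript
// Returns true when a Node.js version string such as "v19.2.0" has a
// major version of at least `minMajor`.
// NOTE: isNodeAtLeast is a hypothetical helper, not a stated-workflows API.
function isNodeAtLeast(version, minMajor) {
  const major = Number.parseInt(version.replace(/^v/, "").split(".")[0], 10);
  return Number.isInteger(major) && major >= minMajor;
}

// process.version is the running interpreter's version, e.g. "v20.11.0".
console.log(isNodeAtLeast(process.version, 19) ? "Node is 19 or higher" : "Node is below 19");
```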

Running the REPL

To use the Stated Workflows REPL (Read-Eval-Print Loop), Node.js version 19.2 or higher is recommended. The Stated REPL is built on the Node REPL. You can start the REPL by running the stateflow command in your terminal:

stateflow

The REPL will launch, allowing you to interact with the stated-js library. To launch properly, node must be on your path. stateflow is a wrapper script that simply calls stated-workflow.js, which contains this shebang line: #!/usr/bin/env node --experimental-vm-modules.

For example you can enter this command in the REPL:

> .init -f "example/homeworld.json"

Why Not Ordinary Stated Templates?

Ordinary stated templates run a change graph called a DAG. Stated flattens the DAG and executes it as a sequence of expressions called the plan. The example below illustrates how a plan executes a sequence of REST calls and transformations in an ordinary Stated template.

> .init -f "example/homeworld.json"
{
  "lukePersonDetails": "${ $fetch('https://swapi.dev/api/people/?search=luke').json().results[0]}",
  "lukeHomeworldURL": "${ lukePersonDetails.homeworld }",
  "homeworldDetails": "${ $fetch(lukeHomeworldURL).json() }",
  "homeworldName": "${ homeworldDetails.name }"
}
> .plan
[
  "/lukePersonDetails",
  "/lukeHomeworldURL",
  "/homeworldDetails",
  "/homeworldName"
]
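
Flattening a dependency graph into a plan is a topological sort. The sketch below shows the idea in plain JavaScript. It is not Stated's implementation: the dependency map is written by hand (Stated derives it from the expressions), and buildPlan is a hypothetical name.

```javascript
// Hypothetical dependency map for the homeworld template: each field lists
// the fields its expression reads. Written by hand for illustration.
const dependencies = {
  "/lukePersonDetails": [],
  "/lukeHomeworldURL": ["/lukePersonDetails"],
  "/homeworldDetails": ["/lukeHomeworldURL"],
  "/homeworldName": ["/homeworldDetails"],
};

// Depth-first topological sort: a field is added to the plan only after
// everything it depends on has been added.
function buildPlan(deps) {
  const plan = [];
  const visiting = new Set(); // fields on the current DFS path
  const done = new Set();     // fields already placed in the plan

  function visit(node) {
    if (done.has(node)) return;
    if (visiting.has(node)) throw new Error(`cycle detected at ${node}`);
    visiting.add(node);
    for (const dep of deps[node] ?? []) visit(dep);
    visiting.delete(node);
    done.add(node);
    plan.push(node);
  }

  Object.keys(deps).forEach(visit);
  return plan;
}

const plan = buildPlan(dependencies);
console.log(plan);
```

For this map the result is the same four-element order that .plan printed above.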

homeworld workflow

As 'luke' moves through the plan, two REST calls are made, and no new inputs can enter. This is because Stated queues execution plans, allowing one to complete before the next can enter. This is a clean way to prevent concurrent state mutations, and it works well for in-memory computation DAGs. Long-running I/O operations, however, will bottleneck the template. And if the template engine is shut down, there is no way to restart the work where it left off. Workflows imply lots of I/O, and hence longer runtimes, so they need a way to address these concerns.

Stated Workflow Pipelines

Stated Workflows solves these problems in order to provide a suitable runtime for long-running, I/O heavy workflows:

  • Concurrent, Event Driven, Non-blocking - Stated Workflows provide specific support for $serial and $parallel execution pipelines that can be mixed together and safely run in parallel. Unlike an ordinary expression plan, $serial and $parallel are pipelined and nonblocking. As events arrive they can directly enter a $serial or $parallel without waiting.
  • Atomic State Updates - An atomic state update operator allows concurrent pipelines to avoid Lost Updates.
  • Durability - Pipeline Functions ($serial and $parallel) work with --options={"snapshot":{...opts...}} to snapshot their state to various stores. Stated Workflows can be started from a snapshot, hence restoring all pipeline functions to their state at the time of the snapshot.
  • Pub/Sub Connectors - Stated Workflows provide direct access to $publish and $subscribe functions that can dispatch events into execution pipelines with any desired parallelism. A simple change to the subscriber configuration allows your workflow to operate against Kafka, Pulsar, and other real-world messaging systems.

Concurrent, Event Driven, Non-blocking

Stated-Workflows provides a set of functions that allow you to integrate with cloud events, consuming from and producing to Kafka or Pulsar message buses, as well as HTTP. Publishers and subscribers can be initialized with test data. This example, joinResistance.yaml, generates URLs for members of the resistance as test data. The URLs are then published as events and dispatched to a subscriber with a settable parallelism factor. The subscriber fetches each URL with $fetch and extracts the Star Wars character's full name from the REST response.

start: ${ (produceParams.data; $millis()) } #record start time, after test dataset has been computed
# producer will be sending some test data
produceParams:
  type: "my-topic"
  data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.dev/api/people/?search='&$)}
  client:
    type: test
# the subscriber's 'to' function will be called on each received event
subscribeParams: #parameters for subscribing to an event
  source: cloudEvent
  type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events
  to: /${joinResistance}
  subscriberId: rebelArmy
  initialPosition: latest
  parallelism: 1
  client:
    type: test
joinResistance:  |
  /${ 
    function($url){(
        $rebel := $fetch($url).json().results[0].name; 
        $set( "/rebelForces/-", $rebel) /* append rebel to rebelForces */
    )}  
  }
# starts producer function
send$: $publish(produceParams)
# starts consumer function
recv$: $subscribe(subscribeParams)
# the subscriber's `to` function will write the received data here
rebelForces: [ ]
runtime: ${ (rebelForces; "Rebel forces assembled in " & $string($millis()-start) & " ms")}

Let's see how long it takes to run, using a parallelism of 1 in the subscriber as shown above:

> .init -f "example/joinResistance.yaml" --tail "/rebelForces until $~>$count=10"
Started tailing... Press Ctrl+C to stop.
[
"Luke Skywalker",
"Han Solo",
"Leia Organa",
"R2-D2",
"Owen Lars",
"Biggs Darklighter",
"Obi-Wan Kenobi",
"Anakin Skywalker",
"Chewbacca",
"Wedge Antilles"
]
> .out /runtime
"Rebel forces assembled in 3213 ms"

The runtime reflects the fact that parallelism: 1 causes the REST calls to happen serially. Now let's run a modified version where subscribeParams has parallelism: 10. We should expect a speedup because there can now be as many as 10 concurrent REST calls.

> .init -f "example/joinResistanceFast.yaml" --tail "/rebelForces until $~>$count=10"
Started tailing... Press Ctrl+C to stop.
[
"Owen Lars",
"Han Solo",
"R2-D2",
"Leia Organa",
"Luke Skywalker",
"Chewbacca",
"Obi-Wan Kenobi",
"Wedge Antilles",
"Anakin Skywalker",
"Biggs Darklighter"
]
> .out /runtime
"Rebel forces assembled in 775 ms"

Notice that the order of elements in /rebelForces is not the same as their order in the input data, reflecting the fact that up to 10 events are processed concurrently. The speedup for joinResistanceFast.yaml shows that many REST calls are happening in parallel. The parallelism factor also applies backpressure to the messaging system, such as Kafka, which prevents the number of in-flight events from exceeding the parallelism factor.
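
The effect of a parallelism factor can be sketched as a bounded-concurrency dispatcher. The code below is an illustration only and does not reflect how $subscribe is implemented. The names dispatchWithParallelism and fakeFetch are hypothetical, and a timer stands in for the REST call so that no network access is needed.

```javascript
// Dispatches each event to `handler`, keeping at most `parallelism`
// handlers in flight. A new event is pulled only when a slot frees up,
// which is the backpressure effect described above.
// NOTE: hypothetical sketch, not the stated-workflows implementation.
async function dispatchWithParallelism(events, handler, parallelism) {
  let next = 0;        // index of the next event to pull
  let inFlight = 0;    // handlers currently running
  let maxInFlight = 0; // high-water mark, returned for inspection

  async function worker() {
    while (next < events.length) {
      const event = events[next++];
      inFlight++;
      maxInFlight = Math.max(maxInFlight, inFlight);
      try {
        await handler(event);
      } finally {
        inFlight--;
      }
    }
  }

  const workerCount = Math.min(parallelism, events.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return maxInFlight;
}

// Stand-in for a slow REST call.
const fakeFetch = (name) => new Promise((resolve) => setTimeout(() => resolve(name), 10));

const names = ["luke", "han", "leia", "R2-D2", "Owen", "Biggs", "Obi-Wan", "Anakin", "Chewbacca", "Wedge"];
const demo = (async () => {
  const serialPeak = await dispatchWithParallelism(names, fakeFetch, 1);
  const parallelPeak = await dispatchWithParallelism(names, fakeFetch, 10);
  console.log({ serialPeak, parallelPeak });
  return { serialPeak, parallelPeak };
})();
```

With a factor of 1 the peak number of in-flight handlers is 1. With a factor of 10 all ten events are in flight at once.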

Atomic State Updates

Stated Workflows provides an atomic primitive that prevents lost updates when arrays are mutated concurrently. It does this with the special JSON Pointer token "-", which RFC 6902 JSON Patch uses for appending to an array. The joinResistanceFast.yaml example shows the syntax: $set( "/rebelForces/-", $rebel) safely appends to an array.

joinResistance:  |
  /${ 
    function($url){(
        $rebel := $fetch($url).json().results[0].name; 
        $set( "/rebelForces/-", $rebel) /* append rebel to rebelForces */
    )}  
  }
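
To make the "/-" pointer concrete, here is a minimal JavaScript sketch of an "add" at a JSON Pointer path. It handles only the cases needed for this example. addAtPointer is a hypothetical helper, and this is not how $set is implemented.

```javascript
// Applies a JSON Patch style "add" to `doc` at a JSON Pointer `path`.
// Only object members and array elements are handled; the final token "-"
// means "after the last element of the array".
// NOTE: addAtPointer is a hypothetical helper, not a Stated function.
function addAtPointer(doc, path, value) {
  const tokens = path
    .split("/")
    .slice(1) // drop the empty string before the leading "/"
    .map((t) => t.replace(/~1/g, "/").replace(/~0/g, "~")); // pointer unescaping
  const last = tokens.pop();

  let parent = doc;
  for (const token of tokens) parent = parent[token];

  if (Array.isArray(parent)) {
    if (last === "-") parent.push(value);        // append
    else parent.splice(Number(last), 0, value);  // insert at index
  } else {
    parent[last] = value;                        // set object member
  }
  return doc;
}

const state = { rebelForces: [] };
addAtPointer(state, "/rebelForces/-", "Luke Skywalker");
addAtPointer(state, "/rebelForces/-", "Han Solo");
console.log(state.rebelForces); // [ 'Luke Skywalker', 'Han Solo' ]
```

Because the append never needs the caller to read the array first, no caller can write back a stale copy.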

Again we can show that joinResistanceFast.yaml produces the expected 10 results. The argument --tail "/rebelForces 10" instructs tail to stop tailing after 10 changes.

> .init -f "example/joinResistanceFast.yaml" --tail "/rebelForces 10"
Started tailing... Press Ctrl+C to stop.
[
  "Owen Lars",
  "Han Solo",
  "R2-D2",
  "Luke Skywalker",
  "Leia Organa",
  "Chewbacca",
  "Obi-Wan Kenobi",
  "Wedge Antilles",
  "Anakin Skywalker",
  "Biggs Darklighter"
]

Let's consider what happens if we don't use an atomic write primitive. Instead, we will read the value of the $rebelForces array and append $rebel to it.

joinResistance:  |
  /${ 
    function($url){(
        $rebel := $fetch($url).json().results[0].name; 
        $set( "/rebelForces", $rebelForces~>$append($rebel)) /* BUG!! */
    )}  
  }

This type of update, where you read a value, mutate it, and write it back, is a classic example of the lost-update problem. Each concurrent pipeline invocation has updated the array, but each wrote back the value it read with its own change appended. The problem is that they all read, concurrently, the initial value, which is an empty array. You can see below that using a naive update strategy to update rebelForces results in only one of the 10 names appearing in the array.
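
The same hazard can be reproduced outside Stated. The sketch below models the explanation above in plain JavaScript: naiveJoin reads the array before waiting on I/O and writes back a stale copy, while atomicJoin appends in place. All names are hypothetical and a timer stands in for the REST call.

```javascript
const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Naive update: read the array, wait on I/O, then write back the stale copy.
async function naiveJoin(state, name) {
  const snapshot = state.rebelForces;      // every concurrent caller reads []
  await pause(5);                          // stand-in for the REST call
  state.rebelForces = [...snapshot, name]; // overwrites the other callers' writes
}

// Atomic append: no stale copy is held across the await.
async function atomicJoin(state, name) {
  await pause(5);
  state.rebelForces.push(name);
}

// Runs ten concurrent joins against a fresh state with the given strategy.
async function assemble(join) {
  const state = { rebelForces: [] };
  const names = ["luke", "han", "leia", "R2-D2", "Owen", "Biggs", "Obi-Wan", "Anakin", "Chewbacca", "Wedge"];
  await Promise.all(names.map((name) => join(state, name)));
  return state.rebelForces;
}

const results = (async () => {
  const naive = await assemble(naiveJoin);
  const atomic = await assemble(atomicJoin);
  console.log(naive.length, atomic.length); // 1 10
  return { naive, atomic };
})();
```

The naive strategy keeps one name and the atomic strategy keeps all ten. The REPL session for the buggy template shows the same loss: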

> .init -f "example/joinResistanceBug.yaml" --tail "/rebelForces 10"
Started tailing... Press Ctrl+C to stop.
"Wedge Antilles"

Pure Function Pipelines - $serial and $parallel

A tried and true strategy for avoiding concurrent state mutation is to use pure functions that pipeline the output of one function to the input of the next. If it is necessary to update shared state, it can be done at the end of the pipeline, or at any point in between, using atomic state mutations, like this:

function($x){ $x ~> f1 ~> f2 ~> f3 ~> function($x){$set('/saveResult/-', $x) } }

This pipeline can be concurrently dispatched safely.
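
For readers more at home in JavaScript, the same shape can be sketched as a left-to-right composition of pure stages followed by a single append. The stage bodies and the names pipeline, save, and shared are hypothetical and chosen only for illustration.

```javascript
// Three pure stages: each returns a new value and touches no shared state.
const f1 = (name) => name.trim();
const f2 = (name) => name + " skywalker";
const f3 = (name) => name.toUpperCase();

// Left-to-right composition, analogous to chaining stages with ~>.
const pipeline = (...stages) => (input) =>
  stages.reduce((value, stage) => stage(value), input);

const shared = { saveResult: [] };

// The only stage with a side effect: one append at the end of the pipeline.
const save = (value) => {
  shared.saveResult.push(value);
  return value;
};

const run = pipeline(f1, f2, f3, save);
run(" luke ");
run(" anakin ");
console.log(shared.saveResult); // [ 'LUKE SKYWALKER', 'ANAKIN SKYWALKER' ]
```

Since f1 through f3 never read shared state, interleaving many invocations cannot change any individual result. Only the final append touches shared data.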

Durability

Stated Workflows provides a $serial and a $parallel function that should be used when you want each stage of a concurrent pipeline to be snapshotted to durable storage. The stateflow REPL provides a local persistence option, as well as pluggable snapshot persistence.

Workflow Step Logs

The $serial and $parallel functions accept an array of objects called "steps". A step is simply an object with a field named 'function'. A durable workflow is formed by passing an array of steps to $serial or $parallel, like this:

{
  "out": "${ 'luke' ~> $serial([f1, f2])}",
  "f1": {
    "function": "${ function($in){ $in & ' skywalker' } }"
  },
  "f2": {
    "function": "${ function($in){ $in ~> $uppercase } }"
  }  
}

$serial and $parallel generate a unique invocation id, like 2023-11-14-1699922094477-8e85, each time they are invoked. The id is used as a log key. The logs are stored inside each step.

{
  "out": "${ 'luke' ~> $serial([f1, f2])}",
  "f1": {
    "function": "${ function($in){ $in & ' skywalker' } }",
    "log": {
      "2023-11-14-1699922094477-8e85": {
        "start": {
          "timestamp": 1699922094477,
          "args": "luke"
        },
        "end": {
          "timestamp": 1699922095809,
          "out": "luke skywalker"
        }
      }
    }
  },
  "f2": {
    "function": "${ function($in){ $in ~> $uppercase } }"
  }
}

When the step completes, its invocation log is removed.

{
  "out": "${ 'luke' ~> $serial([f1, f2])}",
  "f1": {
    "function": "${ function($in){ $in & ' skywalker' } }",
    "log": {}
  },
  "f2": {
    "function": "${ function($in){ $in ~> $uppercase } }",
    "log": {}
  }
}
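
The log lifecycle can be sketched in a few lines of JavaScript. This is an illustration under assumptions, not the $serial implementation: runSerial, newInvocationId, and the onLog callback are hypothetical names, and the exact moment at which an entry is removed is assumed from the description above.

```javascript
// Builds an id shaped like the one shown above: date, epoch millis, 4 hex chars.
function newInvocationId(now = Date.now()) {
  const date = new Date(now).toISOString().slice(0, 10);
  const suffix = Math.floor(Math.random() * 0x10000).toString(16).padStart(4, "0");
  return `${date}-${now}-${suffix}`;
}

// Runs steps in order. Each step gets a start record before it runs and an
// end record after it returns; the entry is then dropped.
// `onLog` lets a caller observe (or persist) an entry while it exists.
function runSerial(input, steps, onLog = () => {}) {
  const id = newInvocationId();
  let value = input;
  for (const step of steps) {
    step.log ??= {};
    step.log[id] = { start: { timestamp: Date.now(), args: value } };
    onLog(id, step.log[id]);
    value = step.function(value);
    step.log[id].end = { timestamp: Date.now(), out: value };
    onLog(id, step.log[id]);
    delete step.log[id]; // the step completed, so its entry is removed
  }
  return value;
}

const f1 = { function: (s) => s + " skywalker" };
const f2 = { function: (s) => s.toUpperCase() };
const observed = [];
const out = runSerial("luke", [f1, f2], (id, entry) =>
  observed.push({ id, hasEnd: "end" in entry })
);
console.log(out);    // LUKE SKYWALKER
console.log(f1.log); // {}
```

If the process stopped between a start record and its end record, the persisted entry would show which step was in progress and what argument it received.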

Snapshots

Snapshots repeatedly save the state of a Stated template as it runs. Snapshotting allows a Stated Workflow to be stopped non-gracefully and restored from the snapshot. The step logs allow invocations to restart where they left off.

> .init -f "example/inhabitants.yaml" --options={"snapshot":{"seconds":1}}
{
  "produceParams": {
    "type": "my-topic",
    "data": "${[1..6].($fetch('https://swapi.dev/api/planets/?page=' & $string($)).json().results)}",
    "client": {
      "type": "test"
    }
  },
  "subscribeResidents": {
    "source": "cloudEvent",
    "type": "/${ produceParams.type }",
    "to": "/${ getResidentsWorkflow }",
    "subscriberId": "subscribeResidents",
    "parallelism": 4,
    "client": {
      "type": "test"
    }
  },
  "getResidentsWorkflow": {
    "function": "/${ function($planetInfo){ $planetInfo ~> $serial([extractResidents, fetchResidents]) }  }"
  },
  "extractResidents": {
    "function": "/${ function($planet){$planet.residents.($fetch($).json())}  }"
  },
  "fetchResidents": {
    "function": "/${ function($resident){$resident?$set('/residents/-',{'name':$resident.name, 'url':$resident.url})}  }"
  },
  "residents": [],
  "send$": "$publish(produceParams)",
  "recv$": "$subscribe(subscribeResidents)"
}

The --options={"snapshot":{"seconds":1}} option causes a snapshot to be saved to defaultSnapshot.json once a second. The snapshot is just an ordinary JSON file with two parts: {"template":{...}, "output":{...}}

cat defaultSnapshot.json
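
The two-part layout is simple enough to sketch directly. The code below only round-trips an object with template and output keys through JSON. It makes no claim about how Stated writes or restores snapshots, and toSnapshot, fromSnapshot, and the sample data are hypothetical.

```javascript
// Serializes the two parts named above into a single JSON document.
function toSnapshot(template, output) {
  return JSON.stringify({ template, output }, null, 2);
}

// Parses a snapshot and checks that both parts are present.
function fromSnapshot(text) {
  const snapshot = JSON.parse(text);
  for (const part of ["template", "output"]) {
    if (!(part in snapshot)) throw new Error(`snapshot is missing "${part}"`);
  }
  return snapshot;
}

// Hypothetical, heavily trimmed stand-ins for a template and its output.
const template = { residents: [], send$: "$publish(produceParams)" };
const output = {
  residents: [{ name: "Luke Skywalker", url: "https://swapi.dev/api/people/1/" }],
};

const restored = fromSnapshot(toSnapshot(template, output));
console.log(restored.output.residents.length); // 1
```

Keeping the template (expressions) separate from the output (computed values) means a snapshot carries both what to compute and what had been computed so far.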

Retries

Each step can provide an optional boolean function, shouldRetry. When a workflow invocation fails, shouldRetry is called with the invocation log as its argument. If it returns true, the step's function is retried. The invocation log contains a retryCount field that can be used to limit the number of retries.

The following example shows how to use the shouldRetry function to retry a step up to 4 times before failing.

> .init -f example/homeworlds-steps-with-retry.json --options={"keepLogs":true}
{
  "output": "${   ['luke']~>$map(workflow) }",
  "workflow": "${ function($person){$person~>$serial(steps)} }",
  "connectionError": true,
  "steps": [
    {
      "function": "${  function($person){$fetch('https://swapi.dev/api/people/?search='& $person).json().results[0]}   }"
    },
    {
      "function": "${  function($personDetail){$personDetail.homeworld }  }"
    },
    {
      "function": "/${ function($homeworldURL){ ($url := connectionError ? $homeworldURL & '--broken--' : $homeworldURL ; $set('/connectionError', $not(connectionError)); $fetch($url).json(); ) }  }",
      "shouldRetry": "${  function($log){ $log.end ? false : $log.retryCount < 4 }  }"
    },
    {
      "function": "${  function($homeworldDetail){$homeworldDetail.name }  }"
    }
  ]
}
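
The retry decision can be sketched as a small loop in JavaScript. This is not the stated-workflows implementation: runWithRetry and the fail field are hypothetical, and only the end and retryCount fields of the log come from the description above.

```javascript
// Runs `fn(input)`; on failure asks `shouldRetry(log)` whether to try again.
// NOTE: hypothetical sketch. Only `end` and `retryCount` mirror the text;
// the `start` and `fail` records are assumptions made for illustration.
function runWithRetry(fn, input, shouldRetry) {
  const log = { start: { timestamp: Date.now(), args: input }, retryCount: 0 };
  for (;;) {
    try {
      const out = fn(input);
      log.end = { timestamp: Date.now(), out };
      return { out, log };
    } catch (error) {
      log.fail = { timestamp: Date.now(), error: String(error) };
      if (!shouldRetry(log)) throw error;
      log.retryCount++;
    }
  }
}

// Same policy as the template: stop once there is an end record,
// otherwise allow retries while retryCount is below 4.
const shouldRetry = (log) => (log.end ? false : log.retryCount < 4);

// A stand-in for a step that fails twice and then succeeds.
let calls = 0;
const flaky = (url) => {
  calls++;
  if (calls < 3) throw new Error("connection error");
  return `fetched ${url}`;
};

const { out, log } = runWithRetry(flaky, "https://swapi.dev/api/planets/1/", shouldRetry);
console.log(out, log.retryCount); // fetched https://swapi.dev/api/planets/1/ 2
```

With this policy a step that never succeeds is attempted five times in total: the initial call plus four retries.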

A template can be restored, and each of its invocations begins again, by loading the snapshot ...coming soon
