
eventual's Introduction


eventual

Develop massively-distributed systems using APIs, Messaging and Workflows. Test locally, then deploy to your own cloud infrastructure.


Overview

API Docs

Eventual is an open source TypeScript framework that offers "core abstractions" (including APIs, Messaging and long-running, durable Workflows) to shield you from the complexities of distributed systems and ensure a consistent, best-practice serverless architecture.

Develop

  • 🌎 APIs - expose RPC and REST endpoints
  • 🚦 Orchestration - build long running, durable workflows using plain TypeScript - such as if-else, loops, functions, async/await, and all that goodness
  • πŸ’¬ Messaging - publish and subscribe to events within and across service boundaries
  • βœ… End-to-end type safety - from your frontend β†’ backend β†’ infrastructure

Iterate

  • 🧪 Local Testing - unit test with fine-grained control over timing to catch tricky edge cases
  • 🤖 Simulate - run your massively distributed system locally and deploy only when it's working
  • 🐞 Debug Production - replay and debug production workflows from the comfort of your IDE

Deploy

  • 🛠 Infrastructure as Code - integrates directly into the AWS CDK, SST and Pulumi
  • 🌩️ Your Cloud - runs in your own infrastructure and security boundaries
  • 📈 Serverless - we shield you from the complexities of distributed systems and ensure a consistent, best-practice serverless architecture

Quick Start

Start a new project with Eventual or drop-in to an existing AWS CDK or SST application by visiting the Quick Start.

# create a new project
npm create eventual

# enter the new project's directory
cd <project-name>

# deploy to AWS
npm run deploy

Examples

How it works

Eventual offers "core abstractions" for distributed systems that are mapped to AWS Serverless Resources. These primitives include Commands, Events, Subscriptions, Workflows, Tasks, Signals and Actors (coming soon). They provide a simple, consistent and type-safe programming model for micro-services.

The top-level concept of Eventual is a Service that can be deployed to AWS with a CDK Construct. Each Service gets its own API Gateway, Event Bus, and Workflow Engine that you customize and build on using the core abstractions.

Eventual ships with its own Workflow Engine that deploys into your AWS account. It consists of purely serverless AWS Resources, such as an AWS SQS FIFO Queue, S3 Bucket, DynamoDB Table and Event Bridge Scheduler Group. This provides an abstraction for orchestrating long running, durable workflows using plain TypeScript - such as if-else, loops, functions, async/await, Promise.all, etc. This gives you an expressive, Turing complete way to implement business logic, however complex, distributed or time-dependent it may be.

The business logic of your Service (including APIs, Subscriptions, Tasks, etc.) is discovered by analyzing your code, and AWS Resources are then configured for it. This includes tree-shaking, bundling into individual Lambda Functions and providing supporting infrastructure such as Event Bus Rules, Dead Letter Queues, IAM Roles, etc.

Why Eventual?

Eventual accelerates the development of distributed systems in the cloud with its fully integrated tool-chain.

You don't have to worry about low-level primitives such as concurrency control, scaling, or glueing individual AWS Resources together and managing all of their failure cases. Instead, you simply write your application in TypeScript and then deploy directly to AWS using your favorite IaC framework.

Eventual provides core abstractions that shield you from the complexities of distributed systems and ensure a consistent, best-practice serverless architecture with a slick and type-safe programming model.

Everything can be written and tested in a single TypeScript code-base. You can run your massively scalable distributed cloud system LOCALLY before you even deploy. Run, test and iterate locally, then deploy only when it's working.

You can even debug production locally. Workflows running in production can be replayed locally and debugged from the comfort of your IDE.

Eventual supports customization and integration via IaC. Our Construct provides best practices out of the box but you are free to override and configure according to your needs.

All of your data remains within your own AWS account and security boundaries. With broad strokes, you can apply your own security policies to the system, such as server-side or client-side encryption, minimally permissive IAM Policies, etc.

Contact us

Project Leads

eventual's People

Contributors

cfraz89, jogold, sam-goodwin, thantos, yehudacohen


eventual's Issues

Support RemovalPolicy

It should be possible to create an eventual.Service with a RemovalPolicy that applies to all child resources.

Event Bridge Target

new Rule().addTarget(new EventualServiceTarget(service));

Allow forwarding events from another event bus to the Service.

CJS warning in runtime tests

This is bloating our logs, and I wonder whether it's a problem:

ts-jest[ts-compiler] (WARN) Unable to process '/home/runner/work/eventual/eventual/packages/@eventual/core/lib/cjs/error.js', falling back to original file content. You can also configure Jest config option `transformIgnorePatterns` to ignore /home/runner/work/eventual/eventual/packages/@eventual/core/lib/cjs/error.js from transformation or make sure that `outDir` in your tsconfig is neither `''` or `'.'`

Concurrency Eventuals

  • Promise.
    • all - resolve completed results of all eventuals or reject after first failed
    • race - resolve the first resolved (completed or failed) eventual
    • any - resolve the first completed eventual or fail
    • allSettled - resolve the error or result of all eventuals once all resolve
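
The four behaviours listed above match the standard JavaScript Promise combinators. The following is a minimal sketch using plain promises rather than Eventual's workflow runtime; `delay`, `failAfter` and `combinatorDemo` are hypothetical helpers introduced only for illustration.

```typescript
// Plain promises stand in for eventuals here; `delay` and `failAfter` are hypothetical helpers.
const delay = <T>(ms: number, value: T): Promise<T> =>
  new Promise<T>((resolve) => setTimeout(() => resolve(value), ms));
const failAfter = (ms: number, message: string): Promise<never> =>
  new Promise<never>((_, reject) => setTimeout(() => reject(new Error(message)), ms));

async function combinatorDemo() {
  // all: resolves with every result in input order, rejects on the first failure
  const all = await Promise.all([delay(20, "a"), delay(10, "b")]);
  // race: settles with whichever input settles first, success or failure
  const race = await Promise.race([delay(10, "fast"), delay(40, "slow")]);
  // any: resolves with the first success and ignores earlier failures
  const any = await Promise.any([failAfter(5, "boom"), delay(15, "ok")]);
  // allSettled: never rejects, reports the outcome of every input
  const settled = await Promise.allSettled([delay(5, 1), failAfter(5, "boom")]);
  return { all, race, any, settled: settled.map((s) => s.status) };
}
```

Inside a workflow the same calls would presumably operate on eventuals instead of timers, which is what this issue asks for.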

Move EventualApi into the (new) Service Construct

Multiple workflows are allowed as of #33 :

export const workflow1 = workflow("workflow1", () => ..);
export const workflow2 = workflow("workflow2", () => ..);

The Workflow Construct was renamed to Service :

const myService = new eventual.Service(stack, "myService", {
  entry: require.resolve("test-app-runtime/lib/my-workflow.js"),
});

So, what should we do with EventualApi?

new eventual.EventualApi(stack, "api", {
  services: [myWorkflow, openAccount],
});
  1. move it into the Service Construct and just orient the CLI tooling around a single service?
  2. rename it to ServiceHub or ServiceManager (something like that)?

I am leaning towards (1) - orient everything around a single micro-service.

CLI/Timeline: Timeline is throwing 500s

Note: there is no way to debug

sussmans@SurfacePro8:~/eventual$ AWS_PROFILE=sussman ./packages/@eventual/cli/bin/eventual.js timeline eventual-tests timedOut/01GM4W31MVPNB72QT9CTB4QABQ --debug
✔ Visualiser running on http://localhost:3000


Activity Control intrinsic functions

It should be possible to control activities from the calling workflow or from other workflows.

  • Cancel - semantic cancellation, throws an error
  • Fail - throw error
  • Complete - successful result

Intrinsic

api.any(async () => {
   // or event handler or workflow
   await cancelActivity(activityToken, reason);
   await failActivity(activityToken, error, message);
   await completeActivity(activityToken, result);
})

Instance

const myAct = activity(...);
workflow(async () => {
   // start the activity
   const actInstance = myAct();

   // cancel, fail, or complete the activity
   await actInstance.cancel(reason);
   await actInstance.fail(new Error()); // or actInstance.fail(error, message);
   await actInstance.complete(result);

   await actInstance;
});

Design: Async Activities, Timeout, and Heartbeat

Problem Statement:

As a developer/workflow author, I want to create activities that run for indefinite amounts of time, involve human interaction, invoke other services, or wait for the result of outside actions. I should be able to ensure that inconsistencies and faults are recoverable. I should be able to use the service to support idempotency across partial failures.

Stories:

  • Runtime Decision - a single activity can return either sync results or async results, decided at runtime.
  • Durable - async activities can be retried, can be given a timeout, and a heartbeat
  • Heartbeat - an activity may get stuck before its total timeout elapses. Use a heartbeat to report that processing is still making progress.
  • Complete By Client - Any service with permissions to write to the workflow should be able to complete an activity with data
  • Fail by Client - Any service with permissions to write to the workflow should be able to fail an activity with an error message
  • Consistent Token - Activities should generate a token used to complete, fail, or heartbeat them. That token should encode information for the workflow to interact with the right workflow and activity.
  • Checkpointing (future) - activities may fail during processing; let the activity report a value on heartbeat that is given back to the activity when the workflow is resumed.
  • Cancellation - ??

Strawman

workflow(async () => {
  const result = await act1();
});

const act1 = activity<{ result: string }>({
  heartbeat: { seconds: number },
  timeout: { seconds: number }
}, async (context: Context): Promise<{ result: string } | AsyncToken> => {
  // ...doSomeWork...

  // hand the token to whoever will complete the activity later
  await sendToQueue({ token: context.activity.token });

  return makeAsync();
});

// lambda function consuming the queue
const workflowClient = new WorkflowClient();
export const handler = async (event) => {
  // await the batch so the function does not return before the calls finish
  await Promise.all(event.Records.map(async (record) => {
    const payload = JSON.parse(record.body);

    // complete the activity with a payload
    await workflowClient.completeActivity<typeof act1>(
      payload.token,
      { result: "done" }
    );

    // or fail
    await workflowClient.failActivity(payload.token, { result: "done" });
  }));
};

with heartbeat

workflow(async () => {
  const result = await act1();
});

const act1 = activity<{ result: string }>({
  heartbeat: { seconds: 20 },
  timeout: { seconds: 160 }
}, async (context: Context): Promise<{ result: string } | AsyncToken> => {
  // ...doSomeWork...

  await sendToQueue({ token: context.activity.token });

  await sendHeartbeat();

  return makeAsync();
});

// lambda function consuming the queue
const workflowClient = new WorkflowClient();
export const handler = async (event) => {
  // await the batch so the function does not return before the calls finish
  await Promise.all(event.Records.map(async (record) => {
    const payload = JSON.parse(record.body);

    while (true) {
      // some long process; break out of the loop once it is done
      await workflowClient.heartbeatActivity(payload.token);
    }

    // complete the activity with a payload
    await workflowClient.completeActivity<typeof act1>(
      payload.token,
      { result: "done" }
    );
  }));
};

with heartbeat checkpoint - FUTURE

workflow(async () => {
  const act = act1();

  // forward each checkpoint value reported on heartbeat
  act.onHeartbeat(async ({ i }) => {
    await reportProgress(i);
  });
});

const reportProgress = activity(...);

const act1 = activity<{ result: string }, { i: number } | undefined>({
  heartbeat: { seconds: 20 },
  timeout: { seconds: 160 }
}, async (context: Context): Promise<{ result: string } | AsyncToken> => {
  // ...doSomeWork...

  await sendToQueue({ token: context.activity.token, start: context.checkpoint });

  // should this be on the context to be typed?
  await sendHeartbeat();

  return makeAsync();
});

// lambda function consuming the queue
const workflowClient = new WorkflowClient();
export const handler = async (event) => {
  // await the batch so the function does not return before the calls finish
  await Promise.all(event.Records.map(async (record) => {
    const payload = JSON.parse(record.body);

    const items = [...];

    // resume from the last checkpoint, if one was sent with the message
    const start = payload.start ?? 0;

    for (const i of items.slice(start)) {
      // some long process
      await workflowClient.heartbeatActivity<typeof act1>(
        payload.token,
        { i }
      );
    }

    // complete the activity with a payload
    await workflowClient.completeActivity<typeof act1>(
      payload.token,
      { result: "done" }
    );
  }));
};

CLI: History should emit a table and add `--json` to stdout

History events are truncated.

    payload: { amount: 1000 },
    signalId: 'AddMoney',
    timestamp: '2022-12-16T01:55:19.767Z'
  },
  { type: 'TaskStarted', timestamp: '2022-12-16T01:55:19.909Z' },
  { type: 'TaskCompleted', timestamp: '2022-12-16T01:55:20.135Z' },
  { type: 'TaskStarted', timestamp: '2022-12-16T01:55:20.307Z' },
  { type: 'TaskCompleted', timestamp: '2022-12-16T01:55:20.349Z' },
  { type: 'TaskStarted', timestamp: '2022-12-16T01:55:50.327Z' },
  ... 75 more items
]

Should print to stdout on --json, show a table otherwise, and support pagination.
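
A rough sketch of the requested behaviour follows. `renderHistory`, its options and the `HistoryEvent` shape are assumptions made for illustration, not the CLI's actual code. The `... 75 more items` marker in the sample looks like Node's default console formatting, which abbreviates long arrays; serializing explicitly avoids that.

```typescript
// Hypothetical event shape: only `type` and `timestamp` are taken from the sample output.
interface HistoryEvent {
  type: string;
  timestamp: string;
  [key: string]: unknown;
}

interface RenderOptions {
  json?: boolean; // corresponds to the proposed --json flag
  page?: number; // zero-based page index
  pageSize?: number;
}

function renderHistory(events: HistoryEvent[], opts: RenderOptions = {}): string {
  const pageSize = opts.pageSize ?? 50;
  const page = opts.page ?? 0;
  const slice = events.slice(page * pageSize, (page + 1) * pageSize);
  if (opts.json) {
    // JSON.stringify emits every element, so nothing is abbreviated
    return JSON.stringify(slice, null, 2);
  }
  // plain-text table: pad the type column to the widest value
  const width = Math.max(4, ...slice.map((e) => e.type.length));
  const rows = slice.map((e) => `${e.type.padEnd(width)}  ${e.timestamp}`);
  return [`${"TYPE".padEnd(width)}  TIMESTAMP`, ...rows].join("\n");
}
```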

Timeouts

Timeouts give the workflow author control over the durability of their own workflow. By setting timeouts, the author guarantees that they will get control back in the event of an unexpected issue.

  • Definition Time Timeouts (activity and workflow) #64
  • Invocation Time Timeouts (activity and workflow) myActivity.withTimeoutSeconds(100)
  • Arbitrary Timeouts Block timeout(100, async () => { myActivity() })

What can timeout?

Anything that blocks the workflow should be able to timeout.

  • Workflow itself - fail the workflow after X time
  • Child Workflow - throw an error when waiting for a child workflow
  • Activity - throw an error if an activity does not return before X time
  • Condition - throw an error if a condition does not resolve before X time
  • Expect Signal - throw an error if a signal is not received in expected time

Are all timeouts equal?

Yes, in all cases timeouts are a mechanism that tasks the Orchestrator with waiting for a period of time and then interacting with the workflow. In all cases, the orchestrator interacts with the workflow by failing an Eventual (chain, call, etc). In the case of a workflow timeout, the top level Chain is failed. In the case of an activity, the activity call being waited on is failed.

What about more complex situations like needing multiple actions to return within X time?

Anything can be given a timeout using the sleepFor/sleepUntil calls.

Note: Promise.any/race is not yet implemented. #62

const complexThing = chain(async () => {
    await act1();
    await act2();
});
Promise.race(complexThing(), sleepFor()); // or Promise.any()
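
For comparison, the same race-against-a-timer pattern can be sketched with plain promises. `withTimeout` is a hypothetical helper, and `setTimeout` stands in for `sleepFor`, which only exists inside the workflow runtime.

```typescript
// Hypothetical helper: reject if `work` has not settled within `ms` milliseconds.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`timed out after ${ms} ms`)), ms);
  });
  // whichever settles first decides the outcome; always clear the timer afterwards
  return Promise.race([work, timeout]).finally(() => clearTimeout(timer));
}
```

As in the child-workflow discussion, losing the race does not stop the underlying work; it only stops the caller from waiting on it.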

How do we configure operation level timeouts?

const act1 = activity("act1", { timeoutSeconds: 1000 }, () => {}); // times out after 1000 seconds

// times out after 1000 seconds
const wf1 = workflow("wf1", { timeoutSeconds: 1000 }, async () => {
    await expectSignal("mySignal", { timeoutSeconds: 100 }); // times out after 100 seconds if the signal is not received
    await condition({ timeoutSeconds: 100 }, () => x === y); // times out after 100 seconds if the condition has not been met
    await act1(); // times out after 1000 seconds if it does not return
    await act1.withOptions({ timeoutSeconds: 100 })(); // overrides the timeout: fails after 100 seconds if the activity has not completed
});

// runs until completion or error, no timeout
const wf2 = workflow("wf2", async () => {
    const child = await wf1(); // will fail after 1000 seconds
    const child2 = await wf1.withOptions({ childTimeoutSeconds: 100 })(); // will fail after 100 seconds
});

Note: see below (Workflow timeout vs Child Workflow) for details about child workflow timeout options.

Workflow timeout vs Child Workflow

An interesting case is the difference between a workflow having a top level timeout and a workflow timing out on the operation of its child. Under the current assumption that a failed workflow does not terminate its children (workflows it started), the timeout of a child workflow eventual on the parent should not fail the child.

In the example below, if the child runs for 100 seconds without returning a completed message, an error is thrown from the await (reject). The child will continue to run and may even succeed, and the parent could, in theory, still query for its result again.

const childWf = workflow("childWf", () => { ... long running things... });
const parentWf = workflow("parentWf", async () => {
    await childWf.withOptions({ timeoutSeconds: 100 })(); // throw an error to the parent after 100 seconds, child will continue
});

If the child itself times out, it will also throw an error in the parent.

const childWf = workflow("childWf", { timeoutSeconds: 100 }, () => { ... long running things... });
const parentWf = workflow("parentWf", async () => {
    await childWf(); // throws an error in the parent after 100 seconds; the child itself has failed with a timeout
});

Note: the parent only observes the child's timeout when it awaits the child; until then the timeout has no effect on the parent.

const childWf = workflow("childWf", { timeoutSeconds: 100 }, () => { ... long running things... });
const parentWf = workflow("parentWf", async () => {
    const child = childWf(); // child times out after 100 seconds, nothing happens here

   ...doOtherWork...

   await child; // throws
});

We will allow the configuration of two different timeouts on the child workflow to handle the ambiguity.

  • Execution Timeout Seconds - the time the workflow will run before it times out, overrides the timeout on the workflow definition
  • Child Timeout Seconds - the max time the current workflow will wait from the start of the child (ScheduleChildWorkflow event) until it is resolved or failed.
const childWf = workflow("childWf", { timeoutSeconds: 1000 }, () => { ... long running things... });
const parentWf = workflow("parentWf", async () => {
    await childWf(); // the execution will timeout after 1000 seconds
    await childWf.withOptions({ executionTimeoutSeconds: 100 })(); // the execution will timeout after 100 seconds
    await childWf.withOptions({ childTimeoutSeconds: 100 })(); // the execution will timeout after 1000 seconds, but the parent will fail the eventual after 100 seconds
});

What is the `id` in a `WorkflowTask`

What is the purpose of the id in the WorkflowTask?

export interface WorkflowTask {
  executionId: string;
  id: string;
  events: HistoryStateEvents[];
}

The service currently uses a ULID for this value - it's not deterministic, therefore we can't use it for anything valuable?

Activity Branding

Add custom colors and logos to any activity or integration. Show them on the visualization(s).

Event with ActivityToken and Signal with ActivityToken

Support sending an event or a signal from the workflow with an activity token without starting a workflow.

note: not sure how to make this ergonomic

In an example I have...

const RequestApprovalEvent =
  event<RequestApprovalEventPayload>("RequestApproval");

// auto approval for the human request approval event.
RequestApprovalEvent.on(async (event) => {
  await requestApproval.complete({
    activityToken: event.token,
    result: { approve: true },
  });
});

workflow(async () => {
  await requestApproval({
    price,
    recommendation: decision,
    symbol,
  });
});

const requestApproval = activity(
  "requestApproval",
  async (event: Omit<RequestApprovalEventPayload, "token">) => {
    return asyncResult<{ approve: boolean }>(async (token) => {
      await RequestApprovalEvent.publish({ ...event, token });
    });
  }
);

Should be able to do this like...

const requestApprovalEventActivity = asyncEventActivity((token, data: Omit<RequestApprovalEventPayload, "token">) => ({ 
   ...data, 
   token
}));

workflow(async () => {
  await requestApprovalEventActivity({
    price,
    recommendation: decision,
    symbol,
  });
});

Clients are greedy when it comes to environment variables

Our API clients require all environment variables even when a user only needs a subset of the API operations.

Our configureXYZ methods reflect what the clients ideally need but not what they actually need, causing errors in the development API.

public configureRecordHistory(func: Function) {
  this.grantRecordHistory(func);
  addEnvironment(func, {
    [ENV_NAMES.EXECUTION_HISTORY_BUCKET]: this.history.bucketName,
  });
}

Yes, recording history only needs the bucket but you can't construct the API without the WORKFLOW_QUEUE_URL etc.
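
One possible direction is to read each variable lazily, so that an operation only fails when a variable it actually uses is missing. The sketch below is hypothetical: `lazyEnv`, `WorkflowClientSketch` and the literal variable names are placeholders, not the real client or the real values of `ENV_NAMES`.

```typescript
type Env = Record<string, string | undefined>;

// Returns a getter that only checks the variable when it is first needed.
function lazyEnv(env: Env, name: string): () => string {
  return () => {
    const value = env[name];
    if (value === undefined) throw new Error(`missing environment variable ${name}`);
    return value;
  };
}

class WorkflowClientSketch {
  private readonly historyBucket: () => string;
  private readonly workflowQueueUrl: () => string;

  constructor(env: Env) {
    // constructing the client never throws; nothing is read yet
    this.historyBucket = lazyEnv(env, "EXECUTION_HISTORY_BUCKET");
    this.workflowQueueUrl = lazyEnv(env, "WORKFLOW_QUEUE_URL");
  }

  // needs only the bucket
  recordHistoryTarget(): string {
    return this.historyBucket();
  }

  // needs only the queue
  submitTaskTarget(): string {
    return this.workflowQueueUrl();
  }
}
```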

Design: Command Resolved Values.

Problem Statement

The workflow may need to make use of values that are generated immediately by a command's execution, but do not require the action to be "complete".

Example

Working with a child workflow

A parent workflow MUST be able to manipulate the child, like a parent/child thread (fork). For example: send signals to the child, cancel the child, and await its result (join).

const wf = workflow("wf1", async () => {});

const wf2 = workflow("wf2", async () => {
   const child = wf();

   child.sendSignal("MySignal"); // error: how does sendSignal know the ID?

   sendSignal(child.executionId, "MySignal"); // error: executionId cannot be known here because scheduleWorkflow and sendSignal are executed in parallel

   await child;
});

In the above example, the emitted commands would look like

ScheduleWorkflow
SendSignal // how do I get the execution id?
SendSignal // how do I get the execution id?

Solutions

1) Each dynamic property must be its own command

This is what Temporal does, they have startChild and executeChild that do the same as startWorkflow and executeWorkflow below. https://docs.temporal.io/typescript/workflows#child-workflows

const wf2 = workflow("wf2", async () => {
   const childHandle = await wf.startExecution(); // returns an execution handle with the ID.
   const child = childHandle.result(); // returns a promise that settles when the workflow ends.

   childHandle.sendSignal("MySignal");
   sendSignal(childHandle.executionId, "MySignal");

   return await child; //waits for completion and returns result
})
  1. Create two tiers of startWorkflow calls (ex: startWorkflow and executeWorkflow)
  2. startWorkflow returns a handle with functions and properties
  3. executeWorkflow just returns an awaitable that awaits the results.
  4. to use signals, cancel children, etc, the startWorkflow API must be used

Pros: we know it would work
Cons: kind of ugly, might need to take multiple loops of awaiting and resolving

2) Deterministic Execution IDs

Currently the execution name of the workflow is dynamically generated. If we made the name deterministically generated, the executionId could be generated.

Pros: simple?
Cons: Race condition if the signal is sent before the child workflow is started
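
As an illustration of option 2, a child's execution ID could be derived from values the parent already knows at command time. The naming scheme below is an assumption made for this sketch, not the scheme the service uses.

```typescript
// Hypothetical scheme: the child name is built from the parent execution ID and the
// sequence number of the command that schedules the child.
function childExecutionId(childWorkflow: string, parentExecutionId: string, seq: number): string {
  // execution IDs contain a "/", which is used as the separator below, so flatten it
  const name = `${parentExecutionId.replace(/\//g, "-")}-${seq}`;
  return `${childWorkflow}/${name}`;
}
```

Because the result depends only on replayed inputs, the parent can compute it before the child is scheduled. The race noted above still applies: a signal addressed to this ID can arrive before the execution exists.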

3) Token/Reference system

const wf2 = workflow("wf2", async () => {
   const child = wf(); // returns an execution handle with the ID and a "Promise/Eventual"

   child.sendSignal("MySignal"); 
   sendSignal(child.executionId, "MySignal");

   console.log(child.executionId) // { ref: "ExecutionId", seq: 0 }

   return await child; // returns result
})
  1. dynamic properties like executionId can return Tokens (like CDK) which resolve to a property we expect to exist after a command is called.
  2. Update the command logic to resolve in topological order with best attempt parallelism
  3. Commands can emit attributes the Tokens point to.
  4. All emitting operations like logging and attributes must resolve the Tokens before sending.

Pros: cleanest solution, enforces dependency order of commands with best attempt concurrency
Cons: Most complex implementation, may limit command execution parallelism (in rare cases), need to carefully resolve tokens
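
A minimal sketch of the token idea in option 3, assuming tokens have the `{ ref, seq }` shape logged in the example. `Attributes` and `resolveToken` are hypothetical names.

```typescript
interface Token {
  ref: string; // name of the attribute, e.g. "ExecutionId"
  seq: number; // sequence number of the command that will emit it
}

const isToken = (value: unknown): value is Token =>
  typeof value === "object" && value !== null && "ref" in value && "seq" in value;

// Attributes emitted by commands that have already run, keyed by seq.
type Attributes = Map<number, Record<string, string>>;

// Replace a token with the attribute it points to; pass other values through unchanged.
function resolveToken(value: unknown, attributes: Attributes): unknown {
  if (!isToken(value)) return value;
  const resolved = attributes.get(value.seq)?.[value.ref];
  if (resolved === undefined) {
    // the producing command has not run yet, so the consumer must wait for it
    throw new Error(`unresolved token ${value.ref}@${value.seq}`);
  }
  return resolved;
}
```

An unresolved token is what forces the topological ordering: a command that consumes a token cannot run until the command at that `seq` has emitted the attribute.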

4) Don't allow signaling/cancelling children

Pros: no work
Cons: lose a strong use case

Drop NodeJSFunction in favor of a custom bundling script

We currently have a mix of NodeJSFunction and custom bundling behavior. I think we should drop NodeJSFunction in favor of the eventual-bundle script within @eventual/compiler for the following reasons:

  1. NodeJSFunction runs synchronously and is slow, eventual-bundle can perform all bundles in parallel
  2. NodeJSFunction is tied to AWS CDK and doesn't help us support Pulumi, Terraform or CloudFlare

CLI: Use EVENTUAL_SERVICE env var

  • Pick up service from an env var rather than needing to be specified every command.
  • Provide --service flag to override
  • pickup from dotenv
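
The precedence implied by the list above (flag first, then the environment, then dotenv) could be sketched as follows. `resolveService` is a hypothetical function that takes already-parsed inputs rather than reading `process.env` or a `.env` file itself.

```typescript
type Vars = Record<string, string | undefined>;

// Precedence: explicit --service flag, then EVENTUAL_SERVICE in the environment,
// then EVENTUAL_SERVICE loaded from a dotenv file.
function resolveService(flag: string | undefined, env: Vars, dotenv: Vars = {}): string {
  const service = flag ?? env.EVENTUAL_SERVICE ?? dotenv.EVENTUAL_SERVICE;
  if (!service) {
    throw new Error("no service specified: pass --service or set EVENTUAL_SERVICE");
  }
  return service;
}
```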

Reference Based result payloads

Data returned from activities and workflows may be larger than can be stored in DynamoDB and SQS.

  • Save values into s3, dynamo, or inline using an abstracted client based on size.
  • Dereference result values as needed for the workflow to operate. Dereference into s3 for workflow access.
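
The size-based choice described above can be sketched like this. Everything here is an assumption for illustration: a `Map` stands in for S3, `INLINE_LIMIT` is an arbitrary threshold, and string length is used as a rough proxy for byte size.

```typescript
type StoredResult =
  | { kind: "inline"; json: string }
  | { kind: "ref"; key: string };

// Assumed threshold; the real limit depends on the DynamoDB and SQS quotas in play.
const INLINE_LIMIT = 1024;

function storeResult(
  value: unknown,
  key: string,
  blobs: Map<string, string>,
  limit: number = INLINE_LIMIT
): StoredResult {
  const json = JSON.stringify(value);
  // small payloads travel inline with the event
  if (json.length <= limit) return { kind: "inline", json };
  // large payloads are written elsewhere and replaced by a reference
  blobs.set(key, json);
  return { kind: "ref", key };
}

function loadResult(stored: StoredResult, blobs: Map<string, string>): unknown {
  if (stored.kind === "inline") return JSON.parse(stored.json);
  const json = blobs.get(stored.key);
  if (json === undefined) throw new Error(`missing payload ${stored.key}`);
  return JSON.parse(json);
}
```

Keeping the reference as a tagged union means the workflow only dereferences a payload when it actually reads the value.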

Abort retrying on fatal errors

As an example, this error is currently recurring:

Executions failed: \nremindMe/01GM3GQ6A6MCZ65XEF3G0GMR78: Error: no such workflow with name 'remindMe'

Presumably this happens because the workflow's code was removed after a sleep/schedule had been put in place.

In fatal cases such as these, with no hope of recovery, we'd want retries to be abandoned
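
A sketch of what abandoning retries could look like, assuming a caller-supplied predicate decides which errors are fatal. `runWithRetries` and `isMissingWorkflow` are hypothetical; the predicate matches on the message text from the log above only because no error type is specified in this issue.

```typescript
// Retry `attempt` up to `maxAttempts` times, but rethrow immediately on a fatal error.
function runWithRetries<T>(
  attempt: () => T,
  maxAttempts: number,
  isFatal: (error: unknown) => boolean
): T {
  let lastError: unknown;
  for (let i = 0; i < maxAttempts; i++) {
    try {
      return attempt();
    } catch (error) {
      // no hope of recovery: stop retrying
      if (isFatal(error)) throw error;
      lastError = error;
    }
  }
  throw lastError;
}

// Assumption: a missing workflow definition can be recognised from the error message.
const isMissingWorkflow = (error: unknown): boolean =>
  error instanceof Error && error.message.includes("no such workflow");
```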

eventual start --tail has bugs in output

  • undefined
  • [object Object]
✔ Execution id: sleepy/01GKP51QXFE2TEZSE1MWNZ29AG
ℹ undefined - WorkflowStarted- [object Object]
ℹ undefined - TaskStarted
ℹ undefined - TaskCompleted
ℹ undefined - TaskStarted
ℹ undefined - TaskCompleted
ℹ undefined - SleepScheduled
ℹ undefined - SleepScheduled
ℹ undefined - WorkflowCompleted
ℹ undefined - SleepScheduled
ℹ undefined - SleepScheduled
✔ Workflow complete
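
`undefined` and `[object Object]` are what template-string interpolation produces for a missing field and for a plain object, so the output above is consistent with the formatter reading a field that is not set and interpolating an object directly. Whether that is the actual cause has not been verified. A hypothetical formatter that avoids both symptoms, with an assumed event shape:

```typescript
// Assumed shape: `type` is always present, everything else is optional.
interface TailEvent {
  type: string;
  timestamp?: string;
  [key: string]: unknown;
}

function formatEvent(event: TailEvent): string {
  const { type, timestamp, ...rest } = event;
  // fall back to a visible marker instead of printing "undefined"
  const when = timestamp ?? "(no timestamp)";
  // serialize remaining fields explicitly instead of printing "[object Object]"
  const details = Object.keys(rest).length > 0 ? ` - ${JSON.stringify(rest)}` : "";
  return `${when} - ${type}${details}`;
}
```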

Event and Conditions

Introduces:

  • Event (.on)
  • Workflow (.ref)
  • ExecutionRef (.send(signal))
  • waitFor(signal)
  • condition(() => boolean)
import { workflow, WorkflowHandler } from "@eventual/core";

declare class Event<Payload = void> {
  readonly id: string;
  constructor(id: string);
  on(handler: (payload: Payload) => Promise<void> | void): Promise<void>;
}

type EventPayload<E extends Event<any>> = E extends Event<infer P> ? P : never;

// opts is optional so the calls below, which pass no timeout, type-check
declare function waitForEvent<E extends Event<any>>(
  event: E,
  opts?: { timeoutSeconds: number }
): Promise<EventPayload<E>>;
declare function condition(
  predicate: () => boolean,
  opts?: { timeoutSeconds: number }
): Promise<void>;

const event = new Event<number>("event");
const doneEvent = new Event("done");

declare module "@eventual/core" {
  interface Workflow<F extends WorkflowHandler = WorkflowHandler> {
    ref(executionId: string): ExecutionRef;
    startExecution(input: Parameters<F>[0]): Promise<ExecutionRef>;
  }

  interface ExecutionRef {
    send<E extends Event<any>>(event: E, payload: EventPayload<E>): void;
  }
}

/**
 * The parent workflow uses the `waitForEvent` function to block and wait for events from its child workflow.
 */
export const workflow1 = workflow("workflow1", async () => {
  const child = await workflow2.startExecution({ name: "child" });
  while (true) {
    const n = await waitForEvent(event);

    console.log(n);

    if (n > 10) {
      child.send(doneEvent, undefined);
      break;
    }

    child.send(event, n + 1);
  }

  return "done";
});

/**
 * The child workflow shows a different way of having events using handlers, conditions, and local state.
 */
export const workflow2 = workflow(
  "workflow2",
  async (input: { name: string }, { execution: { parentId } }) => {
    let block = false;
    let done = false;
    let last = 0;

    if (!parentId) {
      throw new Error("I need an adult");
    }

    console.log(`Hi, I am ${input.name}`);
    const parent = workflow1.ref(parentId);

    await event.on((n) => {
      last = n;
      block = false;
    });
    await doneEvent.on(() => {
      done = true;
      block = false;
    });

    while (!done) {
      parent.send(event, last + 1);
      block = true;
      await condition(() => !block);
    }

    return "done";
  }
);

Interpreting activities in timeline

    I am a little worried this won't scale with all of the Eventuals.
  • Activity - Scheduled, Completed, Failed, TimedOut, HeartbeatTimeout - Has Seq
  • Sleep - Started, Complete - Has Seq
  • Workflow - Schedule, Completed, Failed, TimedOut - Has Seq
  • Condition - Started - Has Seq
  • ExpectSignal - Started, TimedOut - Has Seq
  • Signal - Received - Has ID
  • Events - Published - Has Seq

I think I would... group by seq and then use heuristics to determine 1. start 2. end and 3. other events
For Signals, just show them on the graph on their own (like start and end).

const groups = groupBy(events.filter(isHistoryEvent), event => event.seq);
const eventuals = Object.keys(groups).map(seq => {
    const started = groups[seq].find(isScheduledEvent);
    const ended = groups[seq].find(or(isCompletedEvent, isFailedEvent, isWorkflowTimedOut));
    const other = groups[seq].filter(event => event !== started && event !== ended);
    return { started, ended, other };
});
const otherEvents = events.filter(or(isSignalReceived, isWorkflowStarted, isWorkflowFailed, isWorkflowCompleted, isWorkflowTimedOut));

Originally posted by @thantos in #84 (comment)

Optimize startWorkflow procedure

The startWorkflow procedure currently runs the following operations in series:

  1. put the ExecutionRecord to DynamoDB
  2. write the History event to DynamoDB
  3. (finally) submit the WorkflowStarted event to SQS

Is it safe to do (3) before (1) or (2)?

The orchestrator doesn't rely on the ExecutionRecord or History event in DynamoDB to function correctly? Can't we write them last? Or all in parallel?
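
To make the trade-off concrete, here is a sketch of both variants with the three steps injected as functions. `StartOps` and its method names are hypothetical stand-ins for the DynamoDB and SQS calls. Whether the parallel form is safe is exactly the open question; the sketch only shows its shape.

```typescript
// Hypothetical stand-ins for the three steps named above.
interface StartOps {
  putExecutionRecord(): Promise<void>;
  writeHistoryEvent(): Promise<void>;
  submitWorkflowStarted(): Promise<void>;
}

// Series: total latency is the sum of the three calls.
async function startInSeries(ops: StartOps): Promise<void> {
  await ops.putExecutionRecord();
  await ops.writeHistoryEvent();
  await ops.submitWorkflowStarted();
}

// Parallel: total latency is roughly that of the slowest call, but a failure in
// one call no longer prevents the others from taking effect.
async function startInParallel(ops: StartOps): Promise<void> {
  await Promise.all([
    ops.putExecutionRecord(),
    ops.writeHistoryEvent(),
    ops.submitWorkflowStarted(),
  ]);
}
```

With the parallel form, a workflow could start running while its ExecutionRecord write has failed, so callers would need a way to detect and repair that state.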

Activity Token Security

The activity token is currently a base64-encoded JSON object that contains the execution id and seq number of the activity.

This has a few issues:

  1. not secure - could be faked
  2. the base64 representation will grow with the execution id

Options:

  1. UUID generated for each activity and stored
  2. KMS encrypt the json payload before base64
  3. Encode the sequence number in a reversible way so that it is not predictable
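
The two issues can be illustrated with a sketch of the current format. The field names `executionId` and `seq` are assumed from the description above, and `btoa`/`atob` are used here only to keep the example self-contained.

```typescript
// Assumed payload shape, based on the description of the current token.
interface ActivityTokenPayload {
  executionId: string;
  seq: number;
}

const encodeToken = (payload: ActivityTokenPayload): string => btoa(JSON.stringify(payload));
const decodeToken = (token: string): ActivityTokenPayload => JSON.parse(atob(token));

// Issue 1: nothing secret goes into the token, so anyone who knows or guesses an
// execution ID and seq can mint a token identical to the real one.
const real = encodeToken({ executionId: "timedOut/01GM4W31MVPNB72QT9CTB4QABQ", seq: 3 });
const forged = encodeToken({ executionId: "timedOut/01GM4W31MVPNB72QT9CTB4QABQ", seq: 3 });

// Issue 2: the token length tracks the execution ID length.
const longer = encodeToken({ executionId: "slowWorkflow/" + "x".repeat(100), seq: 3 });
```

Option 1 (a stored random ID) addresses both issues at the cost of a lookup; option 2 addresses forgery but not size.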

Slack Integration

  • create @eventual/integrations-slack that integrates the Slack API as activities and webhook apis #89
  • write a README.md instruction manual for how to configure a slack bot, maybe provide a manifest? https://api.slack.com/reference/manifests
  • perhaps publish a bot managed by us that simplifies set up? Is this necessary?

Cancel Workflows

Should be able to cancel a running workflow execution

  1. Fail the workflow with CancelationError
  2. cancel all running activities (by the activity looking up the workflow status during heartbeat)
  3. Cancel all children workflows that are running
  4. make no new commands
await cancelWorkflow(executionId, reason);

// cancels itself (?)
throw new CancelWorkflow(reason);

const execution = workflow.startExecution();
await execution.cancel(reason);

const execution = workflow();
await execution.cancel(reason);

Instrument workflow infra

Lambda - EMF to CloudWatch
SQS
Dynamo

  • activity worker - lambda
    • time
    • success
    • failure
    • claim failures
    • put event time
    • send sqs message time
    • claim time
    • request age
  • activity task - by name - lambda
    • time
    • success
    • failure
  • workflow task - lambda
    • time
    • get history time
    • number of events
    • history size
    • interpret time
    • write history time
    • write history size
    • write history event quantity
    • sqs message age
    • write events time
    • write events quantity
      • successful tasks
      • failed tasks
  • workflow executions
    • total time
    • successes
    • failures
  • workflow queue - number of messages
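
For the Lambda metrics, a minimal sketch of building a CloudWatch Embedded Metric Format (EMF) record is shown here. The helper name `buildEmfRecord`, and any namespace, dimension and metric names passed to it, are assumptions chosen for illustration; only the `_aws` envelope follows the EMF layout.

```typescript
type EmfUnit = "Milliseconds" | "Count" | "Bytes";

interface MetricDatum {
  name: string;
  value: number;
  unit: EmfUnit;
}

// Build one EMF record. Dimension values and metric values live at the top
// level of the object; the _aws envelope declares which keys are metrics.
function buildEmfRecord(
  namespace: string,
  dimensions: Record<string, string>,
  metrics: MetricDatum[],
  timestamp: number = Date.now()
): Record<string, unknown> {
  const record: Record<string, unknown> = {
    _aws: {
      Timestamp: timestamp, // milliseconds since the epoch
      CloudWatchMetrics: [
        {
          Namespace: namespace,
          Dimensions: [Object.keys(dimensions)],
          Metrics: metrics.map(({ name, unit }) => ({ Name: name, Unit: unit })),
        },
      ],
    },
    ...dimensions,
  };
  for (const { name, value } of metrics) {
    record[name] = value;
  }
  return record;
}
```

Inside a Lambda function, writing `JSON.stringify(buildEmfRecord(...))` to standard output as a single line is what lets CloudWatch extract the metrics from the log stream.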

CLI: executions should list in reverse order

Expect the CLI to return the executions in order with the last being the most visible to the user.

Order seems to be by workflow right now.

    "endTime": "2022-12-13T04:11:50.327Z",
    "error": "Timeout",
    "message": "Workflow timed out",
    "startTime": "2022-12-13T04:11:44.928Z",
    "status": "FAILED"
  },
  {
    "id": "slowWorkflow/##EVENTUAL##timedOut-01GM4VP15MF00JHBYJQKGWGQZ1-3",
    "endTime": "2022-12-13T04:25:37.078Z",
    "error": "Timeout",
    "message": "Workflow timed out",
    "startTime": "2022-12-13T04:25:31.559Z",
    "status": "FAILED"
  },
  {
    "id": "slowWorkflow/##EVENTUAL##timedOut-01GM4W0K7B8NSHRPDA17B6NMSZ-3",
    "endTime": "2022-12-13T04:31:22.490Z",
    "error": "Timeout",
    "message": "Workflow timed out",
    "startTime": "2022-12-13T04:31:17.104Z",
    "status": "FAILED"
  },
  {
    "id": "slowWorkflow/##EVENTUAL##timedOut-01GM4W31MVPNB72QT9CTB4QABQ-3",
    "endTime": "2022-12-13T04:32:44.069Z",
    "error": "Timeout",
    "message": "Workflow timed out",
    "startTime": "2022-12-13T04:32:38.350Z",
    "status": "FAILED"
  },
  {
    "id": "timedOut/01GKYJ0W6CT10B8PX5GRRVKWYK",
    "endTime": "2022-12-10T17:41:18.064Z",
    "result": {
      "condition": true,
      "signal": true,
      "activity": true,
      "workflow": true
    },
    "startTime": "2022-12-10T17:41:12.530Z",
    "status": "COMPLETE"
  },
  {
    "id": "timedOut/01GKYJQHQAQ0Y43P9MF2VYAXKY",
    "endTime": "2022-12-10T17:53:40.515Z",
    "result": {
      "condition": true,
      "signal": true,
      "activity": true,
      "workflow": true
    },
    "startTime": "2022-12-10T17:53:35.468Z",
    "status": "COMPLETE"
  },
  {
    "id": "timedOut/01GKYMZKP4VRGF1MRR12W869QD",
    "endTime": "2022-12-10T18:33:01.873Z",
    "result": {
      "condition": true,
      "signal": true,
      "activity": true,
      "workflow": true
    },
    "startTime": "2022-12-10T18:32:56.776Z",
    "status": "COMPLETE"
  },
  {
    "id": "timedOut/01GKYNFZ7G4NGY809PDCCSD9G3",
    "endTime": "2022-12-10T18:41:58.977Z",
    "result": {
      "condition": true,
      "signal": true,
      "activity": true,
      "workflow": true
    },
    "startTime": "2022-12-10T18:41:52.883Z",
    "status": "COMPLETE"
  },
  {
    "id": "timedOut/01GKYPM48VS8WTA0FWV15ZJYG1",
    "endTime": "2022-12-10T19:01:42.995Z",
    "result": {
      "condition": true,
      "signal": true,
      "activity": true,
      "workflow": true
    },
    "startTime": "2022-12-10T19:01:37.697Z",
    "status": "COMPLETE"
  },
  {
    "id": "timedOut/01GM4RS8GPZX9M4XDD5KQ46Y8A",
    "endTime": "2022-12-13T03:35:00.300Z",
    "result": {
      "condition": true,
      "signal": true,
      "activity": true,
      "workflow": true
    },
    "startTime": "2022-12-13T03:34:49.624Z",
    "status": "COMPLETE"
  },
  {
    "id": "timedOut/01GM4S12P6KK4YKY7FWW95YP5Q",
    "endTime": "2022-12-13T03:39:15.523Z",
    "result": {
      "condition": true,
      "signal": true,
      "activity": true,
      "workflow": true
    },
    "startTime": "2022-12-13T03:39:05.799Z",
    "status": "COMPLETE"
  },
  {
    "id": "timedOut/01GM4SKS42MVCG8Z4W5PV46V4R",
    "endTime": "2022-12-13T03:49:27.761Z",
    "result": {
      "condition": true,
      "signal": true,
      "activity": true,
      "workflow": true
    },
    "startTime": "2022-12-13T03:49:18.596Z",
    "status": "COMPLETE"
  },
  {
    "id": "timedOut/01GM4T7BYSW1AGK1FSNB1PSTTH",
    "endTime": "2022-12-13T04:00:10.066Z",
    "result": {
      "condition": true,
      "signal": true,
      "activity": true,
      "workflow": true
    },
    "startTime": "2022-12-13T04:00:00.474Z",
    "status": "COMPLETE"
  },
  {
    "id": "timedOut/01GM4TWSXK50FEBNFFQPDB3S04",
    "endTime": "2022-12-13T04:11:51.620Z",
    "result": {
      "condition": true,
      "signal": true,
      "activity": true,
      "workflow": true
    },
    "startTime": "2022-12-13T04:11:42.900Z",
    "status": "COMPLETE"
  },
  {
    "id": "timedOut/01GM4VP15MF00JHBYJQKGWGQZ1",
    "endTime": "2022-12-13T04:25:37.701Z",
    "result": {
      "condition": true,
      "signal": true,
      "activity": true,
      "workflow": true
    },
    "startTime": "2022-12-13T04:25:29.526Z",
    "status": "COMPLETE"
  },
  {
    "id": "timedOut/01GM4W0K7B8NSHRPDA17B6NMSZ",
    "endTime": "2022-12-13T04:31:23.684Z",
    "result": {
      "condition": true,
      "signal": true,
      "activity": true,
      "workflow": true
    },
    "startTime": "2022-12-13T04:31:15.693Z",
    "status": "COMPLETE"
  },
  {
    "id": "timedOut/01GM4W31MVPNB72QT9CTB4QABQ",
    "endTime": "2022-12-13T04:32:45.475Z",
    "result": {
      "condition": true,
      "signal": true,
      "activity": true,
      "workflow": true
    },
    "startTime": "2022-12-13T04:32:35.997Z",
    "status": "COMPLETE"
  }
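
One possible reading of the request is to sort by `startTime` ascending, so that the newest execution is printed last and sits right above the prompt. The sketch below assumes that interpretation; `ExecutionSummary` and `sortForDisplay` are hypothetical names, and only the fields visible in the output above are used.

```typescript
// Hypothetical subset of the execution record printed by the CLI.
interface ExecutionSummary {
  id: string;
  startTime: string; // ISO 8601 timestamp
  status: string;
}

// Return a copy ordered oldest-first, regardless of workflow name,
// so the most recent execution is the last line printed.
function sortForDisplay(
  executions: readonly ExecutionSummary[]
): ExecutionSummary[] {
  return [...executions].sort(
    (a, b) => Date.parse(a.startTime) - Date.parse(b.startTime)
  );
}
```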

Optimize finishActivity

The finishActivity procedure writes the execution history event before submitting the SQS message, which makes it slower than it needs to be. The two calls can run in parallel: no part of the orchestrator depends on the DynamoDB state, so there is no need to add the overhead.

async function finishActivity(event: ActivityCompleted | ActivityFailed) {
  await timed(metrics, ActivityMetrics.EmitEventDuration, () =>
    executionHistoryClient.putEvent(request.executionId, event)
  );

  await timed(metrics, ActivityMetrics.SubmitWorkflowTaskDuration, () =>
    workflowClient.submitWorkflowTask(request.executionId, event)
  );

  logActivityCompleteMetrics(isWorkflowFailed(event), event.duration);
}
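
A parallel version might look like the following sketch. The `Clients` interface and the simplified `ActivityEvent` type are stand-ins introduced here so the example is self-contained; the real clients, metrics and timing wrappers are omitted.

```typescript
// Simplified stand-ins for the real event and client types (assumptions).
type ActivityEvent = {
  type: "ActivityCompleted" | "ActivityFailed";
  seq: number;
};

interface Clients {
  putEvent(executionId: string, event: ActivityEvent): Promise<void>;
  submitWorkflowTask(executionId: string, event: ActivityEvent): Promise<void>;
}

async function finishActivityInParallel(
  clients: Clients,
  executionId: string,
  event: ActivityEvent
): Promise<void> {
  // Both calls are started before either is awaited, so the total latency
  // is roughly the slower of the two rather than their sum.
  await Promise.all([
    clients.putEvent(executionId, event),
    clients.submitWorkflowTask(executionId, event),
  ]);
}
```

Note that `Promise.all` rejects as soon as either call fails while the other call still runs to completion, so a failed history write no longer prevents the workflow task from being submitted.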

State primitive

A useState-like call.

workflow(async () => {
   const state = new State<"IN_PROGRESS" | "SUCCESS">("IN_PROGRESS");

   signal.on(() => {
      state.update("SUCCESS");
   });

   // wait until the signal handler has moved the state to SUCCESS
   await condition(() => state.value === "SUCCESS");

   return "DONE";
})

Or

  • State
  • Entity
  • LocalEntity

Temporal has a concept called a "trigger" which is a promise that can be resolved. Entity/State would be similar, but support any value and could be "resolved"/set multiple times.

This would work well with conditions. Conditions do not trigger Workflow Runs on their own; they need to be triggered by something. State will trigger a Complete/Update event when the status changes, in order of the status change.

We could also make shared entities and complex entities, which get closer to DurableEntities/actors.
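
To make the idea concrete, here is a minimal in-memory sketch of such a primitive. It is not the proposed implementation: the `onChange` method and the listener signature are assumptions, and durability and replay are ignored entirely. It only shows a value that can be set multiple times and that notifies listeners in the order the changes happen.

```typescript
type Listener<T> = (value: T, previous: T) => void;

class State<T> {
  private current: T;
  private readonly listeners = new Set<Listener<T>>();

  constructor(initial: T) {
    this.current = initial;
  }

  get value(): T {
    return this.current;
  }

  // Set a new value and notify listeners; setting the same value is a no-op.
  update(next: T): void {
    if (Object.is(next, this.current)) return;
    const previous = this.current;
    this.current = next;
    for (const listener of [...this.listeners]) {
      listener(next, previous);
    }
  }

  // Subscribe to changes; the returned function unsubscribes.
  onChange(listener: Listener<T>): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }
}
```

In a workflow runtime, the listener callback is where an update event would be emitted so that pending conditions get re-evaluated.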

Broadcast Event

To go with Event (#49), which has Singlecast semantics via execution.send, we should support a broadcast semantic.

const ping = new Event<string>("ping");
const pong = new Event<{ ack: number }>("pong");

const worker = workflow(async ({ id }: { id: number }, { execution: { parentId } }) => {
    const parent = controller.ref(parentId);
    // reply to the parent whenever a ping arrives
    const sub = ping.on(() => {
         parent.send(pong, { ack: 1 });
    });

   await sleep(3 * 60);

   if(id === 1) {
       sub.dispose(); // unsubscribe
   }
})

const controller = workflow(async () => {
    const workers = [1,2,3].map((id) => worker.startWorkflow({ id }));

    let responses = 0;

    pong.on(() => { responses++ });

    // send to specific executions
    ping.sendMany(workers, "hello");

    await condition(() => responses === workers.length);

    // send to anyone listening
    ping.broadcast("hello");
})

Idea: Promise utilities

  • abort
  • resolve

reject

let x = 0;

const cond1 = condition(() => x === 1);

// when reject is called, cond1 returns a false response, even if the condition is met later.
event.on(() => {
   eventual.reject(cond1);
});

if(await cond1) {
   x = 2;
}

resolve

let x = 0;

const cond1 = condition(() => x === 1);

// when resolve is called, cond1 returns a true response, even if the condition is not met.
event.on(() => {
   eventual.resolve(cond1);
});

if(await cond1) {
   x = 2;
}

The alternative is using race and some local state.

let x = 0;
let cancel = false;

// flip the local flag instead of resolving the promise directly
event.on(() => {
   cancel = true;
});

const cond1 = race(
   condition(() => x === 1),
   condition(() => cancel === true).then(() => false)
);

if(await cond1) {
    x = 2;
}

CLI: --tail sometimes throws an error

sussmans@SurfacePro8:~/eventual$ AWS_PROFILE=sussman ./packages/@eventual/cli/bin/eventual.js start test-runner test-runner ./examples/lambda-test-runner/runtime/test-files/test-cases.json --tail
✔ Execution id: test-runner/01GM6TS7P83T039M83J69WKSFB
✖ No events at all. Check your execution id
sussmans@SurfacePro8:~/eventual$ AWS_PROFILE=sussman ./packages/@eventual/cli/bin/eventual.js start test-runner test-runner ./examples/lambda-test-runner/runtime/test-files/test-cases.json --tail
✔ Execution id: test-runner/01GM6TSRFJ0SMCMDSYM13QY87E
ℹ 2022-12-13T22:48:32.032Z - WorkflowStarted- {
  target: {
    name: 'arn:aws:lambda:us-east-1:983725774087:function:example-test-runner-testFunction483F4CBE-nQBI88y30Jfk',
    type: 'lambda'
  },
  testCases: [ { input: [Object], expected: 3 } ]
}
ℹ 2022-12-13T22:48:32.146Z - TaskStarted
ℹ 2022-12-13T22:48:32.220Z - TaskCompleted
ℹ 2022-12-13T22:48:32.220Z - ActivityScheduled- runTestCase
ℹ 2022-12-13T22:48:32.392Z - ActivityCompleted- [object Object]
ℹ 2022-12-13T22:48:32.434Z - TaskStarted
ℹ 2022-12-13T22:48:32.453Z - WorkflowCompleted
ℹ 2022-12-13T22:48:32.453Z - TaskCompleted
✔ Workflow complete
{"passed":0,"failed":1,"errored":0,"results":[{"status":"fail","actual":{"0":51},"expected":3}]}
