Giter Club home page Giter Club logo

hatchet's Introduction

Hatchet Logo

A Distributed, Fault-Tolerant Task Queue

Docs License: MIT Go Reference NPM Downloads

Discord Twitter GitHub Repo stars

Documentation · Website · Issues

What is Hatchet?

Hatchet replaces difficult to manage legacy queues or pub/sub systems so you can design durable workloads that recover from failure and solve for problems like concurrency, fairness, and rate limiting. Instead of managing your own task queue or pub/sub system, you can use Hatchet to distribute your functions between a set of workers with minimal configuration or infrastructure:

What Makes Hatchet Great?

  • ⚡️ Ultra-low Latency and High Throughput Scheduling: Hatchet is built on a low-latency queue (25ms average start), perfectly balancing real-time interaction capabilities with the reliability required for mission-critical tasks.

  • ☮️ Concurrency, Fairness, and Rate Limiting: Implement FIFO, LIFO, Round Robin, and Priority Queues with Hatchet’s built-in strategies, designed to circumvent common scaling pitfalls with minimal configuration. Read Docs →

  • 🔥🧯 Resilience by Design: With customizable retry policies and integrated error handling, Hatchet ensures your operations recover swiftly from transient failures. You can break large jobs down into small tasks so you can finish a run without rerunning work. Read Docs →

Enhanced Visibility and Control:

  • Observability. All of your runs are fully searchable, allowing you to quickly identify issues. We track latency, error rates, or custom metrics in your run.
  • (Practical) Durable Execution. Replay events and manually pick up execution from specific steps in your workflow.
  • Cron. Set recurring schedules for functions runs to execute.
  • One-Time Scheduling. Schedule a function run to execute at a specific time and date in the future.
  • Spike Protection. Smooth out spikes in traffic and only execute what your system can handle.
  • Incremental Streaming. Subscribe to updates as your functions progress in the background worker.

Example Use Cases:

  • Fairness for Generative AI: Don't let busy users overwhelm your system. Hatchet lets you distribute requests to your workers fairly with configurable policies.
  • Batch Processing for Document Indexing: Hatchet can handle large-scale batch processing of documents, images, and other data and resume mid-job on failure.
  • Workflow Orchestration for Multi-Modal Systems: Hatchet can handle orchestrating multi-modal inputs and outputs, with full DAG-style execution.
  • Correctness for Event-Based Processing: Respond to external events or internal events within your system and replay events automatically.

Quick Start

Hatchet supports your technology stack with open-source SDKs for Python, Typescript, and Go. To get started, see the Hatchet documentation here, or check out our quickstart repos:

SDK repositories

Hatchet comes with a native Go SDK. The following SDKs are also available:

If you encounter any issues with the SDKs, please submit an issue in the respective repository.

Is there a managed cloud version of Hatchet?

Yes, we are offering a have a cloud version to select companies while in beta who are helping to build and shape the product. Please reach out or request access for more information.

Is there a self-hosted version of Hatchet?

Yes, instructions for self-hosting our open source docker containers can be found in our documentation. Please reach out if you're interested in support.

How does this compare to alternatives (Celery, BullMQ)?

Why build another managed queue? We wanted to build something with the benefits of full transactional enqueueing - particularly for dependent, DAG-style execution - and felt strongly that Postgres solves for 99.9% of queueing use-cases better than most alternatives (Celery uses Redis or RabbitMQ as a broker, BullMQ uses Redis). Since the introduction of SKIP LOCKED and the milestones of recent PG releases (like active-active replication), it's becoming more feasible to horizontally scale Postgres across multiple regions and vertically scale to 10k TPS or more. Many queues (like BullMQ) are built on Redis and data loss can occur when suffering OOM if you're not careful, and using PG helps avoid an entire class of problems.

We also wanted something that was significantly easier to use and debug for application developers. A lot of times the burden of building task observability falls on the infra/platform team (for example, asking the infra team to build a Grafana view for their tasks based on exported prom metrics). We're building this type of observability directly into Hatchet.

Issues

Please submit any bugs that you encounter via Github issues. However, please reach out on Discord before submitting a feature request - as the project is very early, we'd like to build a solid foundation before adding more complex features.

I'd Like to Contribute

See the contributing docs here, and please let us know what you're interesting in working on in the #contributing channel on Discord. This will help us shape the direction of the project and will make collaboration much easier!

hatchet's People

Contributors

abelanger5 avatar aeswibon avatar dependabot[bot] avatar eltociear avatar grutt avatar steebchen avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

hatchet's Issues

bug: improve error message in event push

This should return forbidden only when the query can't find the tenant or the authentication is wrong. Otherwise, the error message should be different and an internal or unavailable error should be returned. This can happen when the database query returns an error (e.g. due to it being unavailable).

if err != nil {
a.l.Debug().Err(err).Msg("error validating tenant token")
return nil, forbidden
}
// get the tenant id
queriedTenant, err := a.config.Repository.Tenant().GetTenantByID(tenantId)
if err != nil {
a.l.Debug().Err(err).Msg("error getting tenant by id")
return nil, forbidden
}

An important part is this line

if err != nil {

as this needs to be checked for a NotFound error, and then it should return a forbidden error if something is off with the jwt, and in all other cases a generic error with a 500 status code

bug(engine): goroutine leaks

List of goroutine leaks which are currently ignored, but should be investigate if they could become a problem.

Related code:

goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*controlBuffer).get"),

Goroutine leaks:

  • go.opencensus.io/stats/view.(*worker).start

    [Goroutine 6 in state select, with go.opencensus.io/stats/view.(*worker).start on top of the stack:
    go.opencensus.io/stats/view.(*worker).start(0xc0002feb80)
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/[email protected]/stats/view/worker.go:292 +0x128
    created by go.opencensus.io/stats/view.init.0 in goroutine 1
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/[email protected]/stats/view/worker.go:34 +0xf4
    
  • google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run

     Goroutine 23 in state select, with google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run on top of the stack:
    google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run(0xc000373500, {0x102fde440, 0xc00004aaf0})
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/grpcsync/callback_serializer.go:76 +0x150
    created by google.golang.org/grpc/internal/grpcsync.NewCallbackSerializer in goroutine 54
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/grpcsync/callback_serializer.go:52 +0x1f8
    
  • google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run

     Goroutine 24 in state select, with google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run on top of the stack:
    google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run(0xc000373530, {0x102fde440, 0xc00004ab40})
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/grpcsync/callback_serializer.go:76 +0x150
    created by google.golang.org/grpc/internal/grpcsync.NewCallbackSerializer in goroutine 54
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/grpcsync/callback_serializer.go:52 +0x1f8
    
  • google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run

     Goroutine 25 in state select, with google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run on top of the stack:
    google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run(0xc000373560, {0x102fde440, 0xc00004ab90})
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/grpcsync/callback_serializer.go:76 +0x150
    created by google.golang.org/grpc/internal/grpcsync.NewCallbackSerializer in goroutine 54
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/grpcsync/callback_serializer.go:52 +0x1f8
     Goroutine 40 in state IO wait, with internal/poll.runtime_pollWait on top of the stack:
    internal/poll.runtime_pollWait(0x14d67ce38, 0x72)
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/runtime/netpoll.go:343 +0xa0
    internal/poll.(*pollDesc).wait(0xc00052c0a0, 0xc0000dd200?, 0x0)
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/internal/poll/fd_poll_runtime.go:84 +0xb8
    internal/poll.(*pollDesc).waitRead(...)
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/internal/poll/fd_poll_runtime.go:89
    internal/poll.(*FD).Read(0xc00052c080, {0xc0000dd200, 0x900, 0x900})
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/internal/poll/fd_unix.go:164 +0x2e0
    net.(*netFD).Read(0xc00052c080, {0xc0000dd200, 0x900, 0x900})
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/net/fd_posix.go:55 +0x48
    net.(*conn).Read(0xc00051e018, {0xc0000dd200, 0x900, 0x900})
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/net/net.go:179 +0x8c
    crypto/tls.(*atLeastReader).Read(0xc00069a870, {0xc0000dd200, 0x900, 0x900})
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/crypto/tls/conn.go:805 +0x7c
    bytes.(*Buffer).ReadFrom(0xc00060a2a8, {0x102fd69b8, 0xc00069a870})
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/bytes/buffer.go:211 +0xf4
    crypto/tls.(*Conn).readFromUntil(0xc00060a000, {0x14d67cf30?, 0xc00051e018}, 0x5)
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/crypto/tls/conn.go:827 +0x18c
    crypto/tls.(*Conn).readRecordOrCCS(0xc00060a000, 0x0)
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/crypto/tls/conn.go:625 +0x480
    crypto/tls.(*Conn).readRecord(...)
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/crypto/tls/conn.go:587
    crypto/tls.(*Conn).Read(0xc00060a000, {0xc0005e2000, 0x8000, 0x50000c0003bfb38?})
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/crypto/tls/conn.go:1369 +0x1c4
    bufio.(*Reader).Read(0xc000594d80, {0xc000596200, 0x9, 0x9})
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/bufio/bufio.go:244 +0x390
    io.ReadAtLeast({0x102fd5e58, 0xc000594d80}, {0xc000596200, 0x9, 0x9}, 0x9)
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/io/io.go:335 +0xcc
    io.ReadFull(...)
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/io/io.go:354
    golang.org/x/net/http2.readFrameHeader({0xc000596200, 0x9, 0x9}, {0x102fd5e58, 0xc000594d80})
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/golang.org/x/[email protected]/http2/frame.go:237 +0x68
    golang.org/x/net/http2.(*Framer).ReadFrame(0xc0005961c0)
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/golang.org/x/[email protected]/http2/frame.go:498 +0xbc
    google.golang.org/grpc/internal/transport.(*http2Client).reader(0xc00057c240, 0xc00060a000?)
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/transport/http2_client.go:1587 +0x1e8
    created by google.golang.org/grpc/internal/transport.newHTTP2Client in goroutine 26
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/transport/http2_client.go:398 +0x2234
    
  • google.golang.org/grpc/internal/transport.(*controlBuffer).get

     Goroutine 41 in state select, with google.golang.org/grpc/internal/transport.(*controlBuffer).get on top of the stack:
    google.golang.org/grpc/internal/transport.(*controlBuffer).get(0xc0004e4460, 0x1)
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/transport/controlbuf.go:418 +0x120
    google.golang.org/grpc/internal/transport.(*loopyWriter).run(0xc0000e77a0)
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/transport/controlbuf.go:552 +0xf0
    google.golang.org/grpc/internal/transport.newHTTP2Client.func6()
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/transport/http2_client.go:452 +0x114
    created by google.golang.org/grpc/internal/transport.newHTTP2Client in goroutine 26
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/transport/http2_client.go:450 +0x288c
    
  • google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run

     Goroutine 55 in state select, with google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run on top of the stack:
    google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run(0xc00012b920, {0x102fde440, 0xc00004ac30})
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/grpcsync/callback_serializer.go:76 +0x150
    created by google.golang.org/grpc/internal/grpcsync.NewCallbackSerializer in goroutine 10
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/grpcsync/callback_serializer.go:52 +0x1f8
    
  • google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run

     Goroutine 56 in state select, with google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run on top of the stack:
    google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run(0xc00012b950, {0x102fde440, 0xc00004ad70})
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/grpcsync/callback_serializer.go:76 +0x150
    created by google.golang.org/grpc/internal/grpcsync.NewCallbackSerializer in goroutine 10
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/grpcsync/callback_serializer.go:52 +0x1f8
    
  • google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run

     Goroutine 57 in state select, with google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run on top of the stack:
    google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run(0xc00012b980, {0x102fde440, 0xc00004adc0})
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/grpcsync/callback_serializer.go:76 +0x150
    created by google.golang.org/grpc/internal/grpcsync.NewCallbackSerializer in goroutine 10
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/grpcsync/callback_serializer.go:52 +0x1f8
     Goroutine 61 in state IO wait, with internal/poll.runtime_pollWait on top of the stack:
    internal/poll.runtime_pollWait(0x14d67cd40, 0x72)
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/runtime/netpoll.go:343 +0xa0
    internal/poll.(*pollDesc).wait(0xc00021dda0, 0xc0000ddb00?, 0x0)
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/internal/poll/fd_poll_runtime.go:84 +0xb8
    internal/poll.(*pollDesc).waitRead(...)
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/internal/poll/fd_poll_runtime.go:89
    internal/poll.(*FD).Read(0xc00021dd80, {0xc0000ddb00, 0x900, 0x900})
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/internal/poll/fd_unix.go:164 +0x2e0
    net.(*netFD).Read(0xc00021dd80, {0xc0000ddb00, 0x900, 0x900})
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/net/fd_posix.go:55 +0x48
    net.(*conn).Read(0xc00051e0d0, {0xc0000ddb00, 0x900, 0x900})
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/net/net.go:179 +0x8c
    crypto/tls.(*atLeastReader).Read(0xc000155d10, {0xc0000ddb00, 0x900, 0x900})
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/crypto/tls/conn.go:805 +0x7c
    bytes.(*Buffer).ReadFrom(0xc00060b0a8, {0x102fd69b8, 0xc000155d10})
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/bytes/buffer.go:211 +0xf4
    crypto/tls.(*Conn).readFromUntil(0xc00060ae00, {0x14d67cf30?, 0xc00051e0d0}, 0x5)
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/crypto/tls/conn.go:827 +0x18c
    crypto/tls.(*Conn).readRecordOrCCS(0xc00060ae00, 0x0)
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/crypto/tls/conn.go:625 +0x480
    crypto/tls.(*Conn).readRecord(...)
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/crypto/tls/conn.go:587
    crypto/tls.(*Conn).Read(0xc00060ae00, {0xc000440000, 0x8000, 0x50000c0005258b8?})
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/crypto/tls/conn.go:1369 +0x1c4
    bufio.(*Reader).Read(0xc00018f140, {0xc0000d63c0, 0x9, 0x9})
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/bufio/bufio.go:244 +0x390
    io.ReadAtLeast({0x102fd5e58, 0xc00018f140}, {0xc0000d63c0, 0x9, 0x9}, 0x9)
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/io/io.go:335 +0xcc
    io.ReadFull(...)
      /Users/steebchen/.asdf/installs/golang/1.21.5/go/src/io/io.go:354
    golang.org/x/net/http2.readFrameHeader({0xc0000d63c0, 0x9, 0x9}, {0x102fd5e58, 0xc00018f140})
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/golang.org/x/[email protected]/http2/frame.go:237 +0x68
    golang.org/x/net/http2.(*Framer).ReadFrame(0xc0000d6380)
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/golang.org/x/[email protected]/http2/frame.go:498 +0xbc
    google.golang.org/grpc/internal/transport.(*http2Client).reader(0xc00057cd80, 0xc00060ae00?)
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/transport/http2_client.go:1587 +0x1e8
    created by google.golang.org/grpc/internal/transport.newHTTP2Client in goroutine 14
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/transport/http2_client.go:398 +0x2234
    
  • google.golang.org/grpc/internal/transport.(*controlBuffer).get

     Goroutine 62 in state select, with google.golang.org/grpc/internal/transport.(*controlBuffer).get on top of the stack:
    google.golang.org/grpc/internal/transport.(*controlBuffer).get(0xc00004b540, 0x1)
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/transport/controlbuf.go:418 +0x120
    google.golang.org/grpc/internal/transport.(*loopyWriter).run(0xc000520d20)
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/transport/controlbuf.go:552 +0xf0
    google.golang.org/grpc/internal/transport.newHTTP2Client.func6()
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/transport/http2_client.go:452 +0x114
    created by google.golang.org/grpc/internal/transport.newHTTP2Client in goroutine 14
      /Users/steebchen/.asdf/installs/golang/1.21.5/packages/pkg/mod/google.golang.org/[email protected]/internal/transport/http2_client.go:450 +0x288c
    ]
    

feat(engine): add core service which monitors system heartbeats

Tickers, dispatchers and workers all use heartbeats to report their status. While the tickers/dispatchers are designed to gracefully shut down after SIGTERM, there's no recovery if the tickers/dispatchers are unexpectedly terminated. The engine needs a core service which can handle components which aren't reporting their status.

Chore: set up golangci-lint

My recommendation is to set up golangci-lint with a few reasonable linters to check for code issues early directly in the editor, and run CI to block merging if it fails.

bug(frontend/app): replaying individual step run does not update workflowRun.status

Problem: We're invalidating the workflow run when replaying an individual step, but the refetched data workflow-runs/$run/index.tsx:34 does not properly represent the state (always succeeded in the case of a succeded run). Because of this, the refetchInterval is never set to

I've temporarily hotfix in #159 by always setting the refetch interval to 1s, but this is not a good solution in the long run.

api: backend should automatically populate the default timeout for steps

From the dashboard, step runs which time out after 5 minutes are incorrectly reporting the following:

This step was cancelled because it exceeded its timeout of 60s

Instead of computing the default timeout on the frontend as well, the backend should just populate the default timeout, so we keep the defaults as a single source of truth.

Chore: set up gosec

I've been running gosec locally occasionally to check for unhandled errors and for loop pointer bugs, would be great to have this as part of CI.

engine: can't render step outputs with a `-` in them

For example, this will fail because the - is not permitted as a field reference:

name: example-workflow
version: 0.1.0
triggers:
  events:
    - user:create
jobs:
  post-user-create:
    steps:
      - id: example-step
        action: default:action1
        with:
          object: '{{ .input.json }}'
      - id: example-step-2
        action: default:action2
        with:
          object: '{{ .steps.example-step.json }}'

refactor: assignment of `ticker` and `dispatcher` to steps and jobs

Need a better way to manage the assignment of jobs and step runs to tickers and dispatchers, respectively.

Here's an example of how a ticker is currently assigned to a job run.

There are a few problems with this:

  1. The ticker is assigned in the database before the ticker has acknowledged that it's registered that job run/started a timeout process.
  2. The JobsController shouldn't need to be aware of how to assign tickers to a job or select a ticker for the job run. The jobs controller should message the task queue that a job run is available, and it should be picked up by an available ticker.

The same problems exist for scheduled workflows, crons, and step runs. In particular, the admin service shouldn't be registering/unregistering tickers for old and new workflow versions.

feat: support parallel step execution

It's necessary to support steps which can:

  1. Execute the same step with different inputs in parallel.
  2. Execute different steps with the same input in parallel.

From a client perspective, the Go SDK should be able to detect parallel steps based on the output from a previous step and the input to the current step. For example, if one step has the following output:

func stepOne(ctx context.Context) ([]stepOneOutput, error)

And the second step has the following input:

func stepTwo(ctx context.Context, in *stepOneOutput) error

Then stepTwo should be run for each array element resulting from stepOne.

feat(go-sdk): middleware support at the `worker` and `service` level

Support middleware that can wrap each action with access to the function's context.Context.

It should be possible to register this globally with worker.Use, and per-service with service.Use.

For example:

worker.Use(func (ctx context.Context, next func(ctx context.Context) error) error {
  // can set values in context, handle and throw errors, etc
  next(ctx)
  return nil
})

Panic middleware (#71) should be present by default, but overwritten as needed.

Feat: add hoverable timestamp to relative dates

Relative dates are intuitive, but it would be great to support a hoverable component which shows the precise timestamp. Could replace all relativeDate calls with a shared component which shows the relative date by default, and the precise timestamp on hover. The precise timestamp should also be copyable, it sucks when they're not.

bug(engine): multiple step invocation after workflow delete

I had 2 workflows with step1, then deleted 1, but it's still registered on the server so step 1 was being dispatched twice regardless.

In other words, if a workflow is put and then the code is removed but the step is still registered in another workflow this step will still be invoked twice.

bug(python-sdk): auto-versioning

Problem: existing code in the admin client has incorrect logic for determining the strategy for putting and versioning a workflow.

If auto_version isn't set, should_put should be true if the workflow.version does not equal existing_workflow.versions[0].version, or if there is no existing_workflow. So perhaps something like:

def determine_workflow_update(auto_version, workflow, existing_workflow):
    if not auto_version and existing_workflow and existing_workflow.versions:
        return workflow.version != existing_workflow.versions[0].version, workflow.version
    if workflow.version == "":
        return True, "v0.1.0"
    if existing_workflow and existing_workflow.versions:
        new_version = bump_minor_version(
            existing_workflow.versions[0].version)
        should_put = new_version != workflow.version
        return [should_put, new_version]

    return [True, workflow.version]

This seems to match up much better with the Go SDK:

shouldPut := opts.autoVersion

	if err != nil {
		// if not found, create
		if statusErr, ok := status.FromError(err); ok && statusErr.Code() == codes.NotFound {
			shouldPut = true
		} else {
			return fmt.Errorf("could not get workflow: %w", err)
		}

		if workflow.Version == "" && opts.autoVersion {
			req.Opts.Version = "0.1.0"
		}
	} else {
		// if there are no versions, exit
		if len(apiWorkflow.Versions) == 0 {
			return fmt.Errorf("found workflow, but it has no versions")
		}

		// get the workflow version to determine whether to update
		if apiWorkflow.Versions[0].Version != workflow.Version {
			shouldPut = true
		}

		if workflow.Version == "" && opts.autoVersion {
			req.Opts.Version, err = bumpMinorVersion(apiWorkflow.Versions[0].Version)

			if err != nil {
				return fmt.Errorf("could not bump version: %w", err)
			}
		}
	}

Originally posted by @abelanger5 in #122 (comment)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.