Zeebe Node.js Client

Compatible with: Camunda Platform 8

DEPRECATED

This package is deprecated. Please use the official SDK package @camunda8/sdk. See: https://github.com/camunda/camunda-8-js-sdk

This is a Node.js gRPC client for Zeebe, the workflow engine in Camunda Platform 8. It is written in TypeScript and transpiled to JavaScript in the dist directory.

Comprehensive API documentation is available online.

See CHANGELOG.md to see what has changed with each release.

Get a hosted instance of Zeebe on Camunda Cloud.

Table of Contents

Quick Start

Connection Behaviour

Connecting to a Broker

Job Workers

Client Commands

Other Concerns

Programming with Safety

Development of the Library itself

Quick Start

Install

Add the Library to your Project

npm i zeebe-node

For Zeebe broker versions prior to 1.0.0:

npm i zeebe-node@0

The documentation for the pre-1.0.0 version of the library is available here.

Get Broker Topology

const ZB = require('zeebe-node')

void (async () => {
	const zbc = new ZB.ZBClient()
	const topology = await zbc.topology()
	console.log(JSON.stringify(topology, null, 2))
})()

Deploy a process

const ZB = require('zeebe-node')
const fs = require('fs')

void (async () => {
	const zbc = new ZB.ZBClient() // localhost:26500 || ZEEBE_GATEWAY_ADDRESS

	const res = await zbc.deployProcess('./domain-mutation.bpmn')
	console.log(res)

	// Deploy multiple with an array of filepaths
	await zbc.deployProcess(['./wf1.bpmn', './wf2.bpmn'])

	const buffer = fs.readFileSync('./wf3.bpmn')

	// Deploy from an in-memory buffer
	await zbc.deployProcess({ definition: buffer, name: 'wf3.bpmn' })
})()

Start and service a process

This code demonstrates how to deploy a Zeebe process, create a process instance, and handle a service task using the Zeebe Node.js client. The 'get-customer-record' service task worker checks for the presence of a customerId variable, simulates fetching a customer record from a database, and completes the task with a customerRecordExists variable.

// Import the Zeebe Node.js client and the 'fs' module
const ZB = require('zeebe-node');
const fs = require('fs');

// Instantiate a Zeebe client with default localhost settings or environment variables
const zbc = new ZB.ZBClient();

// Create a Zeebe worker to handle the 'get-customer-record' service task
const worker = zbc.createWorker({
    // Define the task type that this worker will process
    taskType: 'get-customer-record',
    // Define the task handler to process incoming jobs
    taskHandler: job => {
        // Log the job variables for debugging purposes
        console.log(job.variables);

        // Check if the customerId variable is missing and return an error if so
        if (!job.variables.customerId) {
            return job.error('NO_CUSTID', 'Missing customerId in process variables');
        }

        // Add logic to retrieve the customer record from the database here
        // ...

        // Complete the job with the 'customerRecordExists' variable set to true
        return job.complete({
            customerRecordExists: true
        });
    }
});

// Define an async main function to deploy a process, create a process instance, and log the outcome
async function main() {
    // Deploy the 'new-customer.bpmn' process
    const res = await zbc.deployProcess('./new-customer.bpmn');
    // Log the deployment result
    console.log('Deployed process:', JSON.stringify(res, null, 2));

    // Create a process instance of the 'new-customer-process' process, with a customerId variable set
    // 'createProcessInstanceWithResult' awaits the outcome
    const outcome = await zbc.createProcessInstanceWithResult({
        bpmnProcessId: 'new-customer-process',
        variables: { customerId: 457 }
    });
    // Log the process outcome
    console.log('Process outcome', JSON.stringify(outcome, null, 2));
}

// Call the main function to execute the script
main();

Versioning

To make it easy to match client library versions to the Zeebe server, the version numbers are mapped so that the Major and Minor versions match the server application. Patch versions are independent and indicate client updates.

NPM Package version 0.26.x supports Zeebe 0.22.x to 0.26.x.

NPM Package version 1.x supports Zeebe 1.x. It uses the C-based gRPC library by default.

NPM Package version 2.x supports Zeebe 1.x, and requires Node >= 16.6.1, >=14.17.5, or >=12.22.5. It removes the C-based gRPC library and uses the pure JS implementation.

Compatible Node Versions

Version 1.x of the package: Node versions <=16.x. Version 1.x uses the C-based gRPC library and does not work with Node 17. The C-based gRPC library is deprecated and no longer being maintained.

Version 2.x and later of the package: Node versions 12.22.5+, 14.17.5+, or 16.6.1+. Version 2.x uses the pure JS implementation of the gRPC library, and requires a fix to the nghttp2 library in Node (See #201).
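If you want a start-up guard for the Node.js requirement of 2.x and later, a minimal sketch follows. It encodes the three minimum versions listed above. The helper names are hypothetical, and treating majors above 16 as supported (and unlisted majors such as 13 or 15 as unsupported) is an assumption, not something this README states.

```javascript
// Minimum versions per Node.js release line for zeebe-node 2.x and later,
// taken from the list above.
const MINIMUM_BY_MAJOR = { 12: [12, 22, 5], 14: [14, 17, 5], 16: [16, 6, 1] }

// True if the [major, minor, patch] tuple `actual` is at least `minimum`.
function atLeast(actual, minimum) {
    for (let i = 0; i < 3; i++) {
        if (actual[i] !== minimum[i]) return actual[i] > minimum[i]
    }
    return true
}

// Assumptions: majors above 16 count as supported; majors not listed
// (e.g. 13, 15) or older than 12 count as unsupported.
function supportsPureJsGrpc(version) {
    const parts = version.replace(/^v/, '').split('.').map(Number)
    const major = parts[0]
    if (major > 16) return true
    const minimum = MINIMUM_BY_MAJOR[major]
    return minimum !== undefined && atLeast(parts, minimum)
}

console.log(`Node ${process.versions.node} supported:`, supportsPureJsGrpc(process.versions.node))
```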

Breaking changes in Zeebe 8.1.0

All deprecated APIs are removed in the 8.1.0 package version. If your code relies on deprecated methods and method signatures, you need to use a package version prior to 8.1.0 or update your application code.

Breaking changes in Zeebe 1.0.0

For Zeebe brokers prior to 1.0.0, use the 0.26.z version of zeebe-node. This README documents the Zeebe 1.0.0 API. The previous API is documented here.

Zeebe 1.0.0 contains a number of breaking changes, including the gRPC protocol and the API surface area. You must use a 1.x.y version of the client library with Zeebe 1.0.0 and later.

The pre-1.0.0 API of the Node client has been deprecated, but not removed. This means that your pre-1.0.0 applications should still work, just by changing the version of zeebe-node in the package.json.

gRPC Implementation

From version 2.x, the Zeebe Node client uses the pure JS gRPC client implementation.

For version 1.x, the Zeebe Node client uses the C gRPC client implementation grpc-node by default. The C-based gRPC implementation is deprecated and is not being maintained.

Type difference from other Zeebe clients

Protobuf fields of type int64 are serialised as type string in the Node library. These fields are serialised as numbers (long) in the Go and Java clients. See grpc/#7229 for why the Node library serialises them as strings. The process instance key, and other fields that are of type long in other client libraries, are of type string in this library. Fields of type int32 are serialised as type number in the Node library.
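The reason strings are the safe choice is that a JavaScript number is an IEEE-754 double, exact only up to Number.MAX_SAFE_INTEGER (2^53 - 1), while int64 keys can be larger. The sketch below uses made-up key values to show the precision loss, and one way to compare string keys with BigInt; compareKeys is an illustrative helper, not part of the library.

```javascript
// Hypothetical int64 keys, as strings (the form this library returns them in).
const keyA = '9007199254740993' // 2^53 + 1, beyond Number.MAX_SAFE_INTEGER
const keyB = '9007199254740992' // 2^53

// Converting to a JS number silently loses precision: both become 2^53.
const lossy = Number(keyA) === Number(keyB)

// BigInt preserves the exact value, so comparisons stay correct.
function compareKeys(a, b) {
    const x = BigInt(a)
    const y = BigInt(b)
    return x < y ? -1 : x > y ? 1 : 0
}

// Sort keys numerically without ever converting them to number.
const sorted = ['12', '9007199254740993', '3', '9007199254740992'].sort(compareKeys)

console.log(lossy)  // true
console.log(sorted) // [ '3', '12', '9007199254740992', '9007199254740993' ]
```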

A note on representing timeout durations

All timeouts are ultimately communicated in milliseconds. They can be specified using the primitive type number, and this is always a number of milliseconds.

All timeouts in the client library can also, optionally, be specified by a time value that encodes the units, using the typed-durations package. You can specify durations for timeouts like this:

const { Duration } = require('zeebe-node')

const timeoutS = Duration.seconds.of(30) // 30s timeout
const timeoutMs = Duration.milliseconds.of(30000) // 30s timeout in milliseconds

Using the value types makes your code more semantically specific.
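Conceptually, accepting both forms means normalising every timeout to milliseconds before it is sent. The following is a hypothetical stand-in to illustrate that idea, not the typed-durations implementation: the seconds/milliseconds objects, their { unit, value } shape, and toMilliseconds are all assumptions made for this sketch.

```javascript
// Hypothetical minimal duration values with an assumed { unit, value } shape.
// This is NOT the typed-durations API; it only illustrates normalisation.
const UNIT_TO_MS = { milliseconds: 1, seconds: 1000, minutes: 60000 }

const seconds = { of: value => ({ unit: 'seconds', value }) }
const milliseconds = { of: value => ({ unit: 'milliseconds', value }) }

// Accept a bare number (always milliseconds) or a typed duration.
function toMilliseconds(timeout) {
    if (typeof timeout === 'number') {
        return timeout
    }
    const factor = UNIT_TO_MS[timeout.unit]
    if (factor === undefined) {
        throw new Error(`Unknown duration unit: ${timeout.unit}`)
    }
    return timeout.value * factor
}

console.log(toMilliseconds(seconds.of(30)))         // 30000
console.log(toMilliseconds(milliseconds.of(30000))) // 30000
console.log(toMilliseconds(30000))                  // 30000
```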

There are five timeouts to take into account.

The first is the job timeout. This is the amount of time that the broker allocates exclusive responsibility for a job to a worker instance. By default, this is 60 seconds. This is the default value set by this client library. See "Job Workers".

The second is the requestTimeout. Whenever the client library sends a gRPC command to the broker, it has an explicit or implied requestTimeout. This is the amount of time that the gRPC gateway will wait for a response from the broker cluster before returning a 4 DEADLINE_EXCEEDED gRPC error response.

If no requestTimeout is specified, the configured timeout of the broker gateway is used. Out of the box, this is 15 seconds.

The most significant use of the requestTimeout is when using the createProcessInstanceWithResult command. If your process will take longer than 15 seconds to complete, you should specify a requestTimeout. See "Start a Process Instance and await the Process Outcome".

The third is the long poll duration. This is the amount of time for which the job worker holds open a long poll request to activate jobs.

The fourth is the maximum back-off delay in client-side gRPC command retries. See "Client-side gRPC retry in ZBClient".

Finally, the connectionTolerance option for ZBClient can also take a typed duration. This value is used to buffer reporting connection errors while establishing a connection - for example with Camunda SaaS, which requires a token exchange as part of the connection process.

Connection Behaviour

Client-side gRPC retry in ZBClient

If a gRPC command method of the ZBClient, such as ZBClient.deployProcess() or ZBClient.topology(), fails, the underlying gRPC library throws an exception.

If no workers have been started and the exception is not handled by the application logic, this can be fatal to the process. This is especially an issue when a worker container starts before the Zeebe gRPC gateway is available to service requests, and because this is a race condition, the failure can be intermittent.

To mitigate this, the Node client implements client-side gRPC operation retry logic by default. This can be configured, or disabled entirely, via options in the client constructor.

  • Operations retry, but only for gRPC error codes 8 and 14, indicating resource exhaustion (8) or transient network failure (14). Resource exhaustion occurs when the broker applies backpressure because latency has risen under load. Network failure can be caused by passing in an unresolvable gateway address (14: DNS Resolution failed), or by the gateway not being ready yet (14: UNAVAILABLE: failed to connect to all addresses).
  • Operations that fail for other reasons, such as deploying an invalid bpmn file or cancelling a process that does not exist, do not retry.
  • Retry is enabled by default, and can be disabled by passing { retry: false } to the client constructor.
  • Values for retry, maxRetries and maxRetryTimeout can be configured via the environment variables ZEEBE_CLIENT_RETRY, ZEEBE_CLIENT_MAX_RETRIES and ZEEBE_CLIENT_MAX_RETRY_TIMEOUT respectively.
  • maxRetries and maxRetryTimeout are also configurable through the constructor options, or through environment variables. By default, if not supplied, the values are:
const { ZBClient, Duration } = require('zeebe-node')

const zbc = new ZBClient(gatewayAddress, {
    retry: true,
    maxRetries: -1, // infinite retries
    maxRetryTimeout: Duration.seconds.of(5)
})

The environment variables are:

ZEEBE_CLIENT_MAX_RETRIES
ZEEBE_CLIENT_RETRY
ZEEBE_CLIENT_MAX_RETRY_TIMEOUT

Retry is provided by promise-retry, and the back-off strategy is a simple exponential one (^2): the delay doubles with each attempt.
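The retry policy described in this section can be modelled in a few lines: retry only on gRPC codes 8 and 14, let the delay grow by a factor of 2 per attempt (one reading of the "^2" strategy), and cap each delay at maxRetryTimeout. This is a sketch, not the library's internals: withRetry and retryDelay are hypothetical helpers, and the 1000 ms starting delay is an assumption (it is the default minTimeout of the retry module that promise-retry builds on).

```javascript
// gRPC status codes this section describes as retryable.
const RETRYABLE_CODES = new Set([8, 14]) // RESOURCE_EXHAUSTED, UNAVAILABLE

function isRetryable(err) {
    return Boolean(err) && RETRYABLE_CODES.has(err.code)
}

// Delay before retry number `attempt` (0-based): doubles each time,
// capped at maxRetryTimeoutMs. initialDelayMs = 1000 is an assumption.
function retryDelay(attempt, maxRetryTimeoutMs, initialDelayMs = 1000) {
    return Math.min(initialDelayMs * 2 ** attempt, maxRetryTimeoutMs)
}

// Hypothetical wrapper: retries `operation` while it fails with a retryable
// code. maxRetries = -1 means retry forever, like the default shown above.
async function withRetry(operation, { maxRetries = -1, maxRetryTimeoutMs = 5000, sleep } = {}) {
    const wait = sleep || (ms => new Promise(resolve => setTimeout(resolve, ms)))
    for (let attempt = 0; ; attempt++) {
        try {
            return await operation()
        } catch (err) {
            const exhausted = maxRetries !== -1 && attempt >= maxRetries
            if (!isRetryable(err) || exhausted) throw err
            await wait(retryDelay(attempt, maxRetryTimeoutMs))
        }
    }
}

console.log([0, 1, 2, 3, 4].map(n => retryDelay(n, 5000))) // [ 1000, 2000, 4000, 5000, 5000 ]
```

Non-retryable failures, such as an invalid BPMN file, are rethrown on the first attempt, which matches the behaviour described in the list above.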

Additionally, the gRPC Client will continually reconnect when in a failed state, such as when the gateway goes away due to pod rescheduling on Kubernetes.

Eager Connection

The ZBClient eagerly connects to the broker by issuing a topology command in the constructor. This allows an onReady event to be emitted. You can disable this (for example, for testing without a broker) by either passing eagerConnection: false to the client constructor options, or setting the environment variable ZEEBE_NODE_EAGER_CONNECTION to false.

onReady(), onConnectionError(), and connected

The client has a connected property that can be examined to determine if it has a gRPC connection to the gateway.

The client and the worker can take an optional onReady() and onConnectionError() handler in their constructors, like this:

const { ZBClient, Duration } = require('zeebe-node')

const zbc = new ZBClient({
	onReady: () => console.log(`Connected!`),
	onConnectionError: () => console.log(`Disconnected!`)
})

const zbWorker = zbc.createWorker({
	taskType: 'demo-service',
	taskHandler: handler,
	onReady: () => console.log(`Worker connected!`),
	onConnectionError: () => console.log(`Worker disconnected!`)
})

These handlers are called whenever the gRPC channel is established or lost. Because the gRPC channel often "jitters" when it is lost (rapidly emitting READY and ERROR events at the transport layer), a connectionTolerance property determines how long the connection must remain in a connected or failed state before the handler is called. By default this is 3000ms.
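The debouncing idea behind connectionTolerance can be modelled as a pure function over timestamped transport events: a state is reported only once it has held for at least the tolerance without being replaced. This is a hypothetical model of the behaviour described above, not the library's implementation; stableStates and the event shape are illustrative.

```javascript
// Hypothetical model of connection-tolerance debouncing.
// events: [{ at: ms, state: 'READY' | 'ERROR' }], sorted by time.
// A state is reported only if it held for at least toleranceMs before the
// next event (or before endTime), and differs from the last reported state.
function stableStates(events, toleranceMs, endTime) {
    const reported = []
    let lastReported = null
    events.forEach(({ at, state }, i) => {
        const nextAt = i + 1 < events.length ? events[i + 1].at : endTime
        if (nextAt - at >= toleranceMs && state !== lastReported) {
            reported.push(state)
            lastReported = state
        }
    })
    return reported
}

// A jittery channel: rapid ERROR/READY flapping, then a READY that holds.
const jitter = [
    { at: 0, state: 'ERROR' },
    { at: 200, state: 'READY' },
    { at: 400, state: 'ERROR' },
    { at: 600, state: 'READY' },
]

console.log(stableStates(jitter, 3000, 10000)) // [ 'READY' ]
```

A live client would use timers instead of looking ahead in an event log; the log-based version is used here only because it makes the effect of the tolerance easy to see and test.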

You can specify another value either in the constructor or via an environment variable.

To specify it via an environment variable, set ZEEBE_CONNECTION_TOLERANCE to a number of milliseconds.

To set it via the constructor, specify a value for connectionTolerance like this:

const { ZBClient, Duration } = require('zeebe-node')

const zbc = new ZBClient({
	onReady: () => console.log(`Connected!`),
	onConnectionError: () => console.log(`Disconnected!`),
	connectionTolerance: 5000 // milliseconds
})

const zbWorker = zbc.createWorker({
	taskType: 'demo-service',
	taskHandler: handler,
	onReady: () => console.log(`Worker connected!`),
	onConnectionError: () => console.log(`Worker disconnected!`),
	connectionTolerance: Duration.seconds.of(3.5) // 3500 milliseconds
})

As well as the callback handlers, the client and workers extend the EventEmitter class, and you can attach listeners to them for the 'ready' and 'connectionError' events:

const { ZBClient, Duration } = require('zeebe-node')

const zbc = new ZBClient()

const zbWorker = zbc.createWorker({
	taskType: 'demo-service',
	taskHandler: handler,
	connectionTolerance: Duration.seconds.of(3.5)
})

zbWorker.on('ready', () => console.log(`Worker connected!`))
zbWorker.on('connectionError', () => console.log(`Worker disconnected!`))

Initial Connection Tolerance

Some broker connections can initially emit error messages - for example: when connecting to Camunda SaaS, during TLS negotiation and OAuth authentication, the eager commands used to detect connection status will fail, and the library will report connection errors.

Since this is expected behaviour - a characteristic of that particular connection - the library has a configurable "initial connection tolerance". This is a number of milliseconds representing the expected window in which these errors will occur on initial connection.

If the library detects that you are connecting to Camunda SaaS, it sets this window to five seconds (5000 milliseconds). In some environments and under some conditions this may not be sufficient.

You can set an explicit value for this using the environment variable ZEEBE_INITIAL_CONNECTION_TOLERANCE, set to a number of milliseconds.

The effect of this setting is to suppress connection errors during this window, and only report them if the connection did not succeed by the end of the window.

Connecting to a Broker

TLS

The Node client does not use TLS by default.

Enable a secure connection by setting useTLS: true:

const { ZBClient } = require('zeebe-node')

const zbc = new ZBClient(tlsSecuredGatewayAddress, {
	useTLS: true,
})

Via environment variable:

ZEEBE_SECURE_CONNECTION=true

Using a Self-signed Certificate

You can use a self-signed SSL certificate with the Zeebe client. You need to provide the root certificates, the private key, and the SSL certificate chain as Buffers, and pass them into the ZBClient constructor:

const fs = require('fs')
const { ZBClient } = require('zeebe-node')

const rootCertsPath = '/path/to/rootCerts'
const privateKeyPath = '/path/to/privateKey'
const certChainPath = '/path/to/certChain'

const zbc = new ZBClient({
    useTLS: true,
    customSSL: {
        // customSSL expects Buffers, so read the files first
        rootCerts: fs.readFileSync(rootCertsPath),
        privateKey: fs.readFileSync(privateKeyPath),
        certChain: fs.readFileSync(certChainPath)
    }
})

Or you can put the file paths into the environment in the following variables:

ZEEBE_CLIENT_SSL_ROOT_CERTS_PATH
ZEEBE_CLIENT_SSL_PRIVATE_KEY_PATH
ZEEBE_CLIENT_SSL_CERT_CHAIN_PATH

Enable TLS

ZEEBE_SECURE_CONNECTION=true

In this case, they will be passed to the constructor automatically.

OAuth

In case you need to connect to a secured endpoint with OAuth, you can pass in OAuth credentials. This will enable TLS (unless you explicitly disable it with useTLS: false), and handle the OAuth flow to get / renew a JWT:

const fs = require('fs')
const { ZBClient } = require('zeebe-node')

const zbc = new ZBClient("my-secure-broker.io:443", {
	oAuth: {
		url: "https://your-auth-endpoint/oauth/token",
		audience: "my-secure-broker.io",
		scope: "myScope",
		clientId: "myClientId",
		clientSecret: "randomClientSecret",
		customRootCert: fs.readFileSync('./my_CA.pem'),
		cacheOnDisk: true
	}
})

The cacheOnDisk option will cache the token on disk in $HOME/.camunda, which can be useful in development if you are restarting the service frequently, or are running in a serverless environment, like AWS Lambda.

If the cache directory is not writable, the ZBClient constructor will throw an exception. This is considered fatal, as it can lead to denial of service or hefty bills if you think caching is on when it is not.

The customRootCert argument is optional. It can be used to provide a custom TLS certificate as a Buffer, which will be used while obtaining the OAuth token from the specified URL. If not provided, the CAs provided by Mozilla will be used.
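The get/renew flow the client handles is an OAuth 2.0 client-credentials exchange. As an illustration only, the sketch below builds the kind of form-encoded token request body such an exchange uses and checks whether a cached token should be renewed. It assumes the authorization server accepts audience and scope as form fields; tokenRequestBody, needsRenewal, and the 60-second safety margin are hypothetical, not zeebe-node functions or defaults.

```javascript
// Form body for an OAuth 2.0 client-credentials token request (RFC 6749, 4.4).
// audience and scope are assumed extra fields, sent only when provided.
function tokenRequestBody({ clientId, clientSecret, audience, scope }) {
    const params = new URLSearchParams()
    params.set('grant_type', 'client_credentials')
    params.set('client_id', clientId)
    params.set('client_secret', clientSecret)
    if (audience) params.set('audience', audience)
    if (scope) params.set('scope', scope)
    return params.toString()
}

// A cached token is reused until shortly before it expires.
// expiresAtMs and nowMs are epoch milliseconds; skewMs is an assumed margin.
function needsRenewal(expiresAtMs, nowMs = Date.now(), skewMs = 60000) {
    return nowMs >= expiresAtMs - skewMs
}

console.log(tokenRequestBody({
    clientId: 'myClientId',
    clientSecret: 'randomClientSecret',
    audience: 'my-secure-broker.io',
    scope: 'myScope',
}))
// grant_type=client_credentials&client_id=myClientId&client_secret=randomClientSecret&audience=my-secure-broker.io&scope=myScope
```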

Basic Auth

If you put a proxy in front of the broker with basic auth, you can pass in a username and password:

const { ZBClient } = require('zeebe-node')

const zbc = new ZBClient("my-broker-with-basic-auth.io:443", {
	basicAuth: {
		username: "user1",
		password: "secret",
	},
	useTLS: true
})

Basic Auth will also work without TLS.
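For reference, HTTP basic auth credentials are sent as an Authorization header whose token is only base64-encoded, not encrypted. The sketch below assumes the standard RFC 7617 format that a proxy in front of the gateway would check; basicAuthHeader is an illustrative helper, not a zeebe-node function.

```javascript
// Builds an HTTP Basic Authorization header value (RFC 7617):
// "Basic " followed by base64("username:password").
function basicAuthHeader(username, password) {
    const token = Buffer.from(`${username}:${password}`, 'utf8').toString('base64')
    return `Basic ${token}`
}

const header = basicAuthHeader('user1', 'secret')
console.log(header) // Basic dXNlcjE6c2VjcmV0

// Decoding is trivial, which is why the encoding offers no secrecy on its own.
console.log(Buffer.from(header.slice('Basic '.length), 'base64').toString('utf8')) // user1:secret
```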

Camunda 8 SaaS

Camunda 8 SaaS is a hosted SaaS instance of Zeebe. The easiest way to connect is to use the Zero-conf constructor with the Client Credentials from the Camunda SaaS console as environment variables.

You can also connect to Camunda SaaS by using the camundaCloud configuration option, using the clusterId, clientSecret, and clientId from the Camunda SaaS Console, like this:

const { ZBClient } = require('zeebe-node')

const zbc = new ZBClient({
	camundaCloud: {
		clientId,
		clientSecret,
		clusterId,
		clusterRegion, // optional, defaults to bru-2
	},
})

That's it! Under the hood, the client lib will construct the OAuth configuration for Camunda SaaS and set the gateway address and port for you.

We recommend the Zero-conf constructor with the configuration passed in via environment variables. This allows you to run your application against different environments via configuration.

Zero-Conf constructor

The ZBClient has a 0-parameter constructor that takes the config from the environment. This is useful for injecting secrets into your app via the environment, and switching between development and production environments with no change to code.

To use the zero-conf constructor, you create the client like this:

const { ZBClient } = require('zeebe-node')

const zbc = new ZBClient()

With no relevant environment variables set, it will default to localhost on the default port with no TLS.

The following environment variable configurations are possible with the Zero-conf constructor:

From 8.3.0, multi-tenancy:

ZEEBE_TENANT_ID

Camunda SaaS:

ZEEBE_ADDRESS
ZEEBE_CLIENT_SECRET
ZEEBE_CLIENT_ID
ZEEBE_TOKEN_AUDIENCE
ZEEBE_AUTHORIZATION_SERVER_URL

Self-hosted or local broker (no TLS or OAuth):

ZEEBE_ADDRESS

Self-hosted with self-signed SSL certificate:

ZEEBE_CLIENT_SSL_ROOT_CERTS_PATH
ZEEBE_CLIENT_SSL_PRIVATE_KEY_PATH
ZEEBE_CLIENT_SSL_CERT_CHAIN_PATH
ZEEBE_SECURE_CONNECTION=true

Self-hosted or local broker with OAuth + TLS:

ZEEBE_CLIENT_ID
ZEEBE_CLIENT_SECRET
ZEEBE_TOKEN_AUDIENCE
ZEEBE_TOKEN_SCOPE
ZEEBE_AUTHORIZATION_SERVER_URL
ZEEBE_ADDRESS

Multi-tenant self-hosted or local broker with OAuth and no TLS:

ZEEBE_TENANT_ID='<default>'
ZEEBE_SECURE_CONNECTION=false
ZEEBE_ADDRESS='localhost:26500'
ZEEBE_CLIENT_ID='zeebe'
ZEEBE_CLIENT_SECRET='zecret'
ZEEBE_AUTHORIZATION_SERVER_URL='http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token'
ZEEBE_TOKEN_AUDIENCE='zeebe.camunda.io'
ZEEBE_TOKEN_SCOPE='not needed'
CAMUNDA_CREDENTIALS_SCOPES='Zeebe'
CAMUNDA_OAUTH_URL='http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token'

Basic Auth:

ZEEBE_BASIC_AUTH_PASSWORD
ZEEBE_BASIC_AUTH_USERNAME
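Because the zero-conf constructor silently falls back to localhost when variables are missing, a start-up check can make misconfiguration visible. A minimal sketch, mirroring some of the lists above: the scenario names and missingEnv are hypothetical, the ZBClient does not use them, and the cluster address is made up.

```javascript
// Variables each zero-conf scenario relies on, mirroring the lists above.
// The scenario names are illustrative; the ZBClient does not use them.
const REQUIRED_ENV = {
    saas: ['ZEEBE_ADDRESS', 'ZEEBE_CLIENT_SECRET', 'ZEEBE_CLIENT_ID', 'ZEEBE_TOKEN_AUDIENCE', 'ZEEBE_AUTHORIZATION_SERVER_URL'],
    oauthTls: ['ZEEBE_CLIENT_ID', 'ZEEBE_CLIENT_SECRET', 'ZEEBE_TOKEN_AUDIENCE', 'ZEEBE_TOKEN_SCOPE', 'ZEEBE_AUTHORIZATION_SERVER_URL', 'ZEEBE_ADDRESS'],
    basicAuth: ['ZEEBE_BASIC_AUTH_PASSWORD', 'ZEEBE_BASIC_AUTH_USERNAME'],
}

// Returns the names of required variables that are unset or empty.
function missingEnv(scenario, env = process.env) {
    const required = REQUIRED_ENV[scenario]
    if (!required) throw new Error(`Unknown scenario: ${scenario}`)
    return required.filter(name => !env[name])
}

// Example: only the address has been set (hypothetical cluster address).
const missing = missingEnv('saas', { ZEEBE_ADDRESS: 'my-cluster.bru-2.zeebe.camunda.io:443' })
if (missing.length > 0) {
    console.error(`Missing environment variables: ${missing.join(', ')}`)
}
```

Calling such a check before new ZBClient() turns a silent fallback into an explicit error message at start-up.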

Job Workers

Types of Job Workers

There are two different types of job worker provided by the Zeebe Node client:

  • The ZBWorker - this worker operates on individual jobs.
  • The ZBBatchWorker - this worker batches jobs on the client, to allow you to batch operations that pool resources. (This worker was introduced in 0.23.0 of the client).

Much of the information in the following ZBWorker section applies also to the ZBBatchWorker. The ZBBatchWorker section covers the features that differ from the ZBWorker.

The ZBWorker Job Worker

The ZBWorker takes a job handler function that is invoked for each job. It is invoked as soon as the worker retrieves a job from the broker. The worker can retrieve any number of jobs in a response from the broker, and the handler is invoked for each one, independently.

The simplest signature for a worker takes a string task type, and a job handler function.

The job handler receives the job object, which has methods that it can use to complete or fail the job, and a reference to the worker itself, which you can use to log using the worker's configured logger (See Logging).

Note: The second argument is deprecated, and remains for backward-compatibility - it is a complete function. In the 1.0 version of the API, the complete function methods are available on the job object.

const ZB = require('zeebe-node')

const zbc = new ZB.ZBClient()

const zbWorker = zbc.createWorker({
	taskType: 'demo-service',
	taskHandler: handler,
})

function handler(job) {
	zbWorker.log('Task variables', job.variables)

	// Task worker business logic goes here
	const updateToBrokerVariables = {
		updatedProperty: 'newValue',
	}

	return job.complete(updateToBrokerVariables)
}

Here is an example job:

{ key: '578',
  type: 'demo-service',
  jobHeaders:
   { processInstanceKey: '574',
     bpmnProcessId: 'test-process',
     processDefinitionVersion: 1,
     processKey: '3',
     elementId: 'ServiceTask_0xdwuw7',
     elementInstanceKey: '577' },
  customHeaders: '{}',
  worker: 'test-worker',
  retries: 3,
  deadline: '1546915422636',
  variables: { testData: 'something' } }

The worker can be configured with options. To do this, you should use the object parameter constructor.

Shown below are the defaults that apply if you don't supply them:

const { ZBClient, Duration } = require('zeebe-node')

const zbc = new ZBClient()

const zbWorker = zbc.createWorker({
	taskType: 'demo-service',
	taskHandler: handler,
	// the number of simultaneous tasks this worker can handle
	maxJobsToActivate: 32,
	// the amount of time the broker should allow this worker to complete a task
	timeout: Duration.seconds.of(30),
	// One of 'DEBUG', 'INFO', 'NONE'
	loglevel: 'INFO',
	// Called when the connection to the broker cannot be established, or fails
	onConnectionError: () => zbWorker.log('Disconnected'),
	// Called when the connection to the broker is (re-)established
	onReady: () => zbWorker.log('Connected.'),
})

Unhandled Exceptions in Task Handlers

Note: this behaviour is for the ZBWorker only. The ZBBatchWorker does not manage this.

When a task handler throws an unhandled exception, the library will fail the job. Zeebe will then retry the job according to the retry settings of the task. Sometimes you want to halt the entire process so you can investigate. To have the library cancel the process on an unhandled exception, pass in {failProcessOnException: true} to the createWorker call:

import { ZBClient } from 'zeebe-node'

const zbc = new ZBClient()

zbc.createWorker({
	taskType: 'console-log',
	taskHandler: maybeFaultyHandler,
	failProcessOnException: true,
})

Completing tasks with success, failure, error, or forwarded

To complete a task, the job object that the task worker handler function receives has complete, fail, error, and forward methods.

Call job.complete(), passing in an optional plain old JavaScript object (POJO), a key:value map. These are variable:value pairs that will be used to update the process state in the broker. They will be merged with existing values. You can set an existing key to null or undefined, but there is no way to delete a key.

Call job.fail() to fail the task. You must pass in a string message describing the failure. The client library decrements the retry count, and the broker handles the retry logic. If the failure is a hard failure and should cause an incident to be raised in Operate, pass in 0 for the optional second parameter, retries:

job.fail('This is a critical failure and will raise an incident', 0)

From version 8.0.0 of the package, used with an 8.0.0 Zeebe broker, you can specify an optional back-off for the reactivation of the job, like this:

job.fail({
	errorMessage: 'Triggering a retry with a two second back-off',
	retryBackOff: 2000,
	retries: 1,
})

Call job.error() to trigger a BPMN error throw event. You must pass in a string error code as the first parameter, and you can pass an optional error message as the second parameter. If no BPMN error catch event exists for the error code, an incident will be raised.

job.error('RECORD_NOT_FOUND', 'Could not find the customer in the database')

From 8.2.5 of the client, you can update the variables in the workflow when you throw a BPMN error in a worker:

job.error({
    errorCode: 'RECORD_NOT_FOUND',
    errorMessage: 'Could not find the customer in the database',
    variables: {
        someVariable: 'someValue'
    }
})

Call job.forward() to release worker capacity to handle another job, without completing the job in any way with the Zeebe broker. This method supports the decoupled job completion pattern. In this pattern, the worker forwards the job to another system, such as a lambda or a RabbitMQ queue, and some other process is ultimately responsible for completing the job.
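To see how the outcome methods divide the work, here is a sketch of a handler that picks one of them, exercised against a stand-in job object so it runs without a broker. makeStubJob only mimics the complete/fail/error/forward methods described above and is not the zeebe-node job; customerHandler and lookupCustomer are hypothetical. The split used (BPMN error for a business outcome, fail with 0 retries for an unrecoverable technical problem) is one reasonable convention, not a rule of the library.

```javascript
// Stand-in for the job object, recording which outcome method was called.
// It only mimics complete/fail/error/forward; it is not zeebe-node's job.
function makeStubJob(variables) {
    const calls = []
    return {
        variables,
        calls,
        complete(vars = {}) { calls.push(['complete', vars]) },
        fail(message, retries) { calls.push(['fail', message, retries]) },
        error(code, message) { calls.push(['error', code, message]) },
        forward() { calls.push(['forward']) },
    }
}

// Hypothetical handler showing one way to pick an outcome.
// lookupCustomer is an illustrative dependency (e.g. a database query).
function customerHandler(job, lookupCustomer) {
    const { customerId } = job.variables
    if (customerId === undefined) {
        // Retrying cannot fix this: 0 retries raises an incident in Operate
        return job.fail('customerId missing from process variables', 0)
    }
    const record = lookupCustomer(customerId)
    if (!record) {
        // A business outcome the process model can catch as a BPMN error
        return job.error('RECORD_NOT_FOUND', 'Could not find the customer in the database')
    }
    return job.complete({ customerName: record.name })
}

const customers = { 457: { name: 'Ada' } }
const job = makeStubJob({ customerId: 457 })
customerHandler(job, id => customers[id])
console.log(job.calls) // [ [ 'complete', { customerName: 'Ada' } ] ]
```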

Working with Process Variables and Custom Headers

Process variables are available in your worker job handler callback as job.variables, and any custom headers are available as job.customHeaders.

These are read-only JavaScript objects in the Zeebe Node client. However, they are not stored that way in the broker.

Both process variables and custom headers are stored in the broker as a dictionary of named strings. That means that the variables and custom headers are JSON.parsed in the Node client when it fetches the job, and any update passed to the complete() function is JSON.stringified.

If you pass a circular structure to complete(), for example the response object from an HTTP call, it will throw, as this cannot be serialised to a JSON string.
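A quick way to see what the broker will actually receive is to round-trip a value through JSON yourself. A minimal sketch in plain JavaScript, with no client involved; toBrokerPayload is a hypothetical helper that applies the same JSON.stringify conversion the text describes.

```javascript
// Hypothetical helper: applies the JSON conversion by hand so you can
// inspect the result before calling complete().
function toBrokerPayload(variables) {
    return JSON.stringify(variables)
}

// Dates become ISO strings after the round trip; they are not Date objects.
const roundTripped = JSON.parse(toBrokerPayload({ total: 42, when: new Date(0) }))
console.log(roundTripped) // { total: 42, when: '1970-01-01T00:00:00.000Z' }

// A circular structure, like many HTTP response objects, cannot be serialised.
const response = { status: 200, body: { id: 7 } }
response.request = { response } // back-reference creates a cycle

let threw = false
try {
    toBrokerPayload(response)
} catch (err) {
    threw = err instanceof TypeError
}
console.log(threw) // true

// Copy out only the plain data you need before completing the job.
const safeVariables = { status: response.status, body: response.body }
console.log(toBrokerPayload(safeVariables)) // {"status":200,"body":{"id":7}}
```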

To update a key deep in the object structure of a process variable, you can use the deepmerge utility:

const merge = require('deepmerge')
const { ZBClient } = require('zeebe-node')

const zbc = new ZBClient()

zbc.createWorker({
    taskType: 'some-task',
    taskHandler: job => {
        const { people } = job.variables
        // update bob's age, keeping all his other properties the same;
        // the merged object is sent back under the top-level 'people' key
        return job.complete({ people: merge(people, { bob: { age: 23 } }) })
    }
})
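The deep merge matters because the values sent with complete() are merged into the existing variables by top-level name: a variable such as people is replaced as a whole. The sketch below models that assumed top-level merge next to a small recursive merge. brokerMerge and deepMerge are illustrative stand-ins, not the broker's code or the deepmerge package (which, for example, concatenates arrays by default, whereas this sketch replaces them).

```javascript
// Assumed broker behaviour: top-level keys in the payload replace the
// existing variables of the same name; other variables are kept.
function brokerMerge(existing, update) {
    return { ...existing, ...update }
}

const isPlainObject = v => v !== null && typeof v === 'object' && !Array.isArray(v)

// Minimal recursive merge for plain objects (arrays are replaced, not merged).
function deepMerge(target, source) {
    const out = { ...target }
    for (const [key, value] of Object.entries(source)) {
        out[key] = isPlainObject(value) && isPlainObject(out[key])
            ? deepMerge(out[key], value)
            : value
    }
    return out
}

const existing = { people: { bob: { age: 22, city: 'Berlin' } }, orderId: 'A1' }

// Sending only the nested change replaces the whole 'people' variable.
const naive = brokerMerge(existing, { people: { bob: { age: 23 } } })
// Deep-merging first keeps every other property.
const merged = brokerMerge(existing, { people: deepMerge(existing.people, { bob: { age: 23 } }) })

console.log(naive.people.bob)  // { age: 23 }
console.log(merged.people.bob) // { age: 23, city: 'Berlin' }
```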

When setting custom headers in BPMN tasks, while designing your model, you can put stringified JSON as the value for a custom header, and it will show up in the client as a JavaScript object.

Process variables and custom headers are untyped in the Zeebe broker; however, the Node client in TypeScript mode lets you type them for safety. You can type your worker as any to turn that off:

// No type checking - totally dynamic and unchecked
zbc.createWorker<any>({
    taskType: 'yolo-jobs',
    taskHandler: (job) => {
        console.log(`Look ma - ${job.variables?.anything?.goes?.toUpperCase()}`)
        job.complete({what: job.variables.could.possibly.go.wrong})
    }
})

See the section Writing Strongly-typed Job Workers for more details.

Constraining the Variables Fetched by the Worker

Sometimes you only need a few specific process variables to service a job. One way to constrain the process variables a worker receives is to use input variable mappings on the task in the model.

You can also use the fetchVariable parameter when creating a worker: pass it an array of strings containing the names of the variables to fetch. Here is an example, in JavaScript:

zbc.createWorker({
	taskType: 'process-favorite-albums',
	taskHandler: job => {
		const { name, albums } = job.variables
		console.log(`${name} has the following albums: ${albums.join(', ')}`)
		job.complete()
	},
	fetchVariable: ['name', 'albums'],
})

If you are using TypeScript, you can supply an interface describing the process variables, and parameterize the worker:

interface Variables {
    name: string
    albums: string[]
}

zbc.createWorker<Variables>({
    taskType: 'process-favorite-albums',
    taskHandler: (job) => {
        const { name, albums = [] } = job.variables
        console.log(`${name} has the following albums: ${albums?.join?.(', ')}`)
        job.complete()
    },
    fetchVariable: ['name', 'albums'],
})

This parameterization does two things:

  • It informs the worker about the expected types of the variables. For example, if albums is a string, calling join on it will fail at runtime. Providing the type allows the compiler to reason about the valid methods that can be applied to the variables.
  • It allows the type-checker to pick up spelling errors in the strings in fetchVariable, by comparing them with the Variables typing.

Note that this does not protect you against run-time exceptions where your typings are incorrect, or where the payload simply does not match the definition that you provided.

See the section Writing Strongly-typed Job Workers for more details on run-time safety.
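Since type annotations are erased at run time, one option is to check the payload explicitly before using it. A minimal sketch in JavaScript for the Variables shape used above; isAlbumVariables and describeAlbums are hypothetical names. In TypeScript, the same check could be declared as a type guard, function isAlbumVariables(v: unknown): v is Variables.

```javascript
// Hypothetical run-time guard for the { name: string, albums: string[] } shape.
function isAlbumVariables(v) {
    return (
        v !== null &&
        typeof v === 'object' &&
        typeof v.name === 'string' &&
        Array.isArray(v.albums) &&
        v.albums.every(album => typeof album === 'string')
    )
}

// Throws a descriptive error instead of failing later on albums.join().
function describeAlbums(variables) {
    if (!isAlbumVariables(variables)) {
        throw new TypeError('Unexpected variables shape for process-favorite-albums')
    }
    return `${variables.name} has the following albums: ${variables.albums.join(', ')}`
}

console.log(describeAlbums({ name: 'Ann', albums: ['Blue', 'Low'] }))
// Ann has the following albums: Blue, Low
```

Inside a worker, catching that TypeError and calling job.fail() with a clear message makes a malformed payload easier to diagnose than an unhandled exception.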

You can turn off the type-safety by typing the worker as any:

zbc.createWorker<any>({
    taskType: 'process-favorite-albums',
    taskHandler: (job) => {
        const { name, albums = [] } = job.variables
        // TS 3.7 safe access to .join _and_ safe call, to prevent run-time exceptions
        console.log(`${name} has the following albums: ${albums?.join?.(', ')}`)
        job.complete()
    },
    fetchVariable: ['name', 'albums'],
})

The "Decoupled Job Completion" pattern

The Decoupled Job Completion pattern uses a Zeebe Job Worker to activate jobs from the broker, and some other asynchronous (remote) system to do the work.

You might activate jobs and then send them to a RabbitMQ queue, or to an AWS lambda. In this case, this worker may have no success or failure outcome to report back to the broker. That will be the responsibility of another part of your distributed system.

First, make sure you activate the job with a timeout long enough for the complete round trip through your system. Your worker will not be completing the job, but the timeout tells the broker how long the loop is expected to take to close.

Next, call job.forward() in your job worker handler. This has no side effect on the broker: nothing is communicated to Zeebe, and as far as Zeebe is concerned, the job is still out with your worker. What this call does is release worker capacity to request more jobs.

If you are using the Zeebe Node library in the remote system, or if the remote system eventually reports back to you (perhaps over a different RabbitMQ queue), you can use the ZBClient methods completeJob(), failJob(), and throwError() to report the outcome back to the broker.

You need at least the job.key, to be able to correlate the result back to Zeebe. Presumably you also want the information from the remote system about the outcome, and any updated variables.
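On the reporting side, it can help to keep the decision about which outcome to send in a pure function, separate from the code that talks to the broker. A sketch under stated assumptions: the adapter response shape ({ correlationId, status, rows, message }), the 'OK'/'DUPLICATE' statuses, and the DUPLICATE_CUSTOMER error code are all hypothetical, and the request objects are assumed to match the completeJob, failJob, and throwError methods named above.

```javascript
// Hypothetical adapter response: { correlationId, status, rows?, message? }.
// correlationId is the job.key the worker sent with the request.
// Returns which ZBClient method to call and an assumed request object for it.
function outcomeFor(response) {
    const jobKey = response.correlationId
    switch (response.status) {
        case 'OK':
            return { method: 'completeJob', args: { jobKey, variables: { rowsInserted: response.rows } } }
        case 'DUPLICATE':
            // A business outcome the BPMN model can catch as an error event
            return {
                method: 'throwError',
                args: { jobKey, errorCode: 'DUPLICATE_CUSTOMER', errorMessage: response.message || '' },
            }
        default:
            // Technical failure: retries: 0 raises an incident for investigation
            return {
                method: 'failJob',
                args: { jobKey, errorMessage: response.message || 'Unknown adapter failure', retries: 0 },
            }
    }
}

console.log(outcomeFor({ correlationId: '2251799813685251', status: 'OK', rows: 1 }))
```

The response listener then only needs to invoke the chosen method on its ZBClient with the returned arguments, and the mapping itself can be tested without a broker.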

Here is an example:

  • You have a COBOL system that runs a database.
  • Somebody wrote an adapter for this COBOL database. It executes commands over SSH.
  • The adapter is accessible via a RabbitMQ "request" queue, which takes a command and a correlation id, so that its response can be correlated to this request.
  • The adapter sends back the COBOL database system response on a RabbitMQ "response" queue, with the correlation id.
  • It typically takes 15 seconds for the round-trip through RabbitMQ to the COBOL database and back.

You want to put this system into a Zeebe-orchestrated BPMN model as a task.

Rather than injecting a RabbitMQ listener into the job handler, you can "fire and forget" the request using the decoupled job completion pattern.

Here is how you do it:

  • Your worker gets the job from Zeebe.
  • Your worker builds the command and sends it down the RabbitMQ "request" queue, with the job.key as the correlation id.
  • Your worker calls job.forward()

Here is what that looks like in code:

import { RabbitMQSender } from './lib/my-awesome-rabbitmq-api'
import { ZBClient, Duration } from 'zeebe-node'

const zbc = new ZBClient()

const cobolWorker = zbc.createWorker({
    taskType: 'cobol-insert',
    timeout: Duration.seconds.of(20), // allow 5s over the expected 15s
    taskHandler: job => {
        const { key, variables } = job
        const request = {
            correlationId: key,
            command: `INSERT ${variables.customer} INTO CUSTOMERS`
        }
        RabbitMQSender.send({
            channel: 'COBOL_REQ',
            request
        })
        // Call forward() to release worker capacity
        return job.forward()
    }
})

Now for the response part:

  • Another part of your system listens to the RabbitMQ response queue.
  • It gets a response back from the COBOL adapter.
  • It examines the response, then sends the appropriate outcome to Zeebe, using the jobKey that has been attached as the correlationId
import { RabbitMQListener } from './lib/my-awesome-rabbitmq-api'
import { ZBClient } from 'zeebe-node'

const zbc = new ZBClient()

RabbitMQListener.listen({
    channel: 'COBOL_RES',
    handler: message => {
        const { outcome, correlationId } = message
        if (outcome.SUCCESS) {
            zbc.completeJob({
                jobKey: correlationId,
                variables: {}
            })
        }
        if (outcome.ERROR) {
            zbc.throwError({
                jobKey: correlationId,
                errorCode: "5",
                errorMessage: "The COBOL Database reported an error. Boo!"
            })
        }
    }
})

See also the section "Publish a Message", for a pattern that you can use when it is not possible to attach the job key to the round trip data response.

The ZBBatchWorker Job Worker

The ZBBatchWorker Job Worker batches jobs before calling the job handler. Its fundamental differences from the ZBWorker are:

  • Its job handler receives an array of one or more jobs.
  • The handler is not invoked immediately, but rather when enough jobs are batched, or a job in the batch is at risk of being timed out by the Zeebe broker.

You can use the batch worker if you have tasks that benefit from processing together, but are not related in the BPMN model.

An example would be a high volume of jobs that require calls to an external system, where you have to pay per call to that system. In that case, you may want to batch up jobs, make one call to the external system, then update all the jobs and send them on their way.

The batch worker triggers on a first-of basis: batch size or batch timeout, whichever is reached first.

You must configure both jobBatchMinSize and jobBatchMaxTime. Whichever condition is met first will trigger the processing of the jobs:

  • Enough jobs are available to the worker to satisfy the minimum job batch size;
  • The batch has been building for the maximum amount of time - "we're doing this now, before the earliest jobs in the batch time out on the broker".
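The trigger rule above can be sketched as a small accumulator that hands over its batch as soon as either condition holds. This is a simplified model for illustration, not the ZBBatchWorker implementation: BatchAccumulator and PendingJob are hypothetical names, time is passed in explicitly so the logic is easy to test, and the time condition is assumed to be measured from the arrival of the oldest job in the batch.

```typescript
// Hypothetical job record: only what the trigger rule needs.
interface PendingJob {
    key: string
    receivedAtMs: number
}

// Simplified model of "first of jobBatchMinSize or jobBatchMaxTime".
class BatchAccumulator {
    private batch: PendingJob[] = []
    private readonly minSize: number
    private readonly maxTimeMs: number

    constructor(minSize: number, maxTimeMs: number) {
        this.minSize = minSize
        this.maxTimeMs = maxTimeMs
    }

    add(job: PendingJob): void {
        this.batch.push(job)
    }

    // Returns the whole batch (and starts a new one) if either condition is met,
    // otherwise returns undefined and keeps accumulating.
    takeIfReady(nowMs: number): PendingJob[] | undefined {
        if (this.batch.length === 0) return undefined
        const sizeReached = this.batch.length >= this.minSize
        const oldestAgeMs = nowMs - this.batch[0].receivedAtMs
        const timeReached = oldestAgeMs >= this.maxTimeMs
        if (!sizeReached && !timeReached) return undefined
        const ready = this.batch
        this.batch = []
        return ready
    }
}

// Usage: minimum batch size 3, maximum batch time 60 seconds
const acc = new BatchAccumulator(3, 60_000)
acc.add({ key: 'a', receivedAtMs: 0 })
acc.add({ key: 'b', receivedAtMs: 1_000 })
console.log(acc.takeIfReady(2_000)) // undefined: 2 jobs, oldest is 2s old
acc.add({ key: 'c', receivedAtMs: 3_000 })
console.log(acc.takeIfReady(3_000)?.length) // 3: size condition met
```

Whichever condition fires, the whole batch is handed over at once, which is why the worker timeout has to cover the maximum batch time as well as the processing that follows.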

You should specify a timeout for your worker that is at least jobBatchMaxTime plus the expected latency of the external call plus your processing time and network latency. Otherwise the broker may time out your batch worker's lock on the jobs and make them available to another worker, which would defeat the whole purpose.

Here is an example of using the ZBBatchWorker:

import { API } from './lib/my-awesome-external-api'
import { ZBClient, BatchedJob, Duration } from 'zeebe-node'

const zbc = new ZBClient()

// Helper function to find a job by its key
const findJobByKey = (jobs: BatchedJob[]) => (key: string) =>
    jobs.find(job => job.key === key)

const handler = async (jobs: BatchedJob[]) => {
    console.log("Let's do this!")
    // Construct some hypothetical payload with correlation ids and requests
    const req = jobs.map(job => ({ id: job.key, data: job.variables.request }))
    // An uncaught exception will not be managed by the library
    try {
        // Our API wrapper turns that into a request, and returns
        // an array of results with ids
        const outcomes = await API.post(req)
        // Construct a find function for these jobs
        const getJob = findJobByKey(jobs)
        // Iterate over the results and call the complete method on the corresponding job,
        // passing in the correlated outcome of the API call
        outcomes.forEach(res => getJob(res.id)?.complete(res.data))
    } catch (e) {
        jobs.forEach(job => job.fail((e as Error).message))
    }
}

const batchWorker = zbc.createBatchWorker({
    taskType: 'get-data-from-external-api',
    taskHandler: handler,
    jobBatchMinSize: 10, // at least 10 at a time
    jobBatchMaxTime: 60, // or every 60 seconds, whichever comes first
    timeout: Duration.seconds.of(80) // 80s timeout leaves at least 20s to process the batch
})

See this blog post for some more details on the implementation.

Long polling

From Zeebe 0.21 onward, long polling is supported for clients, and is used by default. Rather than polling continuously for work and getting nothing back, a client can poll once and leave the request open until work appears. This reduces network traffic and CPU utilization on the server. Every job activation request is appended to the event log, so continuous polling can significantly impact broker performance, especially when an exporter is loaded (see here).

Long polling sends the ActivateJobs command to the broker, and waits for up to the long poll interval for jobs to be available, rather than returning immediately with an empty response if no jobs are available at that moment.

The default long poll duration is 30s.

To use a different long polling duration, pass in a long poll timeout in milliseconds to the client. All workers created with that client will use it. Alternatively, set a period per-worker.

Long polling for workers is configured in the ZBClient like this:

const { ZBClient, Duration } = require('zeebe-node')

const zbc = new ZBClient('serverAddress', {
	longPoll: Duration.minutes.of(10), // Ten minutes - inherited by workers
})

const longPollingWorker = zbc.createWorker({
	taskType: 'task-type',
	taskHandler: handler,
	longPoll: Duration.minutes.of(2), // override client, poll 2m
})

The poll interval is a timer that fires on the configured interval and sends an ActivateJobs command if no pending command is currently active. By default, this is set to 300ms. This guarantees that there will be a minimum of 300ms between ActivateJobs commands, which prevents flooding the broker.

Too many ActivateJobs requests per period of time can cause broker backpressure to kick in, and the gateway to return gRPC status code 8 (RESOURCE_EXHAUSTED).

You can configure this with the pollInterval option in the client constructor, in which case all workers inherit it as their default. You can also override this by specifying a value in the createWorker call:

const zbc = new ZBClient({
	pollInterval: Duration.milliseconds.of(500),
})

const worker = zbc.createWorker({
	taskType: 'send-email',
	taskHandler: sendEmailWorkerHandler,
	pollInterval: Duration.milliseconds.of(750),
})
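To see why the poll interval gate matters, here is a small discrete-time simulation of the behaviour described above: a timer fires every pollInterval, and a new ActivateJobs request is only sent when the previous one has returned. It is an illustrative model, not library code. simulatePolling is a hypothetical name, and responseDelayMs is an assumed stand-in for how long each request stays open (close to zero for an empty short poll, up to the long poll duration for a long poll that finds no work).

```typescript
// Returns the times (in ms) at which an ActivateJobs request would be sent.
function simulatePolling(
    pollIntervalMs: number,
    totalMs: number,
    responseDelayMs: number
): number[] {
    const sent: number[] = []
    let pendingUntil = -Infinity // no request in flight at the start
    for (let t = pollIntervalMs; t <= totalMs; t += pollIntervalMs) {
        // The timer fires, but a request is only sent if none is still pending
        if (t >= pendingUntil) {
            sent.push(t)
            pendingUntil = t + responseDelayMs
        }
    }
    return sent
}

const oneHour = 60 * 60 * 1000
// Idle worker, immediate empty responses: one request per 300ms tick
console.log(simulatePolling(300, oneHour, 0).length) // 12000
// Idle worker, each long poll held open for 30s before returning empty
console.log(simulatePolling(300, oneHour, 30_000).length) // 119
```

Under these assumptions, an idle worker with a 30s long poll sends roughly 120 requests an hour instead of 12,000, and the poll interval still guarantees a minimum gap between requests when the broker answers quickly.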

Client Commands

Deploy Process Models and Decision Tables

From version 8 of Zeebe, deployProcess is deprecated in favor of deployResource, which allows you to deploy both process models and DMN tables.

You can deploy a resource as a buffer, or by passing a filename - in which case the client library will load the file into a buffer for you.

Deploy Process Model

By passing a filename, and allowing the client library to load the file into a buffer:

async function deploy() {
	const zbc = new ZBClient()
	const result = await zbc.deployResource({
		processFilename: `./src/__tests__/testdata/Client-DeployWorkflow.bpmn`,
	})
}

By passing a buffer, and a name:

const fs = require('fs')
const { ZBClient } = require('zeebe-node')

async function deploy() {
	const zbc = new ZBClient()
	const process = fs.readFileSync(
		`./src/__tests__/testdata/Client-DeployWorkflow.bpmn`
	)
	const result = await zbc.deployResource({
		process,
		name: `Client-DeployWorkflow.bpmn`,
	})
}

Deploy DMN Table

By passing a filename, and allowing the client library to load the file into a buffer:

async function deploy() {
	const zbc = new ZBClient()
	const result = await zbc.deployResource({
		decisionFilename: `./src/__tests__/testdata/quarantine-duration.dmn`,
	})
}

By passing a buffer, and a name:

const fs = require('fs')
const { ZBClient } = require('zeebe-node')

async function deploy() {
	const zbc = new ZBClient()
	const decision = fs.readFileSync(
		`./src/__tests__/testdata/quarantine-duration.dmn`
	)
	const result = await zbc.deployResource({
		decision,
		name: `quarantine-duration.dmn`,
	})
}

Deploy Form

From 8.3.1, you can deploy a form to the Zeebe broker:

const fs = require('fs')
const { ZBClient } = require('zeebe-node')

async function deploy() {
	const zbc = new ZBClient()
	const form = fs.readFileSync(
		'./src/__tests__/testdata/form_1.form'
	)
	const result = await zbc.deployResource({
		form,
		name: 'form_1.form',
	})
}

Start a Process Instance

const ZB = require('zeebe-node')

;(async () => {
	const zbc = new ZB.ZBClient('localhost:26500')
	const result = await zbc.createProcessInstance({
		bpmnProcessId: 'test-process',
		variables: {
			testData: 'something',
		},
	})
	console.log(result)
})()

Example output:

{ processKey: '3',
  bpmnProcessId: 'test-process',
  version: 1,
  processInstanceKey: '569' }

Start a Process Instance of a specific version of a Process definition

From version 0.22 of the client onward:

const ZB = require('zeebe-node')

;(async () => {
	const zbc = new ZB.ZBClient('localhost:26500')
	const result = await zbc.createProcessInstance({
		bpmnProcessId: 'test-process',
		variables: {
			testData: 'something',
		},
		version: 5,
	})
	console.log(result)
})()

Start a Process Instance and await the Process Outcome

From version 0.22 of the broker and client, you can await the outcome of a process end-to-end execution:

async function getOutcome() {
	const result = await zbc.createProcessInstanceWithResult({
		bpmnProcessId: processId,
		variables: {
			sourceValue: 5,
		},
	})
	return result
}

Be aware that by default, this will throw an exception if the process takes longer than 15 seconds to complete.

To override the gateway's default timeout for a process that needs more time to complete:

const { ZBClient, Duration } = require('zeebe-node')

const zbc = new ZBClient()

;(async () => {
	const result = await zbc.createProcessInstanceWithResult({
		bpmnProcessId: processId,
		variables: {
			sourceValue: 5,
			otherValue: 'rome',
		},
		requestTimeout: Duration.seconds.of(25),
		// also works supplying a number of milliseconds
		// requestTimeout: 25000
	})
	console.log(result)
})()

Publish a Message

You can publish a message to the Zeebe broker that will be correlated with a running process instance:

const { ZBClient, Duration } = require('zeebe-node');
const uuid = require('uuid');

const zbc = new ZBClient()

zbc.publishMessage({
	correlationKey: 'value-to-correlate-with-process-variable',
	messageId: uuid.v4(),
	name: 'message-name',
	variables: { valueToAddToProcessVariables: 'here', status: 'PROCESSED' },
	timeToLive: Duration.seconds.of(10), // seconds
})

When would you do this? Well, the sky is not even the limit when it comes to thinking creatively about building a system with Zeebe - and here's one concrete example to get you thinking:

Recall the example of the remote COBOL database in the section "The Decoupled Job Completion pattern". We're writing code to allow that system to participate in a BPMN process orchestrated by Zeebe.

But what happens if the adapter for that system has been written in such a way that there is no opportunity to attach metadata to the request? In that case we cannot attach a job key. Perhaps you can only send the fixed data for the command, and have to correlate the response based on those fields.

Another example: think of a system that emits events, and has no knowledge of a running process. An example from one system that I orchestrate with Zeebe is Minecraft. A logged-in user in the game performs some action, and code in the game emits an event. I can catch that event in my Node-based application, but I have no knowledge of which running process to target - and the event was not generated from a BPMN task providing a worker with the complete context of a process.

In these two cases, I can publish a message to Zeebe, and let the broker figure out which processes are:

  • Waiting at an intermediate message catch event for this message; or
  • In a sub-process that has a boundary event that will be triggered by this message; or
  • Defined with a message start event that this message would trigger.

The Zeebe broker correlates a message to a running process instance not on the job key - but on the value of one of the process variables (for intermediate message events) and the message name (for all message events, including start messages).

So the response from your COBOL database system, sans job key, is sent back to Zeebe from the RabbitMQListener not via completeJob(), but with publishMessage(), and the value of the payload is used to figure out which process it is for.

In the case of the Minecraft event, a message is published to Zeebe with the Minecraft username, and that is used by Zeebe to determine which processes are running for that user and are interested in that event.
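A deliberately simplified model of this matching rule follows. It is an illustration, not the broker's algorithm: it ignores message buffering and time-to-live, message start events, and Zeebe's rules about how many process instances a single message may correlate with. The MessageSubscription and PublishedMessage shapes are hypothetical, and correlationKey holds the value that the model's correlation key expression evaluated to for that process instance.

```typescript
// An open subscription: a process instance waiting for a named message.
interface MessageSubscription {
    processInstanceKey: string
    messageName: string
    correlationKey: string // value of the process variable named in the model
}

interface PublishedMessage {
    name: string
    correlationKey: string
}

// A message matches on name AND correlation key value; no job key is involved.
function findMatchingInstances(
    subscriptions: MessageSubscription[],
    message: PublishedMessage
): string[] {
    return subscriptions
        .filter(s => s.messageName === message.name && s.correlationKey === message.correlationKey)
        .map(s => s.processInstanceKey)
}

// Two players' processes are waiting for the same in-game event.
const subscriptions: MessageSubscription[] = [
    { processInstanceKey: '101', messageName: 'player-crafted-item', correlationKey: 'steve' },
    { processInstanceKey: '102', messageName: 'player-crafted-item', correlationKey: 'alex' },
]
console.log(findMatchingInstances(subscriptions, { name: 'player-crafted-item', correlationKey: 'alex' })) // [ '102' ]
```

The publisher only needs to know the message name and the username; it never needs a job key or a process instance key.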

See the article "Zeebe Message Correlation" for a complete example with code.

Publish a Start Message

You can also publish a message targeting a Message Start Event. In this case, the correlation key is optional, and all Message Start events that match the name property will receive the message.

You can use the publishStartMessage() method to publish a message with no correlation key (it will be set to a random uuid in the background):

const { ZBClient, Duration } = require('zeebe-node');
const uuid = require('uuid');

const zbc = new ZBClient('localhost:26500');
zbc.publishStartMessage({
  messageId: uuid.v4(),
  name: 'message-name',
  variables: { initialProcessVariable: 'here' },
  timeToLive: Duration.seconds.of(10), // seconds
});

Both normal messages and start messages can be published idempotently by setting both the messageId and the correlationKey. They will only ever be correlated once. See: A message can be published idempotent.

Activate Jobs

If you have some use case that doesn't fit the existing workers, you can write your own custom workers using the ZBClient.activateJobs() method. It takes an ActivateJobsRequest object, and returns a stream for that call.

Attach a listener to the stream's 'data' event, and it will be called with an ActivateJobsResponse object if there are jobs to work on.

To complete these jobs, use the ZBClient methods completeJob(), failJob(), and throwError().

For more details, read the source code of the library, particularly the ZBWorkerBase class. This is an advanced use case, and the existing code in the library is the best documentation.

Other Concerns

Graceful Shutdown

To drain workers, call the close() method of the ZBClient. This causes all workers using that client to stop polling for jobs, and returns a Promise that resolves when all active jobs have either finished or timed out.

console.log('Closing client...')
zbc.close().then(() => console.log('All workers closed'))
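The draining behaviour can be modelled in a few lines. This is a sketch of the idea, not the library's implementation: DrainableWorker, startJob and finishJob are hypothetical names, and finishJob stands for any way a job leaves the worker (completed, failed, or timed out).

```typescript
// Sketch of draining: stop taking work, then resolve once in-flight jobs are done.
class DrainableWorker {
    private polling = true
    private inFlight = 0
    private drainWaiters: Array<() => void> = []

    get isPolling(): boolean {
        return this.polling
    }

    get activeJobs(): number {
        return this.inFlight
    }

    // Called when a job is activated; refused once close() has been called.
    startJob(): boolean {
        if (!this.polling) return false
        this.inFlight++
        return true
    }

    // Called when a job completes, fails, or times out.
    finishJob(): void {
        this.inFlight = Math.max(0, this.inFlight - 1)
        if (!this.polling && this.inFlight === 0) this.resolveWaiters()
    }

    // Stops polling and returns a Promise that resolves when no jobs are in flight.
    close(): Promise<void> {
        this.polling = false
        return new Promise<void>(resolve => {
            this.drainWaiters.push(() => resolve())
            if (this.inFlight === 0) this.resolveWaiters()
        })
    }

    private resolveWaiters(): void {
        const waiters = this.drainWaiters
        this.drainWaiters = []
        waiters.forEach(resolve => resolve())
    }
}

const worker = new DrainableWorker()
worker.startJob()
worker.close().then(() => console.log('All jobs drained'))
console.log(worker.startJob()) // false: no new jobs after close()
worker.finishJob() // the last in-flight job finishes, so close() resolves
```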

Logging

Control the log output for the client library by setting the ZBClient log level. Valid log levels are NONE (suppress all logging), ERROR (log only exceptions), INFO (general logging), or DEBUG (verbose logging). You can set this in the client constructor:

const zbc = new ZBClient('localhost', { loglevel: 'DEBUG' })

And also via the environment:

ZEEBE_NODE_LOG_LEVEL='ERROR' node start.js
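Assuming each level also includes everything less verbose than itself, a level check can be sketched as below. LogLevel, shouldLog and the numeric ranks are illustrative, not the library's internals.

```typescript
type LogLevel = 'NONE' | 'ERROR' | 'INFO' | 'DEBUG'

// Assumed ordering from least to most verbose.
const verbosity: Record<LogLevel, number> = { NONE: 0, ERROR: 1, INFO: 2, DEBUG: 3 }

// A message is emitted if its level is no more verbose than the configured level.
function shouldLog(configured: LogLevel, messageLevel: Exclude<LogLevel, 'NONE'>): boolean {
    return verbosity[messageLevel] <= verbosity[configured]
}

console.log(shouldLog('ERROR', 'ERROR')) // true
console.log(shouldLog('ERROR', 'INFO')) // false
console.log(shouldLog('NONE', 'ERROR')) // false
```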

By default the library uses console.info and console.error for logging. You can also pass in a custom logger, such as pino:

const logger = require('pino')()
const zbc = new ZBClient({ stdout: logger })

From version 0.23.0-alpha.1, the library logs human-readable logs by default, using the ZBSimpleLogger. If you want structured logs as stringified JSON, pass ZBJsonLogger to the constructor's stdout option, like this:

const { ZBJsonLogger, ZBClient } = require('zeebe-node')
const zbc = new ZBClient({ stdout: ZBJsonLogger })

You can also control this via environment variables:

export ZEEBE_NODE_LOG_TYPE=SIMPLE  # Simple Logger (default)
export ZEEBE_NODE_LOG_TYPE=JSON  # JSON Logger

Generating TypeScript constants for BPMN Models

Message names and Task Types are untyped magic strings. You can generate type information to avoid some classes of errors.

0.22.0-alpha.5 and above

Install the package globally:

npm i -g zeebe-node

Now you have the command zeebe-node <filename> that parses a BPMN file and emits type definitions.

All versions

The BpmnParser class provides a static method generateConstantsForBpmnFiles(). This method takes a filepath and returns TypeScript definitions that you can use to avoid typos in your code, and to reason about the completeness of your task worker coverage.

const ZB = require('zeebe-node')
;(async () => {
	console.log(await ZB.BpmnParser.generateConstantsForBpmnFiles(processFile))
})()

This will produce output similar to:

// Autogenerated constants for msg-start.bpmn

export enum TaskType {
    CONSOLE_LOG = "console-log"
}

export enum MessageName {
    MSG_EMIT_FRAME = "MSG-EMIT_FRAME",
    MSG_START_JOB = "MSG-START_JOB"
}
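One way to use the generated constants is sketched below. The enums carry the values from the sample output above, written with standard TypeScript enum syntax; the handler map and its Handler type are hypothetical. Typing the map as Record<TaskType, Handler> makes the compiler reject it if a task type from the model has no handler, which is one way to reason about worker coverage. In a real project you would pass TaskType.CONSOLE_LOG as the taskType of createWorker instead of a string literal.

```typescript
// Constants as generated for msg-start.bpmn
enum TaskType {
    CONSOLE_LOG = 'console-log',
}

enum MessageName {
    MSG_EMIT_FRAME = 'MSG-EMIT_FRAME',
    MSG_START_JOB = 'MSG-START_JOB',
}

// Hypothetical handler signature, kept minimal for the example
type Handler = (variables: Record<string, unknown>) => string

// Adding a member to TaskType without adding a handler here is a compile error.
const handlers: Record<TaskType, Handler> = {
    [TaskType.CONSOLE_LOG]: variables => `console-log received ${JSON.stringify(variables)}`,
}

// Typos become compile errors: MessageName.MSG_STRAT_JOB would not type-check.
const startMessage: string = MessageName.MSG_START_JOB

console.log(handlers[TaskType.CONSOLE_LOG]({ frame: 1 })) // console-log received {"frame":1}
console.log(startMessage) // MSG-START_JOB
```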

Generating code from a BPMN Model file

You can scaffold your worker code from a BPMN file with the zeebe-node command. To use this command, install the package globally with:

npm i -g zeebe-node

Pass in the path to the BPMN file, and it will output a file to implement it:

zeebe-node my-model.bpmn

Writing Strongly-typed Job Workers

You can provide interfaces to get design-time type safety and IntelliSense on the process variables passed to a worker job handler, the custom headers that it will receive, and the variables that it will pass back to Zeebe in the job.complete() call:

interface InputVariables {
    name: string,
    age: number,
    preferences: {
        beverage: 'Coffee' | 'Tea' | 'Beer' | 'Water',
        color: string
    }
}

interface OutputVariables {
    suggestedGift: string
}

interface CustomHeaders {
    occasion: 'Birthday' | 'Christmas' | 'Hannukah' | 'Diwali'
}

const giftSuggester = zbc.createWorker<
    InputVariables,
    CustomHeaders,
    OutputVariables
>({
    taskType: 'get-gift-suggestion',
    taskHandler: (job) => {
        const suggestedGift = `${job.customHeaders.occasion} ${job.variables.preferences.beverage}`
        return job.complete({ suggestedGift })
    },
})

If you decouple the declaration of the job handler from the createWorker call, you will need to explicitly specify its type, like this:

import { ZBWorkerTaskHandler } from 'zeebe-node'

// Annotate the handler itself (not its return value) with the handler type
const getGiftSuggestion: ZBWorkerTaskHandler<InputVariables, CustomHeaders, OutputVariables> = (job) => {
    const suggestedGift = `${job.customHeaders.occasion} ${job.variables.preferences.beverage}`
    return job.complete({ suggestedGift })
}

const giftSuggester = zbc.createWorker({
    taskType: 'get-gift-suggestion',
    taskHandler: getGiftSuggestion
})

Run-time Type Safety

The parameterization of the client and workers helps to catch errors in code. If your interface definitions are good, it can go a long way toward making sure that your workers and client emit the correct payloads and have a strong expectation about what they will receive. However, it does not give you any run-time safety.

Your type definition may be incorrect, or the variables or custom headers may simply not be there at run-time, as there is no type checking in the broker, and other factors are involved, such as tasks with input and output mappings, and data added to the process variables by REST calls and other workers.

You should consider:

  • Writing interface definitions for your payloads, to get design-time assistance against spelling errors as you demarshal and update variables.
  • Testing for the existence of variables and properties on payloads, and writing defensive pathways to deal with missing properties. If you mark everything as optional in your interfaces, the type-checker will force you to write that code.
  • Surfacing code exceptions operationally to detect and diagnose mismatched expectations.
  • If you want to validate inputs and outputs to your system at runtime, you can use io-ts. Once data goes into that, it either exits through an exception handler, or is guaranteed to have the shape of the defined codec at run-time.
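Short of adopting io-ts, a hand-written type guard is a dependency-free way to test for the existence and shape of variables at run time. Below is a minimal sketch that repeats the InputVariables interface from "Writing Strongly-typed Job Workers"; isInputVariables and isRecord are hypothetical helpers. In a job handler you could check job.variables with it, and fail the job with a descriptive message when the check does not pass.

```typescript
// Same shape as the InputVariables interface in the previous section
interface InputVariables {
    name: string
    age: number
    preferences: {
        beverage: 'Coffee' | 'Tea' | 'Beer' | 'Water'
        color: string
    }
}

const beverages = ['Coffee', 'Tea', 'Beer', 'Water'] as const

function isRecord(value: unknown): value is Record<string, unknown> {
    return typeof value === 'object' && value !== null
}

// Checks every property the interface promises before the handler relies on it.
function isInputVariables(value: unknown): value is InputVariables {
    if (!isRecord(value)) return false
    const { name, age, preferences } = value
    return (
        typeof name === 'string' &&
        typeof age === 'number' &&
        isRecord(preferences) &&
        typeof preferences.color === 'string' &&
        (beverages as readonly unknown[]).includes(preferences.beverage)
    )
}

console.log(isInputVariables({ name: 'Ada', age: 36, preferences: { beverage: 'Tea', color: 'green' } })) // true
console.log(isInputVariables({ name: 'Ada', age: '36' })) // false
```

Inside the guarded branch, TypeScript narrows the value to InputVariables, so the defensive check and the static type stay in step.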

As with everything, it is a balancing act / trade-off between correctness, safety, and speed. You do not want to lock everything down while you are still exploring.

I recommend the following scale, to match the maturity of your system:

  • Start with <any> typing for the workers; then
  • Develop interfaces to describe the DTOs represented in your process variables;
  • Use optional types on those interfaces to check your defensive programming structures;
  • Lock down the run-time behaviour with io-ts as the boundary validator.

You may choose to start with the DTOs. Anyway, there are options.

Developing Zeebe Node

The source is written in TypeScript in src, and compiled to ES6 in the dist directory.

To build:

npm run build

To start a watcher to build the source and API docs while you are developing:

npm run dev

Tests

Tests are written in Jest, and live in the src/__tests__ directory. To run the unit tests:

npm t

Integration tests are in the src/__tests__/integration directory.

They require a Zeebe broker to run. You can start a dockerised broker:

cd docker
docker-compose up

And then run them manually:

npm run test:integration

For the failure test, you need to run Operate and manually verify that an incident has been raised.

Writing Tests

Zeebe is inherently stateful, so integration tests need to be carefully isolated so that workers from one test do not service tasks in another test. Jest does not guarantee the order in which test files run, so tests that mutate shared state cause intermittent failures.

The tests use a templating function to replace the process id, task types and message names in the bpmn model to produce distinct, isolated namespaces for each test and each test run.
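The templating idea can be sketched as a single-pass string substitution over the BPMN XML. This is a hypothetical helper, not the function the test suite uses: it appends a per-run suffix to every listed identifier, matching longer identifiers first so that an identifier containing another one is not rewritten twice. Because it is plain text replacement, it also rewrites any other occurrence of those strings, so choose distinctive identifiers.

```typescript
// Escape characters that have special meaning in a regular expression.
function escapeRegExp(text: string): string {
    return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
}

// Append `-${runId}` to each occurrence of the given identifiers in one pass.
function isolateBpmn(xml: string, identifiers: string[], runId: string): string {
    if (identifiers.length === 0) return xml
    const alternatives = [...identifiers]
        .sort((a, b) => b.length - a.length) // longest first wins the alternation
        .map(escapeRegExp)
        .join('|')
    return xml.replace(new RegExp(alternatives, 'g'), match => `${match}-${runId}`)
}

const template =
    '<bpmn:process id="order-process">' +
    '<zeebe:taskDefinition type="order" />' +
    '</bpmn:process>'

console.log(isolateBpmn(template, ['order', 'order-process'], 'run42'))
// <bpmn:process id="order-process-run42"><zeebe:taskDefinition type="order-run42" /></bpmn:process>
```

Using a fresh runId per test and per run (for example a uuid) keeps workers from one test from picking up jobs created by another.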

Contributors

Name
Josh Wulf
Colin Raddatz
Jarred Filmer
Timothy Colbert
Olivier Albertini
Patrick Dehn

zeebe-client-node-js's Issues

Handle gRPC failure

If the broker is not up when workers start, then the first call to a ZBClient method will throw an unhandled exception, which can crash an entire process if it's not handled in the application.

Here is an example of handling it at the application layer, when waiting - for example - for a broker container to start:

async function main() {
	const zbc = new ZBClient(config.ZEEBE_GATEWAY);
	await promiseRetry((retry, number) => {
		if (number > 1) {
			console.log("gRPC connection is in failed state...");
		}
		return deployBpmn(zbc).catch(retry);
	});
	console.log("gRPC connection to broker established.");
	startZeebeWorkers(zbc);
	startRESTServer(zbc);
}

This should be wrapped into the ZBClient class, to remove the responsibility for handling this from the application.
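As an application-layer workaround, a generic retry-with-backoff helper can stand in for promiseRetry. This is a self-contained sketch with hypothetical names and illustrative default delays; the operation you pass in would be whatever first call you make against the broker, such as deploying a model or requesting the topology.

```typescript
// Exponential backoff with a cap: baseMs * 2^attempt, limited to maxMs.
function backoffDelayMs(attempt: number, baseMs: number, maxMs: number): number {
    return Math.min(maxMs, baseMs * 2 ** attempt)
}

// Retries `operation` until it resolves, or rethrows the last error after maxAttempts.
async function retryWithBackoff<T>(
    operation: () => Promise<T>,
    { maxAttempts = 5, baseMs = 500, maxMs = 10_000 } = {}
): Promise<T> {
    let lastError: unknown
    for (let attempt = 0; attempt < maxAttempts; attempt++) {
        try {
            return await operation()
        } catch (err) {
            lastError = err
            if (attempt < maxAttempts - 1) {
                const delay = backoffDelayMs(attempt, baseMs, maxMs)
                await new Promise<void>(resolve => setTimeout(() => resolve(), delay))
            }
        }
    }
    throw lastError
}

// Example: an operation that fails twice before succeeding
let attempts = 0
retryWithBackoff(async () => {
    attempts++
    if (attempts < 3) throw new Error('broker not ready')
    return 'connected'
}, { baseMs: 10 }).then(result => console.log(result, 'after', attempts, 'attempts')) // connected after 3 attempts
```

Capping the delay keeps a long outage from pushing retries minutes apart, while the exponential growth avoids hammering a gateway that is still starting up.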

Passing worker options object overrides log level set via env

This code:

process.env.ZB_NODE_LOG_LEVEL = process.env.ZB_NODE_LOG_LEVEL || 'NONE'

const zbc2 = new ZBClient('localtoast:234532534', {})
zbc2.createWorker(
    null,
    'whatever',
    (_, complete) => complete.success,
    {},
    () => {
       called++
})

looks to create a worker that has the LogLevel set to something other than none.

Prior art: #12

Intermittent failure of test in CI

● onConnectionError Handler › Debounces onConnectionError

    expect(received).toBe(expected) // Object.is equality

    Expected: 2
    Received: 3

      80 |             expect(zbc2.connected).toBe(false)
      81 |             zbc2.close()
    > 82 |             expect(called).toBe(2)
         |                            ^
      83 |             done()
      84 |         }, 15000)
      85 |     })

      at Timeout.setTimeout (src/__tests__/local-integration/OnConnectionError.spec.ts:82:19)

complete.failure('this has failed') appears on console as 'Timed out task'

import { ZBClient } from 'zeebe-node'

export class MyWorker {
  zbClient: ZBClient

  constructor(gatewayAddress: string) {
    this.zbClient = new ZBClient(gatewayAddress)
    this.createWorker()
  }

  createWorker() {
    this.zbClient.createWorker(
      'my-worker',
      'my-task-type',
      (job, complete) => this.handler(job, complete)
    )
  }

  handler(job, complete) {
    complete.failure('this has failed!', 0) // <<--------------- !!!
  }
}

console output:

[ my-task-type my-worker ] > Timed out task a4f33646-2acb-4627-aef9-46e281411bdc for my-task-type

Update complete syntax in README

Hi. The README suggests using complete() to complete a task in a worker, but no such function is defined. Instead, complete.success and complete.failure are to be used, as the README suggests elsewhere.

It might need to be updated. Thanks.

ZB_NODE_LOG_LEVEL should take precedence over options given in the ctor

Actual Behaviour

If you set options in ctor like

new ZBClient('localhost', { loglevel: 'DEBUG' })

And you do

ZB_NODE_LOG_LEVEL='NONE' node myfile.js

'NONE' is not applied, because the constructor options take precedence.

Expected behaviour

ZB_NODE_LOG_LEVEL should take precedence over the constructor option. It should have the last word.

Version

zeebe-node : 1.2.0

Here is the PR

gRPC does not reconnect after broker restart

To emulate this, start a broker, then a worker.

Kill the broker, then restart it.

Previously connected workers do not reconnect. With a long-polling worker with loglevel set to DEBUG you can see the failure retrying continuously.

According to these, the underlying gRPC library should reconnect automagically:

Maybe something else needs to be configured in the optional third parameter to Client in GRPCClient?
https://grpc.github.io/grpc/core/group__grpc__arg__keys.html#gad7d9d143858d8f5e138cf704b0082973

Client reports task timed out for every task

At the moment, the client logs that a task timed out in every case, even when it was completed.

The setTimeout is not being cleared, because the code is calling clearInterval instead of clearTimeout.
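For reference, here is a sketch of the intended pattern: one timer per activated job, cleared with clearTimeout when the job completes, so that the timeout message is only logged for jobs that really time out. TaskTimeouts and its methods are hypothetical names, not the library's code.

```typescript
class TaskTimeouts {
    private timers = new Map<string, ReturnType<typeof setTimeout>>()

    // Start a timer for a job; onTimeout only runs if complete() is not called first.
    start(jobKey: string, timeoutMs: number, onTimeout: (jobKey: string) => void): void {
        const timer = setTimeout(() => {
            this.timers.delete(jobKey)
            onTimeout(jobKey)
        }, timeoutMs)
        this.timers.set(jobKey, timer)
    }

    // Clear the job's timer; returns false if the job was unknown or already timed out.
    complete(jobKey: string): boolean {
        const timer = this.timers.get(jobKey)
        if (timer === undefined) return false
        clearTimeout(timer)
        this.timers.delete(jobKey)
        return true
    }

    get pending(): number {
        return this.timers.size
    }
}

const timeouts = new TaskTimeouts()
timeouts.start('job-1', 50, key => console.log(`Timed out task ${key}`))
timeouts.start('job-2', 50, key => console.log(`Timed out task ${key}`))
timeouts.complete('job-1') // job-1's timer is cleared, so only job-2 is reported
```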

Unexpected json-path token ROOT_OBJECT using latest server

Running on OSX
$ docker run -p 26500:26500 camunda/zeebe:latest
Then tried to load example workflow: $ node ./example/workflow.js
Server returns error:
23:37:43.757 [io.zeebe.gateway.impl.broker.BrokerRequestManager] [gateway-zb-actors-0] ERROR io.zeebe.gateway - Error handling gRPC request
io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Command rejected with code 'CREATE': Expected to deploy new resources, but encountered the following validation errors:
'test.bpmn': - Element: Message_0ex7vc9 > extensionElements > subscription
- ERROR: JSON path query is invalid: Unexpected json-path token ROOT_OBJECT

Lodash vuln

lodash/lodash#4336

Affected versions of lodash are vulnerable to Prototype Pollution.
The function defaultsDeep could be tricked into adding or modifying properties of Object.prototype using a constructor payload.

Babel dependency

Flatten Job Headers for Zeebe 0.19

Zeebe 0.19 changes the shape of the job object passed to the worker task handler, flattening the jobHeaders subkey into the main body of the job.

Reference: camunda/camunda@1a1555d

This will be a breaking change, so the library should rev to 3.x for this change.

Disconnects from cloud

Since cloud went to Zeebe 0.21.1, this happens every day:

{"context":"/server/endpoint/node_modules/@magikcraft/nestjs-zeebe/node_modules/zeebe-node/dist/zb/ZBWorker.js:122","id":"retrieve-spells_103","level":50,"message":"Stalled on gRPC error","pollMode":"Long Poll","taskType":"retrieve-spells","time":"2019 Oct-28 01:48:50AM","timestamp":"2019-10-28T01:48:50.672Z"}
{"context":"/server/endpoint/node_modules/@magikcraft/nestjs-zeebe/node_modules/zeebe-node/dist/lib/GRPCClient.js:71","id":"gRPC Channel","level":50,"message":"GRPC ERROR: 1 CANCELLED: Received http2 header with status: 502","pollMode":"Long Poll","taskType":"gRPC Channel","time":"2019 Oct-28 01:48:50AM","timestamp":"2019-10-28T01:48:50.672Z"}
{"context":"/server/endpoint/node_modules/@magikcraft/nestjs-zeebe/node_modules/zeebe-node/dist/lib/GRPCClient.js:242","id":"gRPC Channel","level":50,"message":"GRPC Channel State: READY","pollMode":"Long Poll","taskType":"gRPC Channel","time":"2019 Oct-28 01:48:50AM","timestamp":"2019-10-28T01:48:50.673Z"}
{"context":"/server/endpoint/node_modules/@magikcraft/nestjs-zeebe/node_modules/zeebe-node/dist/lib/GRPCClient.js:77","id":"gRPC Channel","level":30,"message":"gRPC Channel reconnected","pollMode":"Long Poll","taskType":"gRPC Channel","time":"2019 Oct-28 01:48:50AM","timestamp":"2019-10-28T01:48:50.673Z"}
{"context":"/server/endpoint/node_modules/@magikcraft/nestjs-zeebe/node_modules/zeebe-node/dist/lib/GRPCClient.js:258","id":"gRPC Channel","level":30,"message":"gRPC Channel State: READY","pollMode":"Long Poll","taskType":"gRPC Channel","time":"2019 Oct-28 01:48:50AM","timestamp":"2019-10-28T01:48:50.855Z"}
{"context":"/server/endpoint/node_modules/@magikcraft/nestjs-zeebe/node_modules/zeebe-node/dist/lib/GRPCClient.js:259","id":"gRPC Channel","level":30,"message":"gRPC Retry count: 2","pollMode":"Long Poll","taskType":"gRPC Channel","time":"2019 Oct-28 01:48:50AM","timestamp":"2019-10-28T01:48:50.855Z"}
{"context":"/server/endpoint/node_modules/@magikcraft/nestjs-zeebe/node_modules/zeebe-node/dist/lib/GRPCClient.js:262","id":"gRPC Channel","level":30,"message":"gRPC reconnected","pollMode":"Long Poll","taskType":"gRPC Channel","time":"2019 Oct-28 01:48:50AM","timestamp":"2019-10-28T01:48:50.855Z"}
{"context":"/server/endpoint/node_modules/@magikcraft/nestjs-zeebe/node_modules/zeebe-node/dist/lib/GRPCClient.js:77","id":"gRPC Channel","level":30,"message":"gRPC Channel reconnected","pollMode":"Long Poll","taskType":"gRPC Channel","time":"2019 Oct-28 01:48:50AM","timestamp":"2019-10-28T01:48:50.856Z"}

The client reports it is connected, but does not retrieve any jobs.

Could this be due to pod rescheduling?

listWorkflows is not a function

Hi. I tried listing the workflows, as suggested in the README, by calling zbc.listWorkflows(), and it said listWorkflows is not a function. I then checked in node_modules and could not find a function defined with that name.

Am I missing something?

PS: Great work with the library btw.

Long Polling workers do not reconnect if broker goes away then comes back

During Kubernetes node pre-emption, I've observed workers failing to reconnect.

I suspect that somewhere in the long-polling loop there is a condition that does not trigger the poll retry in this scenario.

Fixing this will involve setting up tests, at least locally, in which the broker is bounced. This will be harder to simulate in CI, but that should not block the fix, because this is a critical production failure.
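A minimal sketch of the property the fix needs, assuming a simplified worker: the next poll is scheduled on every code path, including after an error, so a broker that goes away and comes back is eventually polled again. startPollLoop, activateJobs, handleJob, retryDelayMs and maxPolls are hypothetical names, not the zeebe-node API.

```javascript
// Hypothetical long-poll loop (not the zeebe-node implementation).
// activateJobs: async () => jobs[]; handleJob: async (job) => void
function startPollLoop(activateJobs, handleJob, { retryDelayMs = 1000, maxPolls = Infinity } = {}) {
  let stopped = false
  let polls = 0

  async function poll() {
    if (stopped || polls >= maxPolls) return
    polls++
    let delay = 0
    try {
      const jobs = await activateJobs()
      for (const job of jobs) await handleJob(job)
    } catch (err) {
      // Any error (for example the gateway went away): back off, but stay in the loop.
      delay = retryDelayMs
    } finally {
      // The crucial part: the next poll is scheduled whether or not this one failed.
      if (!stopped) setTimeout(poll, delay)
    }
  }

  poll()
  return { stop: () => { stopped = true }, pollCount: () => polls }
}
```

A loop written this way can only stop because stop() was called, which makes a "connected but never polls again" state easier to rule out in a bounce test.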

Inject identifier to gRPC Client

At the moment, gRPC Client logs don't identify the task type of the worker, or whether the gRPC Client belongs to a ZBClient. In projects with multiple workers, the logs therefore give no indication of which part is failing.

For example:

endpoint     | {"id":"gRPC Channel","level":50,"message":"GRPC ERROR: 14 UNAVAILABLE: GOAWAY received","pollMode":"Long Poll","taskType":"gRPC Channel","time":"2019 Oct-26 10:44:19AM","timestamp":"2019-10-26T10:44:19.354Z"}
endpoint     | {"id":"gRPC Channel","level":50,"message":"GRPC Channel State: IDLE","pollMode":"Long Poll","taskType":"gRPC Channel","time":"2019 Oct-26 10:44:19AM","timestamp":"2019-10-26T10:44:19.354Z"}
endpoint     | {"id":"gRPC Channel","level":30,"message":"gRPC Channel reconnected","pollMode":"Long Poll","taskType":"gRPC Channel","time":"2019 Oct-26 10:44:19AM","timestamp":"2019-10-26T10:44:19.355Z"}

The GRPCClient class should take an id string, either Worker: ${taskType} or ZBClient, and include it in its log entries to aid debugging.
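A sketch of the proposed identifier, assuming the logger is a plain function that writes one JSON line per entry, like the log lines above. grpcClientId and createTaggedLogger are hypothetical helpers; only the Worker: ${taskType} / ZBClient naming comes from this issue.

```javascript
// Hypothetical helpers, not the zeebe-node API.
function grpcClientId(taskType) {
  // Workers are named after their task type; the plain client is "ZBClient".
  return taskType ? `Worker: ${taskType}` : 'ZBClient'
}

// Returns a log function that stamps every entry with the owner's id.
function createTaggedLogger(id, write = (line) => console.log(line)) {
  return function log(level, message) {
    const entry = { id, level, message, timestamp: new Date().toISOString() }
    write(JSON.stringify(entry))
    return entry
  }
}

// Example: a channel error now names the worker it came from.
const log = createTaggedLogger(grpcClientId('retrieve-spells'))
log(50, 'GRPC ERROR: 14 UNAVAILABLE: GOAWAY received')
```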

Jobs do not stream to client with 0.17.0 server

Steps to reproduce:

  • Start a 0.17 Zeebe server. For example: docker run -p 26500:26500 camunda/zeebe:0.17.0
  • Transpile the client library: npm i && tsc
  • Run the example worker: cd example && node worker.js
  • Start some workflow instances: cd example && node start-workflow-instance.js

Expected Behaviour:

See jobs being serviced by worker.

Actual Behaviour:

The worker does nothing.

Further Information

  • Starting a 0.17 Zeebe instance with the Simple Monitor running shows no workers in the Simple Monitor worker listing.
  • Workflow instances and tasks are created, but no workers service them.
  • The call to activateJobs() in the client executes and returns a stream, but the returned stream's on('data') handler is never called.

Message event error: EXTRACT_VALUE_ERROR

In my workflow there is a message event that should be triggered by the client. I believe I'm sending the correct message to the broker, but it raises an incident: { "Error Type": "EXTRACT_VALUE_ERROR", "Error Message": "Failed to extract the correlation-key by ‘worker1Id’: no value found" }.

This is the message catch event:
<bpmn:message id="Message_01t2tpj" name="worker_status">
  <bpmn:extensionElements>
    <zeebe:subscription correlationKey="worker_Id" />
  </bpmn:extensionElements>
</bpmn:message>

This is the trigger:
await this._zbClient.publishMessage({
  correlationKey: 'worker_Id',
  messageId: uuid.v4(),
  name: 'worker_status',
  variables: { status: 'PROCESSED' },
  timeToLive: 60000,
});
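One plausible reading of this incident, based on how Zeebe message correlation works: the correlationKey attribute on zeebe:subscription refers to a process variable, which must exist when the catch event is entered, and the correlationKey passed to publishMessage must equal that variable's value, not its name. The toy model below illustrates this. It is a simplification, not the broker's implementation, and openSubscription and correlates are hypothetical names.

```javascript
// Toy model of message correlation, for illustration only.
// subscription: { messageName, correlationKey } where correlationKey names a variable.
function openSubscription(subscription, processVariables) {
  const name = subscription.correlationKey
  if (!Object.prototype.hasOwnProperty.call(processVariables, name)) {
    // The variable is missing when the catch event is entered: incident.
    return { incident: 'EXTRACT_VALUE_ERROR', correlationValue: undefined }
  }
  return { incident: null, correlationValue: String(processVariables[name]) }
}

// A published message correlates only if its key equals the variable's *value*.
function correlates(subscription, processVariables, message) {
  const { incident, correlationValue } = openSubscription(subscription, processVariables)
  return incident === null &&
    message.name === subscription.messageName &&
    message.correlationKey === correlationValue
}
```

Under this model, publishing correlationKey: 'worker_Id' only matches an instance whose worker_Id variable literally holds the string 'worker_Id'.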

failJob should be method of worker, or callback parameter

If a worker fails to execute a job, at the moment you have to get a reference to the ZBClient to call failJob. This should be readily available in the worker, following a resolve/reject pattern.

When failing a job, the method needs to set the retries. By default this should be the current retries value minus 1; if the worker wants to hard-fail the workflow and raise an incident, it should set retries to 0.

This will raise an incident in Operate, and also halt the business process.

Note that if, for example, a worker is out of disk space, you can't communicate this in Operate via an incident without halting the process. You will need some other form of visibility or monitoring: the failure will be available in Elasticsearch, but you would need to create a Grafana dashboard to see it.

The failure errorMessage becomes the incident message when job retries === 0.
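The proposed retry default can be sketched as a small pure function. buildFailJobRequest is a hypothetical helper; the request fields jobKey, retries and errorMessage follow the gateway's FailJobRequest, and the incident rule is the one stated in this issue.

```javascript
// Hypothetical helper: build a fail-job request with the proposed default retries.
// job: { key, retries }; options.retries overrides the default (0 = hard fail).
function buildFailJobRequest(job, errorMessage, { retries } = {}) {
  // Default: one fewer retry than the job currently has, never below zero.
  const nextRetries = retries === undefined ? Math.max(job.retries - 1, 0) : retries
  return {
    request: { jobKey: job.key, retries: nextRetries, errorMessage },
    // At 0 retries an incident is raised and errorMessage becomes its message.
    raisesIncident: nextRetries === 0,
  }
}
```

A worker-level fail(errorMessage, retries) callback could call this and send the request, so handlers never need a reference to the ZBClient.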

brokerAddress -> gatewayAddress

The client actually communicates with the gRPC Gateway, not with a broker. Because the broker comes out of the box with an embedded gateway, the two have the same address. However, the gateway can be configured as a standalone gateway, to decouple it from the broker and isolate it from the impact of broker load.

So the parameter brokerAddress in the ZB.Client constructor should be named gatewayAddress.
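A sketch of how the rename could stay backwards compatible, assuming the constructor receives a plain options object. resolveGatewayAddress is hypothetical, and the precedence order is an assumption; the ZEEBE_GATEWAY_ADDRESS variable and the localhost:26500 default come from the README.

```javascript
// Hypothetical helper: prefer gatewayAddress, accept the deprecated brokerAddress,
// then fall back to the environment and finally to the local default.
function resolveGatewayAddress(options = {}, env = process.env, warn = console.warn) {
  if (options.gatewayAddress) return options.gatewayAddress
  if (options.brokerAddress) {
    warn('brokerAddress is deprecated; use gatewayAddress (the client talks to the gateway).')
    return options.brokerAddress
  }
  return env.ZEEBE_GATEWAY_ADDRESS || 'localhost:26500'
}
```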

Long poll on Cloud - is it working correctly?

zeebe-node 0.21.2, used via @magikcraft/nestjs-zeebe.

In the K8s logs, it looks like a worker configured with a 30000 ms long poll is polling every 10 seconds.

Steps to take to diagnose this:

  1. Set up a new cluster.
  2. Deploy a test workflow.
  3. Connect a single long polling worker.
  4. Watch the K8s log to determine the polling interval.
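Step 4 can be made less error-prone with a small script over the captured log lines. This sketch assumes JSON log entries with a timestamp field, like the ones in these issues, optionally prefixed by a container name (endpoint     | ...). pollIntervals is hypothetical, and which message marks a poll is an assumption to adapt.

```javascript
// Hypothetical helper: milliseconds between consecutive log entries matching a predicate.
function pollIntervals(logLines, matches) {
  const times = logLines
    .map((line) => {
      // Drop any "container | " prefix before the JSON object.
      try { return JSON.parse(line.slice(line.indexOf('{'))) } catch { return null }
    })
    .filter((entry) => entry && matches(entry))
    .map((entry) => Date.parse(entry.timestamp))
  return times.slice(1).map((t, i) => t - times[i])
}
```

With a 30000 ms long poll and no jobs available, intervals close to 30000 would be expected; values close to 10000 would confirm the report.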

Interfaces don't reflect received data and grpc behaviour for "Long" type

Current behaviour

By default, all fields of type int64 are serialized/deserialized as strings instead of numbers.
Recent interface changes typed those fields as number, but the data actually arrives as strings.

Expected behaviour

Interfaces should reflect what data we have.

Recommendation

See grpc/grpc#7229. Since the int64 fields are keys, they are not values we do arithmetic on, so we should stay with the default gRPC behaviour.
@jwulf also recommends marking those fields as readonly.

We don't need to change the README, because it already shows int64 fields as strings.
See:

{ key: '578',
  type: 'demo-service',
  jobHeaders:
   { workflowInstanceKey: '574',
     bpmnProcessId: 'test-process',
     workflowDefinitionVersion: 1,
     workflowKey: '3',
     elementId: 'ServiceTask_0xdwuw7',
     elementInstanceKey: '577' },
  customHeaders: '{}',
  worker: 'test-worker',
  retries: 3,
  deadline: '1546915422636',
  variables: { testData: 'something' } }
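Why keeping int64 keys as strings matters: a JavaScript number is an IEEE-754 double and represents integers exactly only up to Number.MAX_SAFE_INTEGER (2^53 - 1), while Zeebe keys are int64. The sketch below shows the precision loss; sameKey is a hypothetical helper.

```javascript
// 2^53 + 1 is a valid int64 but cannot be represented exactly as a number.
const key = '9007199254740993'

const asNumber = Number(key) // rounds to 9007199254740992
const asBigInt = BigInt(key) // exact

// Keys are identifiers, so compare them as strings; use BigInt only if
// arithmetic or numeric ordering is ever needed.
function sameKey(a, b) {
  return String(a) === String(b)
}
```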

Service Task followed by Message Catch Events

Hi. I was testing a few things and found a strange bug. When a service task is followed by a message catch event, as in the screenshot below, and I call complete.success(updatedWorkflowObj) to complete the job and then immediately call publishMessage(), the message catch event is not received by Zeebe.

Instead, I have to add a setTimeout or some other delay before calling publishMessage() after complete.success.

My assumption is that the workflow receives the message event before it receives the task completion (though I'm not sure). Do the message catch event and the task worker completion use the same protocol? If so, is the order in which these events are published preserved/queued?

Screenshot from 2019-11-14 18-59-14

Graceful shutdown

We need a way to gracefully shut down workers, so that they drain, take no new connections, and then return to let the caller know they are done.

Possibly a method that returns a Promise that resolves when the worker is drained. An application can then Promise.all() the shutdown methods of all its workers to shut down gracefully.
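A sketch of the proposed shutdown semantics, assuming the worker tracks one promise per in-flight job. DrainableWorker, run and close are hypothetical names, not the zeebe-node ZBWorker class.

```javascript
// Hypothetical drainable worker (not the zeebe-node ZBWorker class).
class DrainableWorker {
  constructor(taskType) {
    this.taskType = taskType
    this.closing = false
    this.inFlight = new Set() // promises for jobs currently being handled
    this.errors = []
  }

  // Called for each activated job. Returns false once the worker is closing.
  run(job, handler) {
    if (this.closing) return false
    const p = Promise.resolve()
      .then(() => handler(job))
      // A real worker would fail the job here; the sketch just records the error.
      .catch((err) => { this.errors.push(err) })
      .finally(() => this.inFlight.delete(p))
    this.inFlight.add(p)
    return true
  }

  // Stop taking new jobs; resolve with the task type once in-flight jobs finish.
  close() {
    this.closing = true
    return Promise.all([...this.inFlight]).then(() => this.taskType)
  }
}

// Usage: drain every worker, then exit.
// Promise.all(workers.map((w) => w.close())).then(() => process.exit(0))
```

Handler errors are caught per job, so one failing job cannot make the whole shutdown reject.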

Exporting ZBWorker Class

Hi,

I made a PR to export the ZBWorker type. Since my app is written in TypeScript, this would be useful.

At the moment, VS Code resolves the type from the dist folder, which is not safe.

Let me know if you need anything else. Thanks!

ZBClient.updateWorkflowInstancePayload fails.

ZBClient.updateWorkflowInstancePayload fails to execute due to a misspelled function call.

public updateWorkflowInstancePayload(request: ZB.UpdateWorkflowInstancePayloadRequest): Promise<void> {
	return this.gRPCClient.updateWorkflowInstancePayloadRequestSync(stringifyPayload(request));
}

This method above (see ZBClient.js) calls "this.gRPCClient.updateWorkflowInstancePayloadRequestSync" and I believe it should be "this.gRPCClient.updateWorkflowInstancePayloadSync".

Discussion: Automatic retries

At the moment, the client does not automate any retries. This issue captures the current behaviour of the client library in various failure scenarios (taken from here).

Points of failure client-broker over gRPC

There are three points of contact:

  1. Client initiating an operation on the broker
  • failJob
  • publishMessage
  • createWorkflowInstance
  • resolveIncident
  2. Worker activating jobs
  • activateJobs
  3. Worker completing / failing job
  • completeJob
  • failJob

Current Failure Modes

Looking at the current behaviour of each one:

  1. Client initiating an operation on the broker.
  • If the client is started and the broker is not contactable, any operations will throw.
  • If the broker becomes available, operations succeed.
  • If the broker goes away, operations throw again.
  • No automatic retries.
  2. Worker activating jobs
  • If the worker is started and the broker is not contactable, an error is printed to the console.
  • If the broker becomes available, the worker activates jobs.
  • If the broker goes away, no error is thrown or printed.
  • If the broker comes back, jobs are activated.
  • In this case, the worker polling is an automatic retry.
  3. Worker completing jobs
  • If the broker goes away after the worker has taken a job, the worker throws Error: 14 UNAVAILABLE when it attempts to complete the job.
  • No automatic retry.

Ways the broker could go away / be unavailable:

  • Broker address misconfigured.
  • Transient network failure.
  • Broker under excessive load.

Broker Address Misconfigured
This is an unrecoverable hard failure. Retries will not fix this.

Transient network failure
Some temporary disruption in connectivity between worker and broker. This could include a broker restarting or (potentially) a change in DNS (this still needs testing).
A retry will deal with this case if the transient network failure is resolved before the retries time out.

Broker is under excessive load and cannot respond
In this case, retries may actually make things worse. Zeebe is horizontally scalable, but I have driven a single node to failure by pumping in a massive number of workloads when it is memory-starved (I can kill it with 2GB of memory, but haven't yet with 4GB) or when it runs out of disk space (a slow exporter with high throughput can cause this). Automated retries will not recover from any of these situations.

If the broker is experiencing excessive load because of a traffic spike, automated retries may drive it to failure, whereas letting workers fail to complete tasks once, and letting the broker reschedule them, may allow the broker to recover.

Other failure modes not distinguished
The as-yet unknown unknowns. Any ideas?


Conclusions

I'm not yet sure that automatic retry is (a) necessary or (b) a good idea.

The transient network failure seems to be the only case where retries help. I'm not sure how much of an issue that is in a real system, or whether it warrants the added code complexity and the potential downside of hammering a broker that is under excessive load (which would be ineffective if it is a hard failure, and could contribute to a hard failure if it is a spike).


I'm open to more data on this, but I don't have a case for implementing retries yet.
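If retries were added for the transient-failure case only, a cautious policy might look like this sketch: retry just gRPC status 14 (UNAVAILABLE), back off exponentially so a loaded broker is not hammered, and give up after a bounded number of attempts. withRetry and its options are hypothetical; it assumes errors carry a numeric code property, as @grpc/grpc-js errors do.

```javascript
// Hypothetical retry wrapper for transient gRPC failures.
const UNAVAILABLE = 14 // gRPC status code

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

async function withRetry(operation, { maxAttempts = 5, baseDelayMs = 100, maxDelayMs = 5000 } = {}) {
  for (let attempt = 1; ; attempt++) {
    try {
      return await operation()
    } catch (err) {
      const retryable = err && err.code === UNAVAILABLE
      // A misconfigured address usually also surfaces as UNAVAILABLE, so the
      // attempt limit is what eventually reports that hard failure.
      if (!retryable || attempt >= maxAttempts) throw err
      // Exponential backoff: 100, 200, 400 ms ... capped at maxDelayMs.
      await sleep(Math.min(baseDelayMs * 2 ** (attempt - 1), maxDelayMs))
    }
  }
}
```

The cap and the status filter are the design choice here: non-transient errors fail immediately, and the total added load per call is bounded.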

nginx proxy terminates gRPC Channel after 60 seconds

The Camunda Cloud nginx proxy terminates the gRPC connection after 60 seconds of inactivity.

This is a problem for long-polling, and will be an issue for the new create and await command in Zeebe 0.22.

They are going to increase the timeout, but have asked us to turn on gRPC heartbeat (keepalive) to see whether this mitigates the problem.
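A sketch of keepalive channel options for this experiment. The option names are standard gRPC channel arguments supported by @grpc/grpc-js; the values are assumptions to tune. Whether pings reset the proxy's idle timer is exactly what the experiment would show, and servers can reject pings that are too frequent, so the interval may need to match the gateway's keepalive policy.

```javascript
// Assumed values; only the option names are standard gRPC channel arguments.
const PROXY_IDLE_TIMEOUT_MS = 60000

const keepaliveChannelOptions = {
  // Send a keepalive ping well before the proxy's 60 s idle timeout.
  'grpc.keepalive_time_ms': 30000,
  // Consider the connection dead if a ping is not acknowledged within 10 s.
  'grpc.keepalive_timeout_ms': 10000,
  // Ping even when no call is active (for example between long polls).
  'grpc.keepalive_permit_without_calls': 1,
}

// These would be passed as the channel options argument, e.g.
// new grpc.Client(address, credentials, keepaliveChannelOptions)
```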

Workflow instances started using current publishStartMessage() method are not load-balanced

The convenience publishStartMessage method currently sets a default correlation key. The hash of the correlation key is actually used to load-balance messages across partitions, so at the moment, all start event messages will create a workflow instance on the same partition.

The correlation key should be set to a random UUID in order to ensure that the workflows start on distinct partitions.
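A sketch of the proposed change, assuming a start message is a plain object passed to publishMessage. buildStartMessage is hypothetical, and toyPartition is an illustrative hash only; Zeebe's real partitioning hash is different. It shows why a constant key pins every start message to one partition while random keys spread them.

```javascript
const { randomUUID } = require('crypto')

// Hypothetical helper: each start message gets its own random correlation key.
function buildStartMessage(name, variables = {}) {
  return { name, correlationKey: randomUUID(), variables }
}

// Toy partitioner for illustration only (not Zeebe's hash function).
function toyPartition(correlationKey, partitionCount) {
  let h = 0
  for (const ch of correlationKey) h = (h * 31 + ch.charCodeAt(0)) >>> 0
  return h % partitionCount
}
```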

onReady is not called initially for workers

When I create a worker like this:

zbc.createWorker(
  null,
  'demo-service',
  handler,
  {
    onReady: () => console.log(`Worker connected!`),
    onConnectionError: () => console.log(`Worker disconnected!`),
  }
)

then onReady is not called initially after the worker is created. It only gets called after a connection error has occurred.
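A sketch of the expected behaviour, assuming the worker receives gRPC channel state names such as READY and TRANSIENT_FAILURE (as seen in the logs above). ConnectionMonitor is hypothetical. The key point is that onReady fires on the first transition to READY, not only on reconnection.

```javascript
// Hypothetical connection-state tracker (not the zeebe-node implementation).
class ConnectionMonitor {
  constructor({ onReady = () => {}, onConnectionError = () => {} } = {}) {
    this.onReady = onReady
    this.onConnectionError = onConnectionError
    this.connected = false
  }

  update(state) {
    if (state === 'READY' && !this.connected) {
      this.connected = true
      this.onReady() // fires for the very first connection, too
    } else if ((state === 'TRANSIENT_FAILURE' || state === 'SHUTDOWN') && this.connected) {
      this.connected = false
      this.onConnectionError()
    }
    // Other states (IDLE, CONNECTING) do not change the reported status.
  }
}
```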

Add client-side timeout

With Zeebe 0.19, the Java client allows the developer to specify a gRPC request timeout.

This is useful when the gateway is accepting requests (so it doesn’t throw immediately) and doesn’t respond with “cannot contact any broker”, but doesn’t get a response back from a broker because of load.

If the client-side timeout is exceeded, the call is cancelled on the client, and it exceptionally completes the Future (basically Promise.reject).

Menski says about this:

We encounter problems in load test scenarios, and while investigating the cause found that we forgot to specify a deadline. I highly recommend to set a deadline in any client. At the moment the go and java client have deadline for every request.
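A client-side timeout can be sketched as a wrapper that rejects a pending request after a fixed time. withTimeout is hypothetical. With @grpc/grpc-js, the native equivalent is the per-call deadline option, which also cancels the call when the deadline passes; this wrapper only stops the client from waiting.

```javascript
// Hypothetical wrapper: reject with a DEADLINE_EXCEEDED-style error after timeoutMs.
function withTimeout(promise, timeoutMs, label = 'gRPC request') {
  let timer
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => {
      const err = new Error(`${label} timed out after ${timeoutMs} ms`)
      err.code = 4 // gRPC status DEADLINE_EXCEEDED
      reject(err)
    }, timeoutMs)
  })
  // Whichever settles first wins; always clear the timer afterwards.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer))
}
```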
