
Celos: A Scriptable Scheduler for Periodical Hadoop Workflows

  • Written in Java and scripted using JavaScript.

  • Using a general-purpose scripting language for defining workflows removes most configuration boilerplate.

  • Simple and robust architecture that keeps running even when Hadoop is down and picks up where it left off when it’s back up.

  • Support for continuous delivery of workflows.

  • Parsimonious user interface that nevertheless gives a good overview of hundreds of workflows.

  • Battle-tested for years at Collective, Inc. and found practically defect-free.



Overview

The Celos UI

Oozie Overview

Celos works in conjunction with the Apache Oozie execution engine, so we’ll quickly describe Oozie before diving into Celos proper.

Oozie provides services such as distribution of jobs across a cluster, log aggregation, and command-line and Web UI tools for inspecting running jobs.

To run a job in Oozie, it is packaged into a workflow directory and put into HDFS, as the following diagram shows:

Oozie workflow directory

Oozie requires a small XML file that describes what the job should do (in this case call a Java main with some arguments), and then runs it on a processing node in the cluster. Artefacts in the lib directory are automatically placed on the classpath. Arguments (such as inputPath and outputPath in the above example) can be passed to the job.
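For illustration, here is a minimal sketch of such a workflow.xml, assuming a hypothetical com.example.WordCount main class; the inputPath and outputPath parameters are supplied by the caller:

<workflow-app name="wordcount" xmlns="uri:oozie:workflow:0.4">
    <start to="wordcount"/>
    <action name="wordcount">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <main-class>com.example.WordCount</main-class>
            <arg>${inputPath}</arg>
            <arg>${outputPath}</arg>
        </java>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Word count failed.</message>
    </kill>
    <end name="end"/>
</workflow-app>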

Oozie XML files support many different kinds of actions besides calling Java, such as manipulating HDFS, loading data into databases, or sending emails. This is mostly out of scope for this document, but in general, Celos workflows can use all features of Oozie.

You can view all currently running jobs in the Oozie Web UI, Hue:

Hue index

You can also view details of a particular job in Hue:

Hue details

Celos Concepts

Workflows

If we have the Oozie workflow directory to run in HDFS at /wordcount, hourly input data in /input/YYYY-MM-DD/HH00, and want to write output data to /output/YYYY-MM-DD/HH00, we can set up a simple Celos workflow with the ID wordcount like this:

celos.defineWorkflow({
    "id": "wordcount",
    "schedule": celos.hourlySchedule(),
    "schedulingStrategy": celos.serialSchedulingStrategy(),
    "trigger": celos.hdfsCheckTrigger("/input/${year}-${month}-${day}/${hour}00/_READY"),
    "externalService": celos.oozieExternalService({
        "oozie.wf.application.path": "/wordcount/workflow.xml",
        "inputPath": "/input/${year}-${month}-${day}/${hour}00/",
        "outputPath": "/output/${year}-${month}-${day}/${hour}00/",
    })
});

A Celos workflow always has an Oozie workflow (/wordcount/workflow.xml in this case) as its "meat".

If we receive data from two sources, say two datacenters whose data lands in /input/nyc and /input/lax, we can define a helper function and use it to quickly define two workflows with the IDs wordcount-nyc and wordcount-lax:

function defineWordCountWorkflow(dc) {
    celos.defineWorkflow({
        "id": "wordcount-" + dc,
        "schedule": celos.hourlySchedule(),
        "schedulingStrategy": celos.serialSchedulingStrategy(),
        "trigger": celos.hdfsCheckTrigger("/input/" + dc + "/${year}-${month}-${day}/${hour}00/_READY"),
        "externalService": celos.oozieExternalService({
            "oozie.wf.application.path": "/wordcount/workflow.xml",
            "inputPath": "/input/" + dc + "/${year}-${month}-${day}/${hour}00/",
            "outputPath": "/output/" + dc + "/${year}-${month}-${day}/${hour}00/",
        })
    });
}
defineWordCountWorkflow("nyc");
defineWordCountWorkflow("lax");

Here’s an overview of schedules, triggers, and scheduling strategies, each described below:

Celos concepts

Schedules

Each workflow has a schedule that determines the points in time (called slots) at which the workflow should run.

Celos supports cron-like schedules with [celos.cronSchedule]:

// A workflow using this schedule will run every hour.
celos.cronSchedule("0 0 * * * ?");
// A workflow using this schedule will run every day at midnight.
celos.cronSchedule("0 0 0 * * ?");
// A workflow using this schedule will run every day at 5am.
celos.cronSchedule("0 0 5 * * ?");

Another type of schedule is [celos.dependentSchedule], which makes a workflow use the same schedule as another workflow. This is useful for setting up a downstream workflow that tracks an upstream workflow, without having to duplicate the schedule definition.
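For example, to set up a downstream workflow that runs at the same times as the wordcount workflow defined above:

// A workflow using this schedule will run at the same times as
// the workflow with the ID "wordcount".
celos.dependentSchedule("wordcount");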

Triggers

For each slot of a workflow, a trigger is used to determine whether it’s ready to run, or needs to wait.

Simple Triggers

Let’s look at some commonly used simple triggers.

[celos.hdfsCheckTrigger] waits for a file or directory in HDFS:

// A slot at time T will wait for the file /logs/YYYY-MM-DD/HH00/_READY in HDFS.
celos.hdfsCheckTrigger("/logs/${year}-${month}-${day}/${hour}00/_READY");

[celos.successTrigger] waits for the success of another workflow, allowing the definition of dependencies among workflows:

// A slot at time T will wait until the slot at time T of
// the workflow with the ID "workflow-foo" is successful.
celos.successTrigger("workflow-foo");

[celos.delayTrigger] waits until the current wallclock time is a given number of seconds after the slot’s time:

// A slot at time T will wait until the current time is one hour after T.
celos.delayTrigger(60 * 60);

[celos.offsetTrigger] lets us offset another trigger a given number of seconds into the future or past.

// A slot at time T will wait until the _next hour's_ file is available in HDFS.
celos.offsetTrigger(60 * 60, celos.hdfsCheckTrigger("/logs/${year}-${month}-${day}/${hour}00/_READY"));

Combined Triggers

We can also combine triggers with [celos.andTrigger], [celos.orTrigger], and [celos.notTrigger]:

// A slot at time T will wait until both /input-a/YYYY-MM-DD/HH00/_READY
// and /input-b/YYYY-MM-DD/HH00/_READY are in HDFS.
celos.andTrigger(celos.hdfsCheckTrigger("/input-a/${year}-${month}-${day}/${hour}00/_READY"),
                 celos.hdfsCheckTrigger("/input-b/${year}-${month}-${day}/${hour}00/_READY"));
// A slot at time T will wait until the current hour's file, the next hour's file,
// and the file for the hour after that are in HDFS.
var hdfsCheck = celos.hdfsCheckTrigger("/logs/${year}-${month}-${day}/${hour}00/_READY");
celos.andTrigger(hdfsCheck,
                 celos.offsetTrigger(60 * 60 * 1, hdfsCheck),
                 celos.offsetTrigger(60 * 60 * 2, hdfsCheck));
// A slot at time T will be ready if, after one hour, the slot at time T
// of the other workflow "workflow-bar" is _not_ successful.
// This can be used to send an alert for example.
celos.andTrigger(celos.delayTrigger(60 * 60),
                 celos.notTrigger(celos.successTrigger("workflow-bar")));

This last trigger should be used in conjunction with a celos.dependentSchedule("workflow-bar").
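Putting it together, a complete alerting workflow could look like the following sketch (the Oozie workflow path /alerts/workflow.xml that sends the alert is illustrative):

celos.defineWorkflow({
    "id": "workflow-bar-alert",
    "schedule": celos.dependentSchedule("workflow-bar"),
    "schedulingStrategy": celos.serialSchedulingStrategy(),
    "trigger": celos.andTrigger(celos.delayTrigger(60 * 60),
                                celos.notTrigger(celos.successTrigger("workflow-bar"))),
    "externalService": celos.oozieExternalService({
        "oozie.wf.application.path": "/alerts/workflow.xml"
    })
});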

Scheduling Strategies

A workflow’s scheduling strategy determines when and in which order the ready slots of the workflow should be run.

There’s only one scheduling strategy at the moment, [celos.serialSchedulingStrategy], which executes ready slots oldest first, with a configurable concurrency level.

// A workflow using this scheduling strategy will run three slots in parallel.
celos.serialSchedulingStrategy(3);

How it Works

The main data sources Celos uses are:

Workflows Directory

The workflows directory contains JavaScript files that define workflows.

It may look like this:

workflows/
  wordcount.js
  some-other-workflow.js
  yet-another-workflow.js

State Database

The state database directory contains the state of each slot as a small JSON file.

db/
  state/
    wordcount-lax/
      2015-09-15/
        00:00:00.000Z
        01:00:00.000Z
        02:00:00.000Z
        ...
    wordcount-nyc/
      2015-09-15/
        00:00:00.000Z
        01:00:00.000Z
        02:00:00.000Z
        ...

An individual slot file in the state database, e.g. db/state/wordcount-lax/2015-09-15/01:00:00.000Z, looks like this:

{
  "status": "SUCCESS",
  "externalID": "0008681-150911205802478-oozie-oozi-W",
  "retryCount": 0
}

The status field records the state the slot is in.

The externalID field contains the Oozie ID of the corresponding Oozie workflow execution if the slot is running, successful, or failed (otherwise it’s null).

The retryCount records how many times the slot has already been retried after failure.

Scheduler Step

On each scheduler step (typically triggered once per minute from cron), Celos evaluates all JavaScript files in the workflows directory, yielding a set of uniquely identified workflows.

Then, for each workflow, Celos fetches all slot files within a sliding window of 7 days before the current date from the state database.

Each slot is a state machine with the following states:

Slot states

Celos takes the following action, depending on the state of the slot:

State Action

WAITING

Call the workflow’s trigger to determine whether the slot is ready. If the trigger signals readiness, put the slot into the READY state. If the slot has been waiting for too long, put the slot into the WAIT_TIMEOUT state. Otherwise, keep the slot in the WAITING state.

READY

Pass the slot as a candidate for scheduling to the workflow’s scheduling strategy. If the strategy chooses to execute the slot, submit it to Oozie, and put it into the RUNNING state. Otherwise, keep the slot in the READY state.

RUNNING

Ask Oozie for the status of the execution. If the slot is still executing, keep it in the RUNNING state. If the slot has succeeded, put it into the SUCCESS state. If the slot has failed, but there are retries left, put the slot into the WAITING state again. If the slot has failed, and there are no more retries left, put the slot into the FAILURE state.

SUCCESS

Do nothing.

FAILURE

Do nothing.

WAIT_TIMEOUT

Do nothing.

KILLED

Do nothing.

The state database contains additional information about slots that have been manually rerun with the /rerun HTTP API.

In the following example, the slots 2015-08-01T01:00Z and 2015-08-01T02:00Z of the workflow wordcount-nyc have been rerun. They are outside the sliding window, so the scheduling algorithm described above would not normally look at them.

However, rerunning a slot touches an additional file in the rerun subdirectory of the state database, and slots for which such a file exists are fed into the scheduling algorithm in addition to the slots from the 7 day sliding window.

db/
  state/
    ... as above ...
  rerun/
    wordcount-nyc/
      2015-08-01/
        01:00:00.000Z
        02:00:00.000Z

Rerunning thus serves two purposes: besides its main use of re-executing a slot, it can also be used to backfill data, by marking slots outside the sliding window that the scheduler should pick up.
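For example, to backfill the first of the two slots shown above via the HTTP API (see the /rerun reference below):

curl -X POST "localhost:1234/rerun?id=wordcount-nyc&time=2015-08-01T01:00Z"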

Further Directories

Celos has a defaults directory that contains JavaScript files that can be imported into a workflow JavaScript file with [celos.importDefaults]. Such defaults files are used for sharing global variables and utility functions.
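For example, a hypothetical defaults file util.js could define a shared helper function, which workflow files then import:

// In the defaults directory: util.js
function hourlyReadyPath(prefix) {
    // Celos substitutes ${year} etc. when the trigger is checked.
    return prefix + "/${year}-${month}-${day}/${hour}00/_READY";
}

// In a workflow file:
celos.importDefaults("util");
celos.hdfsCheckTrigger(hourlyReadyPath("/logs"));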

Celos writes daily-rotating logs to a logs directory.

All directories (workflows, defaults, logs, and database) are configurable via [Server Command-Line Arguments].

Continuous Delivery of Workflows

Changing a workflow definition in Celos is as simple as updating the workflow JavaScript file and/or the Oozie workflow definition in HDFS. On the next scheduler step, Celos will pick up the changes.

Bundled with Celos comes a tool called Celos CI (see [Celos CI Reference] as well as samples/quickstart) that automates this process, and can be used in conjunction with GitHub and a CI server such as Jenkins for continuous delivery of Celos workflows.

For each group of related workflows, we have a GitHub repository and a Jenkins job that deploys the workflows on push to master using Celos CI. Celos CI copies the JavaScript files to the Celos host with SFTP, and uploads the Oozie workflow directory to HDFS.

Architecture

Experience

As of September 2015, Celos has been in use at Collective for about two years, and is currently running all of our Hadoop processing (hundreds of individual workflows across dozens of projects).

Celos is productively used by people from different backgrounds, such as data science, operations, software engineering, and database administration, and has proven to be a welcome improvement on our previous Oozie coordinator-based scheduling.

We’re proud that in two years of use, not a single bug in Celos has caused any downtime, which is attributable to the small codebase (about 2500 non-blank, non-comment lines of code for core Celos, as measured by cloc 1.56) and the rigorous test suite (hundreds of unit tests and an extensive integration test).

Building & Running Celos

Prerequisites

You can probably get away with slightly older Hadoop and Oozie versions.

Building Celos

scripts/build.sh

This will build the following JARs:

  • celos-server.jar (the Celos server; see [Celos Server Reference])
  • celos-ui.jar (the Celos UI; see [Celos UI Reference])
  • celos-ci-fat.jar (the Celos CI tool; see [Celos CI Reference])

Getting Started

Head over to samples/quickstart.

Get in Touch

We’d love to help you try out and use Celos!

For now, please use the Issue Tracker if you have questions or comments.

People

Developers, developers, developers: manuel, akonopko, ollie64, ivorwilliams, clintcombs, btrofimov

Head honcho: Chris Ingrassia

JavaScript API Reference

Workflows Reference

celos.defineWorkflow

Description

This is the main API call that registers a new workflow.

Syntax
celos.defineWorkflow(options)
Parameters

The options argument is an object with the following fields:

Name Type Required Description

id

String

Yes

The identifier string for the workflow; must be unique.

trigger

Trigger

Yes

The trigger that determines data availability for the workflow.

schedule

Schedule

Yes

The schedule that determines the points in time at which the workflow should run.

schedulingStrategy

SchedulingStrategy

Yes

The scheduling strategy that determines when and in which order ready slots should be run.

externalService

ExternalService

Yes

The external service actually responsible for executing the job.

startTime

String (ISO 8601, UTC)

No

The date when the workflow should start executing (default: "1970-01-01T00:00Z").

maxRetryCount

Number

No

The number of times a slot of this workflow should be automatically retried if it fails (default: 0).

waitTimeoutSeconds

Number

No

The number of seconds a slot of this workflow may stay in the WAITING state before it times out and is put into the WAIT_TIMEOUT state (default: Integer.MAX_VALUE, i.e. about 68 years).

Examples
celos.defineWorkflow({
    "id": "my-workflow",
    "schedule": celos.hourlySchedule(),
    "schedulingStrategy": celos.serialSchedulingStrategy(),
    "trigger": celos.alwaysTrigger(),
    "externalService": celos.oozieExternalService({
        "oozie.wf.application.path": "/my-workflow/workflow.xml",
        "param1": "Hello",
        "param2": "World"
    })
});
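A second sketch showing the optional fields (all values illustrative):

celos.defineWorkflow({
    "id": "my-retrying-workflow",
    "schedule": celos.cronSchedule("0 0 5 * * ?"),
    "schedulingStrategy": celos.serialSchedulingStrategy(),
    "trigger": celos.alwaysTrigger(),
    "externalService": celos.oozieExternalService({
        "oozie.wf.application.path": "/my-workflow/workflow.xml"
    }),
    // Don't create slots before this date.
    "startTime": "2015-01-01T00:00Z",
    // Retry a failed slot up to three times.
    "maxRetryCount": 3,
    // Time out slots that have been waiting for more than a day.
    "waitTimeoutSeconds": 60 * 60 * 24
});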

celos.importDefaults

Description

Evaluates a file from the defaults directory in the current scope, so all variables and functions from the file become available in the current file.

Syntax
celos.importDefaults(name)
Parameters
Name Type Required Description

name

String

Yes

The name of the defaults file to import, without the ".js" suffix.

Examples
// Loads the file foo.js from the defaults directory
celos.importDefaults("foo");

Triggers Reference

A trigger determines, for each point in time at which a workflow runs, whether the preconditions for running the workflow (such as data availability, or the success of upstream workflows) are met.

celos.hdfsCheckTrigger

Description

Makes a workflow wait for a file or directory in HDFS. Often used to wait for _READY or _SUCCESS files.

Syntax
celos.hdfsCheckTrigger(path, fs?)
Parameters
Name Type Required Description

path

String

Yes

The HDFS path to wait for. May include the variables ${year}, ${month}, ${day}, ${hour}, ${minute}, and ${second}, which will be replaced by the zero-padded values from the slot’s scheduled time.

fs

String

No

The hdfs:// URI of the HDFS filesystem to use. If not specified, the value of the [CELOS_DEFAULT_HDFS] variable will be used.

Examples
celos.hdfsCheckTrigger("/logs/${year}-${month}-${day}/${hour}-00/_READY");

celos.successTrigger

Description

Makes a workflow wait for the success of another workflow at the same time. This is used to define dependencies among workflows.

Syntax
celos.successTrigger(workflowID)
Parameters
Name Type Required Description

workflowID

String

Yes

The ID of the other workflow to wait for.

Examples
// A workflow using this trigger will run at time T only after the
// workflow "bar" has succeeded at time T.
celos.successTrigger("bar");

celos.andTrigger

Description

Logical AND of nested triggers.

Syntax
celos.andTrigger(trigger1, ..., triggerN)
Parameters
Name Type Required Description

trigger1, ..., triggerN

Trigger

No

The nested triggers. If no nested triggers are specified, the trigger is always ready.

Examples
// Wait for the HDFS paths /foo and /bar
celos.andTrigger(celos.hdfsCheckTrigger("/foo"),
                 celos.hdfsCheckTrigger("/bar"));

celos.orTrigger

Description

Logical OR of nested triggers.

Syntax
celos.orTrigger(trigger1, ..., triggerN)
Parameters
Name Type Required Description

trigger1, ..., triggerN

Trigger

No

The nested triggers. If no nested triggers are specified, the trigger is never ready.

Examples
// Wait for the HDFS paths /foo or /bar
celos.orTrigger(celos.hdfsCheckTrigger("/foo"),
                celos.hdfsCheckTrigger("/bar"));

celos.notTrigger

Description

Logical NOT of a nested trigger.

Syntax
celos.notTrigger(trigger)
Parameters
Name Type Required Description

trigger

Trigger

Yes

The nested trigger to negate.

Examples
// Wait until HDFS path /foo doesn't exist.
celos.notTrigger(celos.hdfsCheckTrigger("/foo"));

celos.offsetTrigger

Description

Offset a nested trigger into the future or past.

Syntax
celos.offsetTrigger(seconds, trigger)
Parameters
Name Type Required Description

seconds

Number

Yes

The number of seconds to offset into the future (if positive) or past (if negative).

trigger

Trigger

Yes

The nested trigger to offset.

Examples
// Wait for this hour's and next hour's HDFS file.
var trigger = celos.hdfsCheckTrigger("/${year}-${month}-${day}/${hour}-00/_READY");
celos.andTrigger(trigger,
                 celos.offsetTrigger(60 * 60, trigger));

celos.delayTrigger

Description

Waits until the current wallclock time is a specified number of seconds past the slot’s scheduled time.

Syntax
celos.delayTrigger(seconds)
Parameters
Name Type Required Description

seconds

Number

Yes

The number of seconds to wait.

Examples
// Will become ready one hour after its scheduled time.
celos.delayTrigger(60 * 60);

// Can also be used for e.g. alerting: will trigger if, after 1 hour,
// workflow "foo" is not successful.
celos.andTrigger(celos.delayTrigger(60 * 60),
                 celos.notTrigger(celos.successTrigger("foo")));

celos.alwaysTrigger

Description

A trigger that’s always ready, to be used when a workflow has no preconditions and should simply run at any scheduled time.

Syntax
celos.alwaysTrigger()
Examples
celos.alwaysTrigger();

Schedules Reference

A schedule determines the points in time (slots) at which a workflow should run.

celos.cronSchedule

Description

A cron-like schedule.

Syntax
celos.cronSchedule(cronExpr)
Parameters
Name Type Required Description

cronExpr

String

Yes

The cron expression.

Examples
// Runs a workflow at 10:15am every day.
celos.cronSchedule("0 15 10 * * ?");

celos.hourlySchedule

Description

Runs a workflow every hour.

A shortcut for celos.cronSchedule("0 0 * * * ?").

Syntax
celos.hourlySchedule()
Examples
celos.hourlySchedule();

celos.minutelySchedule

Description

Runs a workflow every minute.

A shortcut for celos.cronSchedule("0 * * * * ?").

Syntax
celos.minutelySchedule()
Examples
celos.minutelySchedule();

celos.dependentSchedule

Description

Runs a workflow with the same schedule as another workflow.

Syntax
celos.dependentSchedule(workflowID)
Parameters
Name Type Required Description

workflowID

String

Yes

The workflow ID of the other workflow.

Examples
// A workflow using this schedule will run with the same schedule as
// the workflow with the ID "foo".
celos.dependentSchedule("foo");

Scheduling Strategies Reference

A scheduling strategy determines the order in which the ready slots of a workflow are executed.

celos.serialSchedulingStrategy

Description

Executes slots oldest first, with a configurable concurrency level.

Syntax
celos.serialSchedulingStrategy(concurrency?)
Parameters
Name Type Required Description

concurrency

Number

No

The number of slots to execute at the same time (defaults to 1).

Examples
// A workflow using this scheduling strategy will have
// at most three slots executing concurrently.
celos.serialSchedulingStrategy(3);

External Services Reference

An external service actually executes a workflow.

celos.oozieExternalService

Description

Executes slots with Oozie.

Syntax
celos.oozieExternalService(properties, oozieURL?)
Parameters
Name Type Required Description

properties

Object

Yes

Properties to pass to Oozie.

oozieURL

String

No

The HTTP URL of the Oozie API. If not specified, the value of the [CELOS_DEFAULT_OOZIE] variable will be used.

Inside property values, the variables ${year}, ${month}, ${day}, ${hour}, ${minute}, and ${second} will be replaced by the zero-padded values from the slot’s scheduled time.

year, month, day, hour, minute, and second will also be set as Oozie properties, so they can be used in the Oozie workflow XML file.

Additionally, Celos will set the Oozie property celosWorkflowName to a string like "my-workflow@2015-09-12T20:00Z", useful for display.

oozie.wf.application.path is the only property required by Oozie. It points to an Oozie workflow XML file within an Oozie workflow directory. There can be multiple XML files within a single Oozie workflow directory.

If [CELOS_DEFAULT_OOZIE_PROPERTIES] is defined and an Object, its members are added (before other properties, so they can be overridden) to the Oozie properties.

Examples
celos.oozieExternalService({
    "oozie.wf.application.path": "/workflow-dir/workflow.xml",
    "prop1": "Hello. It is the year ${year}!",
    "prop2": "Just another property."
});
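The Oozie API URL can also be passed explicitly as the second argument (hostname illustrative):

celos.oozieExternalService({
    "oozie.wf.application.path": "/workflow-dir/workflow.xml"
}, "http://oozie001.example.com:11000/oozie");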

Variables

If defined, these global variables influence some API functions. It’s a good idea to set them in a common defaults file imported by all workflows.

CELOS_DEFAULT_HDFS

The String value of this variable will be used as the default HDFS name node URI by [celos.hdfsCheckTrigger].

CELOS_DEFAULT_OOZIE

The String value of this variable will be used as the default Oozie API URL by [celos.oozieExternalService].

CELOS_DEFAULT_OOZIE_PROPERTIES

The members of this Object will be added (before other properties, so they can be overridden) to the Oozie properties of a workflow by [celos.oozieExternalService].
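A sketch of such a common defaults file (all values illustrative):

// E.g. in the defaults file env.js, imported with celos.importDefaults("env"):
var CELOS_DEFAULT_HDFS = "hdfs://nameservice1";
var CELOS_DEFAULT_OOZIE = "http://oozie001.example.com:11000/oozie";
var CELOS_DEFAULT_OOZIE_PROPERTIES = { "user.name": "celos" };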

Celos Server Reference

The celos-server.jar launches Celos.

The celos-server.jar must be run in the following way, due to the need to put the Hadoop configuration on the classpath:

java -cp celos-server.jar:/etc/hadoop/conf com.collective.celos.server.Main <arguments...>

Server Command-Line Arguments

Name Type Required Description

--port

Integer

Yes

HTTP port for server.

--workflows

Path

No

Workflows directory (defaults to /etc/celos/workflows).

--defaults

Path

No

Defaults directory (defaults to /etc/celos/defaults).

--logs

Path

No

Logs directory (defaults to /var/log/celos).

--db

Path

No

State database directory (defaults to /var/lib/celos/db).

--autoSchedule

Integer

No

Interval (in seconds) between scheduler steps. If not supplied, Celos will not automatically step the scheduler, but will instead wait for POSTs to the /scheduler servlet.
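For example, to launch a server that steps the scheduler every 60 seconds (port and interval illustrative):

java -cp celos-server.jar:/etc/hadoop/conf com.collective.celos.server.Main \
    --port 1234 \
    --autoSchedule 60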

Server HTTP API

/scheduler

Doing a POST to this servlet initiates a scheduler step.

In production we do this once a minute from cron.

Example
curl -X POST localhost:1234/scheduler
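A corresponding crontab entry might look like this (a sketch, assuming the server listens on port 1234):

# Step the Celos scheduler once per minute.
* * * * * curl -X POST localhost:1234/scheduler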

/workflow-list

Doing a GET to this servlet returns the list of workflows loaded into Celos.

Example
curl "localhost:1234/workflow-list"

prints:

{
  "ids" : [ "workflow-1", "workflow-2", "workflow-3" ]
}

/workflow-slots

Doing a GET to this servlet returns the slots of a workflow within a time range.

It also returns other information about the workflow, such as its paused state (see the /pause servlet).

Parameters
Name Type Required Description

id

String

Yes

ID of the workflow.

end

String (ISO 8601, UTC)

No

Time (exclusive) of most recent slot to return. Defaults to current time.

start

String (ISO 8601, UTC)

No

Time (inclusive) of earliest slot to return. Defaults to 1 week before end.

Example
curl "localhost:1234/workflow-slots?id=workflow-1"

prints:

{
  "paused": false,
  "slots" : [ {
    "time" : "2015-09-13T13:50:00.000Z",
    "status" : "READY",
    "externalID" : null,
    "retryCount" : 0
  }, {
    "time" : "2015-09-13T13:45:00.000Z",
    "status" : "SUCCESS",
    "externalID" : "0004806-150911205802478-oozie-oozi-W",
    "retryCount" : 0
  }, {
    "time" : "2015-09-13T13:40:00.000Z",
    "status" : "SUCCESS",
    "externalID" : "0004804-150911205802478-oozie-oozi-W",
    "retryCount" : 0
  },
  ...
  ]
}
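The range of returned slots can be narrowed with the optional start and end parameters (times illustrative):

curl "localhost:1234/workflow-slots?id=workflow-1&start=2015-09-13T00:00Z&end=2015-09-13T12:00Z"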

/trigger-status

Doing a GET to this servlet returns human-readable information about why a slot is waiting.

Parameters
Name Type Required Description

id

String

Yes

ID of the workflow.

time

String (ISO 8601, UTC)

Yes

Scheduled time of slot to check.

Example
curl "localhost:1234/trigger-status?id=workflow-1&time=2015-09-13T13:00Z"

prints:

{
  "type" : "AndTrigger",
  "ready" : false,
  "description" : "Not all nested triggers are ready",
  "subStatuses" : [ {
    "type" : "DelayTrigger",
    "ready" : false,
    "description" : "Delayed until 2015-09-14T16:00:00.000Z",
    "subStatuses" : [ ]
  }, {
    "type" : "HDFSCheckTrigger",
    "ready" : true,
    "description" : "HDFS path hdfs://nameservice1/logs/dc3/2015-09-14/1500 is ready",
    "subStatuses" : [ ]
  } ]
}

/rerun

Doing a POST to this servlet instructs Celos to mark a slot for rerun.

The slot’s state will be reset to waiting and its retry count will be reset to 0.

Parameters
Name Type Required Description

id

String

Yes

ID of the workflow.

time

String (ISO 8601, UTC)

Yes

Scheduled time of slot to rerun.

Example
curl -X POST "localhost:1234/rerun?id=workflow-1&time=2015-09-13T13:40Z"

/kill

Doing a POST to this servlet marks a slot as killed and also kills its underlying Oozie job, if any.

Parameters
Name Type Required Description

id

String

Yes

ID of the workflow.

time

String (ISO 8601, UTC)

Yes

Scheduled time of slot to kill.

Example
curl -X POST "localhost:1234/kill?id=workflow-1&time=2015-09-13T13:40Z"

/pause

Doing a POST to this servlet pauses or unpauses a workflow. While a workflow is paused, the scheduler will simply ignore it. This means it doesn’t check any triggers for the workflow, doesn’t submit new jobs to the workflow’s external service, and doesn’t perform any other action related to the workflow.

You can check whether a workflow is paused by looking at the paused field of the result of the /workflow-slots servlet.

Parameters
Name Type Required Description

id

String

Yes

ID of the workflow.

paused

Boolean

Yes

Whether to pause (true) or unpause (false) the workflow.

Example
# Pause a workflow
curl -X POST "localhost:1234/pause?id=workflow-1&paused=true"
# Unpause a workflow
curl -X POST "localhost:1234/pause?id=workflow-1&paused=false"

Celos CI Reference

The celos-ci-fat.jar can be used to automatically deploy workflow and defaults JavaScript files, as well as Oozie workflow directories.

java -jar celos-ci-fat.jar <arguments...>

CI Command-Line Arguments

Name Type Required Description

--mode

String

Yes

deploy or undeploy

--workflowName

String

Yes

Name of workflow (or rather, project).

--deployDir

Path

Yes

The deployment directory (used only in deploy mode; not needed for undeploy).

--target

URL

Yes

The target file (file: or sftp: URL).

--hdfsRoot

Path

No

HDFS data will be placed under this root (defaults to /user/celos/app).

Deployment Directory

A deployment directory must follow a canonical directory layout:

workflow.js
defaults.js
hdfs/
  workflow.xml
  ...
  lib/
    ...

  • workflow.js is the JavaScript file containing workflow definitions.

  • defaults.js is a JavaScript defaults file containing variables and utility functions.

  • hdfs is the Oozie workflow directory.

If WORKFLOW_NAME is the value of the --workflowName argument, HDFS_ROOT is the value of the --hdfsRoot argument, and WORKFLOWS_DIR and DEFAULTS_DIR are the Celos workflows and defaults directories specified in the target file, then the files will be deployed to the following locations:

workflow.js -> $WORKFLOWS_DIR/$WORKFLOW_NAME.js
defaults.js -> $DEFAULTS_DIR/$WORKFLOW_NAME.js
hdfs/       -> $HDFS_ROOT/$WORKFLOW_NAME

Target File

A target file is a JSON file that describes a Celos and HDFS setup.

Name Type Required Description

hadoop.hdfs-site.xml

URL

Yes

URL of Hadoop hdfs-site.xml File

hadoop.core-site.xml

URL

Yes

URL of Hadoop core-site.xml File

defaults.dir.uri

URL

Yes

URL of Celos defaults directory.

workflows.dir.uri

URL

Yes

URL of Celos workflows directory.

All fields must be file: or sftp: URLs.

Example:

{
    "hadoop.hdfs-site.xml": "sftp://celos002.ewr004.collective-media.net/etc/hadoop/conf/hdfs-site.xml",
    "hadoop.core-site.xml": "sftp://celos002.ewr004.collective-media.net/etc/hadoop/conf/core-site.xml",
    "defaults.dir.uri": "sftp://celos002.ewr004.collective-media.net/etc/celos/defaults",
    "workflows.dir.uri": "sftp://celos002.ewr004.collective-media.net/etc/celos/workflows",
}

The best practice for using Celos CI is putting a target file for each Celos installation (e.g. production and staging) in a central, SFTP-accessible location, and storing the target file’s SFTP URL in an environment variable (e.g. PRODUCTION_TARGET and STAGING_TARGET). Deploy scripts using Celos CI should then pass this variable as the --target argument to Celos CI, making them independent of the Celos installation to which the workflow is to be deployed. See samples/quickstart for an example.
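A deploy script following this practice might invoke Celos CI like this (workflow name illustrative):

java -jar celos-ci-fat.jar \
    --mode deploy \
    --workflowName wordcount \
    --deployDir . \
    --target "$PRODUCTION_TARGET"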

CI Environment Variables

CELOS_CI_USERNAME

If defined, overrides the username in sftp: URLs in the target file.

Celos UI Reference

The celos-ui.jar runs the Celos user interface.

java -jar celos-ui.jar <arguments...>

UI Command-Line Arguments

Name Type Required Description

--port

Integer

Yes

HTTP port for UI.

--celos

URL

Yes

Celos URL.

--hue

URL

No

Hue (Oozie UI) URL. If specified, slots become linked to Hue.

--config

Path

No

JSON [UI Config File].
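For example (all URLs, ports, and file names illustrative):

java -jar celos-ui.jar \
    --port 8080 \
    --celos http://localhost:1234 \
    --hue http://hue001.example.com:8888 \
    --config ui-config.json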

UI Config File

If the --config argument is not supplied to the UI, it simply renders one long list of all loaded workflows.

By supplying a JSON file with the following format to --config, the workflows can be hierarchically grouped (one level):

{
  "groups":
  [
    {
      "name": "Group 1",
      "workflows":
      [
        "workflow-1",
        "workflow-2"
      ]
    },
    {
      "name": "Group 2",
      "workflows":
      [
        "workflow-3"
      ]
    }
  ]
}

UI HTTP API

/ui

Doing a GET to this servlet displays the Celos UI.

Parameters
Name Type Required Description

time

String (ISO 8601, UTC)

No

Time of most recent slot to display (defaults to current time).

zoom

String (Number)

No

Zoom level in minutes (defaults to 60).

Miscellaneous

Two similar, programmable schedulers:

The Celos Name

The Lord of the Rings Dictionary defines it as:

Celos S; also Kelos; freshet; kel- flow away [Sil; *kelu-]; one would
want to choose los snow [Sil] for the final element, but the text of
Unfinished Tales, Index, entry Celos states the final form derives
from Q -sse, -ssa, a form of emphasis [some say locative], making the
definition 'much flowing' or 'freshet', often resulting from melting
snow; perhaps 'snow' is then implied from the ending; a river in
Gondor

Alternatively, the Devil’s Dictionary of Programming defines it as:

Configurable: It’s your job to make it usable.
Elegant: The only use case is making me feel smart.
Lightweight: I don’t understand the use-cases the alternatives solve.
Opinionated: I don’t believe that your use case exists.
Simple: It solves my use case.

Acknowledgements

Thanks to our in-house users and to the developers of the many fine open source libraries we’re able to use, including but not limited to Oozie, Hadoop, Jetty, Rhino, Joda, Jackson, Quartz, and Gradle.


Celos Issues

Constraining number of running workflows

We mustn't arbitrarily fire off new workflows, or we'll overload the system.

Resolution:

Use the serial scheduling strategy for all workflows.

The serial strategy ensures that only a single instance of a given workflow is running at any time. Given that we initially won't be running more than a handful of workflows with Celos, this will give us enough concurrency control.

Original proposal - the discussion below refers to this:

A solution that's simple to implement and should have nice properties is:

  • Only ever run N workflows at a time (e.g. N = 20).
  • Only ever run a single workflow of a given type (e.g. edge2-batchimport).

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Automatic state database backup

The state database should be backed up to a safe location every day.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Make testrunner script generic

The manuel_sample_workflow branch contains an initial implementation of a test running script.

It takes a sample workflow, uploads it to the virtual cluster, steps the scheduler until no more jobs are running, and then compares the workflow's outputs against fixtures.

This should be generalized so arbitrary workflows can be tested:

  • A workflow specifies its inputs and outputs under src/test/celos-input and src/test/celos-output.
  • Running a script from the workflow directory performs all steps currently done by the test.yaml script, but in a generic way.

Basically, it should be possible to integration test a workflow by calling a single generic script from the workflow's repo / build process.

CI

Automatically test the system.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Monitor Celos is running every minute

We need to ensure that Celos is running approximately every minute and raise an alert if it isn't.

Plan: implement as a canary workflow running every minute that sends an event to Riemann, and monitor from there.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Rerun failed workflows via REST API

@manuel - does celos support a "rerun action" like the one in OozieClient, i.e. void rerunAction(actionId : String)?
Also, do you have an active celos instance installed somewhere, in order to test the REST API?

Workflows

Requirements:

  • Use the Oozie API to submit workflows by specifying their workflow.xml path and arbitrary Properties, which will minimally include y/m/d/h
  • Upon submission, return the unique external ID for each workflow, so we can reference it in our system.
  • Check external workflow status by external ID (one of running, success, failure).
  • Integration testing against local Oozie?

Something like this:

public interface ExternalService {
    // returns external ID or throws Exception if something goes wrong
    // the system will set YEAR/MONTH/DAY/HOUR in the props automatically
    // other properties like path to workflow.xml will simply be passed through from the workflow configuration
    public String run(Properties props);
    public ExternalStatus getStatus(String externalWorkflowID);
}

Initially, all Oozie API interactions should simply run on the main thread, synchronously.

Additional, nice to haves:

  • Get workflow tracking URL so we can reference it in our GUI

References: https://github.com/collectivemedia/tracker/issues/36#issuecomment-27583160, https://github.com/collectivemedia/tracker/issues/36#issuecomment-27629763

https://github.com/collectivemedia/tracker/issues/36
@collectivemedia/syn-datapipe2

JSON Configuration

As a first step, we will read in configuration from a local filesystem directory containing JSON files. Each file describes a workflow, as follows:

$ cat dorado-nym1-bid.json
{
    "id": "dorado-nym1-bid",
    "schedule": {
        "type": "com.collective.celos.HourlySchedule" 
    },
    "schedulingStrategy": {
        "type": "com.collective.celos.SerialSchedulingStrategy" 
    },
    "trigger": {
        "type": "com.collective.celos.HDFSCheckTrigger",
        "properties": {
            "path": "/logs/nym1/dorado/${year}-${month}-${day}/${hour}00/_READY"
        }
    },
    "externalService": {
        "type": "com.collective.celos.OozieExternalService",
        "properties": {
            "oozie.wf.application.path": "/deploy/dorado/workflow.xml",
            "jobTracker": "nn01.ny7.collective-media.net:8032",
            "nameNode": "hdfs://cluster-ny7"
        }
    }
}

The code that reads the files should create the individual objects (schedule, scheduling strategy, trigger, external service) using reflection/Class.forName. Each object should have a constructor taking a Properties object as input.

The configuration reader should take as input the local directory File and produce as output a WorkflowConfiguration.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Mock external systems

Until we have a proper integration testing setup, we should add mock tests for any external systems (Oozie, HDFS) we use, to make sure the right methods get called with the right arguments.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Dockerize Celos

Put Celos via Ansible into a Docker container.

Then deploy the container to the virtual cluster for integration testing and to the real cluster for deployment.

Requirements

  • Celos must be able to talk to cluster services like Oozie and HDFS.
  • Outside services must be able to talk to Celos' HTTP API at a well-known URL.
  • Deployment of docker image should be automated.
  • Docker image can be deployed to prod cluster as well as AWS test cluster.

Nice to haves

  • Stage and run integration tests (on the prod cluster) against docker images before deployment.
  • Run docker image inside a Virtualbox for non-Linux development.

Questions

  • Where does the created image get stored? What's the process for updating it?

Recommended by @jpfuentes2: https://github.com/signalfuse/maestro-ng

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

REST API

GET /workflow-list

{
    "ids": ["workflow-1", "workflow-2"]
}

GET /workflow?id=workflow-1

{
    "2013-12-07T13:00:00.000Z": { "status": "RUNNING", "externalID": "237982137-371832798321-W" },
    "2013-12-07T14:00:00.000Z": { "status": "READY" },
    ...
}

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Handling Oozie SUSPEND and other "esoteric" states

I think this is more of a version 2 feature.

For version 1, the scheduler could simply require manual intervention (e.g. via Hue or Oozie command line) if a job runs forever or gets into SUSPENDED state.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Logging

  • Log all exceptions.
  • Log all database updates in the form Changing status of slot workflow-1@2013-12-03T13:00Z to RUNNING with external ID = 2398721837913-2767321868713-W or something along those lines. Maybe it makes sense to have a LoggingStateDatabase that wraps another StateDatabase and logs all changes.

Every log line pertaining to a workflow instance should include the workflow ID and the scheduled time, in addition to the real time, for greppability.

Real time, then workflow ID and scheduled time, then message:

2013-12-03T13:49Z [workflow-1@2013-12-03T13:00Z] INFO: Changing status of slot workflow-1@2013-12-03T13:00Z to RUNNING with external ID = 2398721837913-2767321868713-W

Likewise, each line of an exception stack trace should be prefixed with this information.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Logging to /var/log/celos/

  • Log to /var/log/celos/celos.log
  • Provision directory with ansible, writable by celos user
  • Use log line format where timestamp including milliseconds appears at the beginning of line
  • Integration test that log works
  • Probably add some log rolling to create a new log file if the current one gets big, and move the old one to /var/log/celos/celos.2013-01-24.log or similar.

JAR loading extension mechanism

Celos should automatically load any JARs in /etc/celos/lib and add them to the classpath.

This mechanism will be used to extend Celos with workflow-specific triggers, schedules, scheduling strategies.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

HDFSCheckTrigger

A Trigger implementation that checks for the existence of a file or directory in HDFS.

As shown in single-workflow-config.json it is configured with a path property which it receives in the Properties passed to its isDataAvailable method.

It should replace ${year}, ${month}, ${day}, and ${hour} in the path value with the corresponding values from the ScheduledTime (which probably needs to be extended with getYear(), etc), and then check for the existence of the file or dir.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Continuous Deployment

Overview

Celos' purpose is to submit workflows to Oozie and monitor their execution status.

Celos gets triggered once a minute by cron. It then performs a number of HDFS file-existence tests to see if new data is available, submits new jobs to Oozie, and queries the status of existing jobs (via Oozie's HTTP API).

Celos is driven by workflow configuration files (typically under /etc/celos) and a state database (under /var/lib/celos) on disk.

To access the services, Celos depends on a periodically refreshed Kerberos keytab.

A monitoring GUI (currently running on admin1) pulls information from Celos via a JSON HTTP API.

deploy-overview

Notes

chmod 700 ~/.ssh
put ci's key into celos@celos001:~/.ssh/authorized_keys

https://github.com/collectivemedia/tracker/issues/36
@collectivemedia/syn-datapipe2

Configurable retry interval for workflows

Currently, per #52, a failed workflow will be made ready to run immediately after the failure.

It would probably make sense to be able to specify a retry interval to wait before making a failed workflow runnable again.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

destroy.sh says "There are active instances using security group 'manuel_celos-dn'"

However, the EC2 console lists the instances as terminated.

Complete output:

mjs@hotel:~/prj/celos/provisioner$ ./destroy.sh 
terminating cluster
"manuel_celos"
destroying security groups
/var/lib/gems/1.9.1/gems/aws-sdk-1.29.1/lib/aws/core/client.rb:366:in `return_or_raise': There are active instances using security group 'manuel_celos-dn' (AWS::EC2::Errors::InvalidGroup::InUse)
    from /var/lib/gems/1.9.1/gems/aws-sdk-1.29.1/lib/aws/core/client.rb:467:in `client_request'
    from (eval):3:in `delete_security_group'
    from /var/lib/gems/1.9.1/gems/aws-sdk-1.29.1/lib/aws/ec2/security_group.rb:329:in `delete'
    from ./sec_groups_delete.rb:29:in `block in <main>'
    from /var/lib/gems/1.9.1/gems/aws-sdk-1.29.1/lib/aws/ec2/security_group_collection.rb:125:in `block in each'
    from /var/lib/gems/1.9.1/gems/aws-sdk-1.29.1/lib/aws/core/data.rb:97:in `block in method_missing'
    from /var/lib/gems/1.9.1/gems/aws-sdk-1.29.1/lib/aws/core/data.rb:96:in `each'
    from /var/lib/gems/1.9.1/gems/aws-sdk-1.29.1/lib/aws/core/data.rb:96:in `method_missing'
    from /var/lib/gems/1.9.1/gems/aws-sdk-1.29.1/lib/aws/ec2/security_group_collection.rb:120:in `each'
    from ./sec_groups_delete.rb:27:in `<main>'
delete host file

Expand Scheduler test suite

The state space of the scheduler is very big and requires additional tests, especially when automatic retries #10 are added.

  • Test that exceptions in one workflow don't affect other workflows.
  • Test that triggers are called repeatedly until they return true.
  • Test that exceptions are logged.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Prevent multiple instances of same workflow

If we fire off a workflow in Oozie, but the scheduler crashes before we can write the instance's external ID to the database, then we will fire it off again in the next cycle, leading to two copies of the same workflow running.

We could submit the workflow first without starting it, store its external ID in our DB, and then start it.

This means that if we crash before noting the external ID, the external workflow instance will be submitted, but there will never be two running instances.

Plan: Remove run from ExternalService and replace it with String submit(ScheduledTime t) and void start(String externalID).

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Configurable concurrency level for workflows

Like in Oozie, it should be possible to specify how many executions of a workflow can run concurrently. (ATM, the serial scheduling strategy has a concurrency of 1 for all workflows.)

Probably for version 2.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Ensure only a single Celos instance is running at any time

To prevent database corruption, we need to make sure only a single Celos instance is running, using a directory-based lock ( http://stackoverflow.com/a/731634 ) at /var/run/celos/lock.

main should try to mkdir the directory, and if mkdir returns false, it should throw an Exception.

On exit, the directory should be deleted (in a finally block around the rest of main).

An integration test should create the lock dir, call main, and verify that it throws an exception.

A second integration test should ensure that the directory doesn't exist, call main, and ensure that it doesn't exist afterwards.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Configuration defaults

Right now, all configuration properties for a workflow must be specified in the workflow.json file, e.g. https://gist.github.com/manuel/0a339b6cff6771b7267e

There should be a mechanism for reusing properties across workflows, so properties shared by similar workflows can be reused and don't need to be specified in every JSON file.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Kerberize Celos

Celos needs a Kerberos setup (probably a keytab) so it can access Oozie.

This will also need periodic renewal.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Scheduler

The "main loop" of the system, that repeatedly looks for triggers #2 that haven't run yet and executes workflows #3 when a trigger fires.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Database

Keeps track of data returned by triggers #2 and the status of executed workflows #3.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

SerialSchedulingStrategy

This SchedulingStrategy implementation will be used for Pythia. It runs jobs serially, and only ever runs a single job at a time. See #11

It should:

  • always run the oldest slot within the sliding window that isn't in SUCCESS or FAILURE states yet. This guarantees serial execution.
  • only ever run a single slot at a time (i.e. look for any slots that are RUNNING - there should always be at most one - and only submit a new slot if no slots are RUNNING).
  • not "skip ahead of the queue" to newer slots if there is a WAITING slot. The strategy should wait for this slot to become READY, and not execute any newer ones.
  • however, if a workflow instance is irrevocably in FAILURE, we will skip ahead and run any later workflow

I think this should be right, but I'm not sure.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Fixture-based workflow testing framework

Make workflows integration testable - how?

  • Specify (a small amount of) input data files.
  • Specify expected output data files.
  • Copy inputs to HDFS, run workflow over it.
  • Compare against expected output.
  • ?
  • Profit.

Will try this with the classic word count MR job on the test cluster.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

Testing

Testing a scheduler is probably tricky. Maybe we need to "mock the clock" or something.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2

API for rerunning of failed workflows

  • Add servlet POST /celos/rerun-workflow?id=workflow-id&time=YYYY-MM-DDTHH:MM:SSZ
  • This should update the slot in the state database to READY (but only if its current status is SUCCESS or FAILURE).
  • The scheduler will then automatically pick up the slot again on the next cycle.

collectivemedia/tracker#36
@collectivemedia/syn-datapipe2
