Airflow tutorial

This tutorial is loosely based on the Airflow tutorial in the official documentation. It will walk you through the basics of starting up Airflow and creating a job.

Setup

Install Airflow

Airflow is installable with pip. It's included in the conda virtual environment defined in environment.yml, or you can install it yourself. You may run into problems later if you don't have the right binaries or Python packages installed: if you run into HiveOperator errors, do a pip install airflow[hive], and if you're specifying database support with pip, make sure the database itself is installed (e.g. do a brew install postgresql or apt-get install postgresql if you did pip install airflow[postgres]).
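
For example, adding the Postgres support mentioned above would look something like this (the Homebrew and apt package names are the usual ones, adjust for your system):

$ pip install airflow[postgres]
$ brew install postgresql    # or: apt-get install postgresql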

Use the conda virtual environment as defined in environment.yml:

  • Install miniconda
  • Make sure that conda is on the path:
$ which conda
~/miniconda2/envs/ns/bin/conda
  • Install the virtual environment:
$ conda env create -f environment.yml
  • Activate the virtual environment:
$ source activate airflow

Alternatively, install Airflow yourself by running:

$ pip install airflow

Run Airflow

Before we can actually use Airflow, we have to initialize its database (the default is a SQLite database). Once that's done, we can access the UI by running a web server and we can start scheduling jobs.

First, set environment variable AIRFLOW_HOME to the current directory (if you don't set this, Airflow will create a directory ~/airflow to put its files in):

$ export AIRFLOW_HOME="$(pwd)"

Initialize the database:

$ airflow initdb

Start the web server and go to localhost:8080 to check out the UI:

$ airflow webserver -p 8080

With the webserver running, we'll start a job from a new terminal window. Open a new terminal, activate the virtual environment and set the environment variable for this terminal:

$ source activate airflow
$ export AIRFLOW_HOME="$(pwd)"

Run a supplied example and check in the UI that it's running:

$ airflow run example_bash_operator runme_0 2015-01-01

Job

We'll first create a job by specifying actions as a Directed Acyclic Graph (DAG) in Python and then submit it.

Create the DAG

Open a file with the name airflow_tutorial.py for your DAG.

Settings for tasks can be passed as arguments when creating them, either individually per task or via a dictionary of default values shared by multiple tasks. Add the following dictionary to airflow_tutorial.py to specify the time, email and retry settings shared by our tasks:

from datetime import datetime
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

We'll create a DAG object to contain our tasks. This DAG will run every day, starting at the start_date specified in default_args. A run for a given date starts after that date has passed (i.e. the workflow for 2016-05-01 runs after 2016-05-01 23:59). All times in Airflow are in UTC.

from airflow import DAG
dag = DAG('airflow_tutorial', default_args=default_args, 
          schedule_interval=timedelta(days=1))

We'll run a job consisting of three tasks: we'll print 'hello', wait for 5 seconds and finally print 'world', all done using BashOperators. Give each operator a unique task ID, a bash command and our dag object (the retries parameter is overridden with 3 for the third operator).

from airflow.operators.bash_operator import BashOperator

task_hello = BashOperator(task_id='print_hello', 
                          bash_command='echo "hello"', dag=dag)
task_sleep = BashOperator(task_id='sleep', bash_command='sleep 5',
                          dag=dag)
task_world = BashOperator(task_id='print_world',
                          bash_command='echo "world"', retries=3,
                          dag=dag)

Dependencies between tasks are added by setting one task as upstream (or downstream) of another. Link the tasks in a chain so that task_sleep runs after task_hello and is followed by task_world (task_hello -> task_sleep -> task_world):

task_sleep.set_upstream(task_hello)
task_world.set_upstream(task_sleep)
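
If you're on Airflow 1.8 or later, the same chain can also be written with the bitshift operators; this is simply an alternative notation for the set_upstream calls above:

# Equivalent to the two set_upstream calls above
task_hello >> task_sleep >> task_world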

Your final DAG should look something like:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


dag = DAG('airflow_tutorial', default_args=default_args,
          schedule_interval=timedelta(days=1))


task_hello = BashOperator(task_id='print_hello',
                          bash_command='echo "hello"', dag=dag)
task_sleep = BashOperator(task_id='sleep', bash_command='sleep 5',
                          dag=dag)
task_world = BashOperator(task_id='print_world',
                          bash_command='echo "world"', retries=3,
                          dag=dag)


task_sleep.set_upstream(task_hello)
task_world.set_upstream(task_sleep)

Check that the DAG is valid by executing the file with python:

$ python airflow_tutorial.py

Run the job

Airflow checks for DAGs in its $AIRFLOW_HOME/dags/ folder. Move airflow_tutorial.py to the folder dags/ (or ~/airflow/dags if you didn't set $AIRFLOW_HOME). Your job is automatically picked up and scheduled to run.
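
Assuming $AIRFLOW_HOME is still set to the tutorial directory, this could look like:

$ mkdir -p "$AIRFLOW_HOME/dags"
$ mv airflow_tutorial.py "$AIRFLOW_HOME/dags/"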

You can manually test a single task with airflow test:

airflow test airflow_tutorial print_world 2016-07-01

This runs the task locally as if it were run for the given date, ignoring other tasks and without communicating with the database. You should see the task's log output (including the echoed "world") in your terminal.

Use airflow run to manually run a task with its dependencies for a given date.

airflow run airflow_tutorial print_world 2016-07-01

If you want to backfill jobs over a period, specify a start and end date with -s and -e:

airflow backfill airflow_tutorial -s 2016-10-01 -e 2016-10-05

Conclusion

You've set up Airflow, created a DAG and run a job. Try changing the schedule interval to every minute, implementing templating as in the original tutorial, and checking out more example DAGs. Read the docs before really using Airflow.
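
As a starting point for those experiments, here is a minimal sketch (not part of this tutorial's DAG) that uses a cron-style schedule_interval to run every minute and a Jinja-templated bash_command; the DAG id airflow_tutorial_templated and the echoed message are made up for illustration:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

# schedule_interval also accepts cron expressions; '* * * * *' means every minute
dag = DAG('airflow_tutorial_templated',
          start_date=datetime(2015, 6, 1),
          schedule_interval='* * * * *')

# bash_command is a Jinja template; {{ ds }} expands to the execution date (YYYY-MM-DD)
task_templated = BashOperator(task_id='print_date',
                              bash_command='echo "run for {{ ds }}"',
                              dag=dag)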
