Lithops - Apache Airflow Plugin

This repository contains an Apache Airflow plugin that implements new operators to easily deploy serverless function tasks using Lithops.

Lithops is a multicloud Python library for running serverless jobs. Lithops transparently runs local sequential code over thousands of serverless functions. This plugin lets Airflow use serverless functions to achieve higher performance on highly parallelizable tasks, such as big data analysis workflows. It does so without consuming all the resources of the cluster Airflow runs on, and without having to provision Airflow workers through the Celery executor.

Contents

  1. Installation
  2. Usage
  3. Examples

Usage

Operators

This plugin provides three new operators.


Important note: Due to the way Airflow manages DAGs, the callables passed to the Lithops operators cannot be declared in the DAG definition script. Instead, they must be declared in a separate file or module. To use the functions from the DAG file, import them as you would from any regular module.


  • LithopsCallAsyncOperator

    It invokes a single function.

    Parameter      | Description | Default
    func           | Python callable | mandatory
    data           | Keyword arguments for func | {}
    data_from_task | Uses the output of another task as an input parameter of func, given as a {parameter: task_id} dict | None

    Example:

    # my_functions.py
    def add(x, y):
        return x + y

    # DAG definition file
    from my_functions import add

    my_task = LithopsCallAsyncOperator(
        task_id='add_task_1',
        func=add,
        data={'x': 1, 'y': 3},
        dag=dag,
    )
    # Returns: 4

    basic_task = LithopsCallAsyncOperator(
        task_id='add_task_2',
        func=add,
        data={'x': 4},
        data_from_task={'y': 'add_task_1'},  # y takes the output of add_task_1
        dag=dag,
    )
    # Returns: 8
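The following sketch makes the data and data_from_task semantics concrete. It runs the callable in-process instead of on Lithops. The helper run_call_async_locally and its task_outputs argument are hypothetical. The sketch assumes the operator merges data with the upstream outputs named in data_from_task, then calls func with the result as keyword arguments.

```python
from typing import Any, Callable, Dict, Optional


def run_call_async_locally(
    func: Callable[..., Any],
    data: Optional[Dict[str, Any]] = None,
    data_from_task: Optional[Dict[str, str]] = None,
    task_outputs: Optional[Dict[str, Any]] = None,
) -> Any:
    """Hypothetical in-process stand-in for LithopsCallAsyncOperator."""
    kwargs = dict(data or {})
    outputs = task_outputs or {}
    for param, task_id in (data_from_task or {}).items():
        # Assumption: the real operator fetches this value from the
        # upstream task's output; here it is looked up in a plain dict.
        kwargs[param] = outputs[task_id]
    return func(**kwargs)


def add(x, y):
    return x + y


# Mirrors the two tasks of the example above.
first = run_call_async_locally(add, data={'x': 1, 'y': 3})
second = run_call_async_locally(
    add,
    data={'x': 4},
    data_from_task={'y': 'add_task_1'},
    task_outputs={'add_task_1': first},
)
print(first, second)  # 4 8
```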
  • LithopsMapOperator

    It invokes one parallel task per element of map_iterdata, applying map_function to each element:

    Parameter           | Description | Default | Type
    map_function        | Python callable | mandatory | callable
    map_iterdata        | Iterable; map_function is invoked once for every element | mandatory | iterable
    iterdata_from_task  | Takes map_iterdata from another task's output | None | iterable
    extra_params        | Extra keyword arguments added to every map_function call | None | dict
    chunk_size          | Splits the object into chunks of this many bytes (one invocation per chunk) | None | int
    chunk_n             | Splits the object into N chunks (one invocation per chunk) | None | int
    remote_invocation   | Activates PyWren's remote invocation functionality | False | bool
    invoke_pool_threads | Number of threads used to invoke the functions | 500 | int
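The chunk_size and chunk_n options both split an input object into pieces, each processed by a separate invocation. The sketch below illustrates only the byte arithmetic, using a hypothetical chunk_ranges helper. It does not reproduce the actual Lithops partitioner, and it assumes the two options are mutually exclusive.

```python
from typing import List, Optional, Tuple


def chunk_ranges(
    total_size: int,
    chunk_size: Optional[int] = None,
    chunk_n: Optional[int] = None,
) -> List[Tuple[int, int]]:
    """Hypothetical helper: split an object of total_size bytes into
    half-open byte ranges [start, end), one per function invocation."""
    if chunk_size is not None and chunk_n is not None:
        # Assumption for this sketch: only one of the two may be set.
        raise ValueError("set chunk_size or chunk_n, not both")
    if chunk_n is not None:
        # Ceiling division so that at most N chunks cover the whole object.
        chunk_size = -(-total_size // chunk_n)
    if not chunk_size:
        # No chunking: the whole object goes to a single invocation.
        return [(0, total_size)]
    return [(start, min(start + chunk_size, total_size))
            for start in range(0, total_size, chunk_size)]


print(chunk_ranges(10, chunk_size=4))  # [(0, 4), (4, 8), (8, 10)]
print(chunk_ranges(10, chunk_n=3))     # [(0, 4), (4, 8), (8, 10)]
print(chunk_ranges(10))                # [(0, 10)]
```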

    Example:

    # my_functions.py
    def add(x, y):
        return x + y

    # DAG definition file
    from my_functions import add

    map_task = LithopsMapOperator(
        task_id='map_task',
        map_function=add,
        map_iterdata=[1, 2, 3],
        extra_params={'y': 1},
        dag=dag,
    )
    # Returns: [2, 3, 4]
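The map example above is functionally equivalent to the following sequential loop. This is a local sketch only. It assumes each scalar element of map_iterdata is passed as the first positional argument, with extra_params added as keyword arguments. The name run_map_locally is hypothetical, and the real operator runs each call as a separate serverless function.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional


def run_map_locally(
    map_function: Callable[..., Any],
    map_iterdata: Iterable[Any],
    extra_params: Optional[Dict[str, Any]] = None,
) -> List[Any]:
    """Hypothetical sequential stand-in for LithopsMapOperator:
    one call per element, in the order of map_iterdata."""
    extra = extra_params or {}
    return [map_function(item, **extra) for item in map_iterdata]


def add(x, y):
    return x + y


print(run_map_locally(add, [1, 2, 3], extra_params={'y': 1}))  # [2, 3, 4]
```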
  • LithopsMapReduceOperator

    It invokes one parallel task per element of map_iterdata, applying map_function to each element. Finally, a single reduce_function invocation gathers all the map results.

    Parameter              | Description | Default | Type
    map_function           | Python callable | mandatory | callable
    map_iterdata           | Iterable; map_function is invoked once for every element | mandatory | iterable
    reduce_function        | Python callable that gathers the map results | mandatory | callable
    iterdata_from_task     | Takes map_iterdata from another task's output | None | iterable
    extra_params           | Extra keyword arguments added to every map_function call | None | dict
    map_runtime_memory     | Memory used to run the map functions | Loaded from config | int
    reduce_runtime_memory  | Memory used to run the reduce function | Loaded from config | int
    chunk_size             | Splits the object into chunks of this many bytes (one invocation per chunk); None processes the whole file in one function activation | None | int
    chunk_n                | Splits the object into N chunks (one invocation per chunk); None processes the whole file in one function activation | None | int
    remote_invocation      | Activates PyWren's remote invocation functionality | False | bool
    invoke_pool_threads    | Number of threads used to invoke the functions | 500 | int
    reducer_one_per_object | Uses one reducer per object after running the partitioner | False | bool
    reducer_wait_local     | Waits for the results locally | False | bool

    Example:

    # my_functions.py
    def add(x, y):
        return x + y

    def mult_array(results):
        # Multiplies together all the map results
        result = 1
        for n in results:
            result *= n
        return result

    # DAG definition file
    from my_functions import add, mult_array

    mapreduce_task = LithopsMapReduceOperator(
        task_id='mapreduce_task',
        map_function=add,
        reduce_function=mult_array,
        map_iterdata=[1, 2, 3],
        extra_params={'y': 1},
        dag=dag,
    )
    # Returns: 24 (map results [2, 3, 4]; 2 * 3 * 4 = 24)
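As a local sketch of the map-reduce flow, the following code runs the map phase sequentially and then passes the full list of map results to the reducer in one call. It makes the same assumptions as a plain map: scalar elements are passed positionally and extra_params are passed as keyword arguments. The name run_map_reduce_locally is hypothetical.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional


def run_map_reduce_locally(
    map_function: Callable[..., Any],
    reduce_function: Callable[[List[Any]], Any],
    map_iterdata: Iterable[Any],
    extra_params: Optional[Dict[str, Any]] = None,
) -> Any:
    """Hypothetical sequential stand-in for LithopsMapReduceOperator."""
    extra = extra_params or {}
    # Map phase: one call per element of map_iterdata.
    map_results = [map_function(item, **extra) for item in map_iterdata]
    # Reduce phase: a single call that receives every map result.
    return reduce_function(map_results)


def add(x, y):
    return x + y


def mult_array(results):
    result = 1
    for n in results:
        result *= n
    return result


print(run_map_reduce_locally(add, mult_array, [1, 2, 3], extra_params={'y': 1}))  # 24
```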

Inherited parameters

All operators inherit from a common PyWren operator that provides the following parameters:

Parameter       | Description | Default | Type
lithops_config  | Lithops configuration, as a dictionary | {} | dict
async_invoke    | Invokes the functions asynchronously, without waiting for them to complete | False | bool
get_result      | Downloads the results upon completion | True | bool
clean_data      | Deletes PyWren metadata from COS | False | bool
extra_env       | Adds environment variables to the functions' runtime | None | dict
runtime_memory  | Runtime memory, in MB | 256 | int
timeout         | Time the functions have to complete their execution before a timeout is raised | Default from config | int
include_modules | Explicitly pickles these dependencies | [] | list
exclude_modules | Explicitly excludes these modules from the pickled dependencies | [] | list
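The async_invoke and get_result flags control two things: whether the operator waits for the functions to finish, and whether it downloads their results. As a rough analogy only, the sketch below reproduces that control flow with a thread pool from Python's standard concurrent.futures module instead of Lithops. The helper invoke_locally is hypothetical, and the real operator's behaviour (for example, what it returns when get_result is False) may differ.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Iterable, List, Union


def invoke_locally(
    executor: ThreadPoolExecutor,
    func: Callable[[Any], Any],
    iterdata: Iterable[Any],
    async_invoke: bool = False,
    get_result: bool = True,
) -> Union[List[Future], List[Any], None]:
    """Hypothetical analogy for the async_invoke and get_result flags,
    using local threads instead of serverless functions."""
    futures = [executor.submit(func, item) for item in iterdata]
    if async_invoke:
        # Return the futures immediately without waiting for completion.
        return futures
    # Block until every invocation has finished.
    wait(futures)
    if get_result:
        # Collect the return values of all completed invocations.
        return [f.result() for f in futures]
    return None


def square(x):
    return x * x


with ThreadPoolExecutor(max_workers=4) as pool:
    results = invoke_locally(pool, square, [1, 2, 3])
    pending = invoke_locally(pool, square, [4, 5], async_invoke=True)
    late_results = [f.result() for f in pending]
    nothing = invoke_locally(pool, square, [6], get_result=False)

print(results, late_results, nothing)  # [1, 4, 9] [16, 25] None
```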

License

Apache 2 license
