
dag-factory's Introduction

dag-factory

Welcome to dag-factory! dag-factory is a library for Apache Airflow® to construct DAGs declaratively via configuration files.

The minimum requirements for dag-factory are:

For a gentle introduction, please take a look at our Quickstart Guide. For more examples, please see the examples folder.

Quickstart

The following example demonstrates how to create a simple DAG using dag-factory. We will be generating a DAG with three tasks, where task_2 and task_3 depend on task_1. These tasks will be leveraging the BashOperator to execute simple bash commands.

  1. To install dag-factory, run the following pip command in your Apache Airflow® environment:
pip install dag-factory
  2. Create a YAML configuration file called config_file.yml and save it within your dags folder:
example_dag1:
  default_args:
    owner: 'example_owner'
    retries: 1
    start_date: '2024-01-01'
  schedule_interval: '0 3 * * *'
  catchup: False
  description: 'this is an example dag!'
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo 1'
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo 2'
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo 3'
      dependencies: [task_1]

We are setting the execution order of the tasks by specifying the dependencies key.

  3. In the same folder, create a python file called generate_dags.py. This file is responsible for generating the DAGs from the configuration file and is a one-time setup. You won't need to modify this file unless you want to add more configuration files or change the configuration file name.
from airflow import DAG  ## by default, this is needed for the dagbag to parse this file
import dagfactory
from pathlib import Path

config_file = Path.cwd() / "dags/config_file.yml"
dag_factory = dagfactory.DagFactory(config_file)

dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())

After a few moments, the DAG will be generated and ready to run in Airflow. Unpause the DAG in the Apache Airflow® UI and watch the tasks execute!

Please look at the examples folder for more examples.

Features

Multiple Configuration Files

If you want to split your DAG configuration into multiple files, you can do so by leveraging a suffix in the configuration file name.

# 'airflow' word is required for the dagbag to parse this file
from dagfactory import load_yaml_dags

load_yaml_dags(globals_dict=globals(), suffix=['dag.yaml'])

Dynamically Mapped Tasks

If you want to create a dynamic number of tasks, you can use dynamic task mapping. In the configuration file, define the mapping on a task with the partial and expand keys, as in the example below (the callables it references are sketched after it).

...
  tasks:
    request:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: example_task_mapping
      python_callable_file: /usr/local/airflow/dags/expand_tasks.py # this file should contain the python callable
    process:
      operator: airflow.operators.python_operator.PythonOperator
      python_callable_name: expand_task
      python_callable_file: /usr/local/airflow/dags/expand_tasks.py
      partial:
        op_kwargs:
          test_id: "test"
      expand:
        op_args:
          request.output
      dependencies: [request]

mapped_tasks_example.png
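The python_callable_file entries above point at expand_tasks.py, whose contents are not shown in this README. A minimal sketch of what such a file could contain (the function names come from the YAML above; the bodies are assumptions):

# expand_tasks.py (sketch; only the function names are taken from the YAML above)
def example_task_mapping():
    # return a list of argument lists for the mapped task to expand over
    return [[1], [2], [3]]


def expand_task(x, test_id):
    # receives one mapped op_arg plus the partial op_kwargs
    print(test_id)
    print(x)
    return [x]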

Datasets

dag-factory supports scheduling DAGs via Apache Airflow Datasets.

To use datasets, specify the Dataset in the outlets key of the producing task in the configuration file; outlets is a list of strings representing dataset URIs. In the consumer DAG, set the schedule key to the list of dataset URIs it should be scheduled against. The consumer DAG will run once all of those datasets are available.

producer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG producer simple datasets"
  schedule_interval: "0 5 * * *"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
      outlets: [ 's3://bucket_example/raw/dataset1.json' ]
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [ task_1 ]
      outlets: [ 's3://bucket_example/raw/dataset2.json' ]
consumer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG consumer simple datasets"
  schedule: [ 's3://bucket_example/raw/dataset1.json', 's3://bucket_example/raw/dataset2.json' ]
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"

datasets_example.png
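For reference, the schedule of consumer_dag above corresponds roughly to the following plain-Airflow snippet (Datasets require Airflow 2.4+; this is an illustration, not dag-factory code):

from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="consumer_dag",
    start_date=datetime(2024, 1, 1),
    schedule=[
        Dataset("s3://bucket_example/raw/dataset1.json"),
        Dataset("s3://bucket_example/raw/dataset2.json"),
    ],
) as dag:
    BashOperator(task_id="task_1", bash_command="echo 'consumer datasets'")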

Custom Operators

dag-factory supports custom operators. To use one, set the operator key to the import path of the custom operator class and add any additional parameters that the operator requires.

...
  tasks:
    begin:
      operator: airflow.operators.dummy_operator.DummyOperator
    make_bread_1:
      operator: customized.operators.breakfast_operators.MakeBreadOperator
      bread_type: 'Sourdough'

custom_operators.png
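MakeBreadOperator is a user-supplied class; a minimal sketch of what such a custom operator might look like (only the class path and the bread_type parameter come from the YAML above, everything else is an assumption):

# customized/operators/breakfast_operators.py (sketch)
from airflow.models.baseoperator import BaseOperator


class MakeBreadOperator(BaseOperator):
    def __init__(self, bread_type, **kwargs):
        super().__init__(**kwargs)
        self.bread_type = bread_type

    def execute(self, context):
        self.log.info("Making %s bread", self.bread_type)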

Notes

HttpSensor (since 0.10.0)

The package airflow.sensors.http_sensor works with all supported versions of Airflow. In Airflow 2.0+, the new package name can be used in the operator value: airflow.providers.http.sensors.http

The following example shows response_check logic in a python file:

task_2:
  operator: airflow.sensors.http_sensor.HttpSensor
  http_conn_id: 'test-http'
  method: 'GET'
  response_check_name: check_sensor
  response_check_file: /path/to/example1/http_conn.py
  dependencies: [task_1]
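The response_check_name / response_check_file pair points at a callable in http_conn.py; a minimal sketch of such a file (the function name comes from the YAML above, the body is an assumption):

# /path/to/example1/http_conn.py (sketch)
def check_sensor(response):
    # return True when the HTTP response indicates success
    return "ok" in response.text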

The response_check logic can also be provided as a lambda:

task_2:
  operator: airflow.sensors.http_sensor.HttpSensor
  http_conn_id: 'test-http'
  method: 'GET'
  response_check_lambda: 'lambda response: "ok" in response.text'
  dependencies: [task_1]

Benefits

  • Construct DAGs without knowing Python
  • Construct DAGs without learning Airflow primitives
  • Avoid duplicative code
  • Everyone loves YAML! ;)

Contributing

Contributions are welcome! Just submit a Pull Request or a GitHub Issue.

dag-factory's Issues

This module is deprecated. Please use `kubernetes.client.models.V1Volume`

Symptom

~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:26 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1Volume`.
~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:27 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1VolumeMount`.

Traceback

  File "~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/dagfactory/__init__.py", line 2, in <module>
    from .dagfactory import DagFactory                                                                                                                         
  File "~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/dagfactory/dagfactory.py", line 8, in <module>
    from dagfactory.dagbuilder import DagBuilder                                                                                                               
  File "~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/dagfactory/dagbuilder.py", line 10, in <module>
    from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator             
  File "~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 25, in <module>
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator  # noqa

Support lambda in python_callable

Right now we need to use a file and a name for the python callable. It should be able to take a lambda as well, as shown below:

task_2:
      operator: airflow.sensors.http_sensor.HttpSensor
      http_conn_id: 'test-http'
      method: 'GET'
      response_check_lambda: 'lambda response: "ok" in response.text'
      dependencies: [task_1]

I have been able to make it work and will raise a pull request for it.

Call user defined functions when building DAGs

Hi @ajbosco,

We have defined some Python functions that return TaskGroup. We call these functions to create DAGs.

In Python, a function like make_task_group_X could be used like this:

with DAG(...) as dag:
    g1 = make_task_group_a(args...)
    g2 = make_task_group_b(args...)
    g3 = make_task_group_c(args...)
    start >> g1 >> g2 >> g3 >> end

With dag-factory, is it possible to call user defined functions to build up a DAG?

SubDag Support

Hey!

I was testing some use cases to use it at our company, and was wondering if there is any support for the SubDag operator?
I played around a bit and couldn't make it work.

Congrats on the great lib.

Cheers

Support dynamically extracting dag params in dagbuilder

Not sure why the config param parsing in the dag builder is somewhat manual. Is it possible to read the Airflow DAG attributes and fill them in when specified?

The specific use case is that I want to use the field access_control in the DAG object.

Deserialise arbitrary types in python

Most of the params that the DAG and the Operators and Sensors underneath take are basic/raw types (string, int, bool, float) or basic containers (lists, dicts). But from time to time we require some python object. For example:

An example for the ExternalTaskSensor

example_external_task_sensor:
  default_args:
    start_date: 2021-09-01
  schedule_interval: '@daily'
  tasks:
    external_task_sensor:
      operator: airflow.sensors.external_task.ExternalTaskSensor
      external_dag_id: example_upstream_dependency_dag
      external_task_id: example_upstream_dependency_task
      execution_delta: !!python/object/apply:datetime.timedelta [0, 300]

And also an example for deserialising a reference to the callable straight from the yaml. Let's suppose we are instantiating this yaml from a file main_dag_file.py that also contains a python function we want to call (so that it belongs to the PYTHONPATH):

# main_dag_file.py
def add_two_numbers(first_operator: int, second_operator: int) -> int:
  return first_operator + second_operator

example_call_python_code:
  default_args:
    start_date: 2021-09-01
  schedule_interval: '@daily'
  tasks:
    example_python_task:
      operator: airflow.operators.python.PythonOperator
      python_callable: !!python/name:main_dag_file.add_two_numbers
      op_kwargs:
        first_operator: 3
        second_operator: 2

In addition sometimes it's useful to both:

  • Have your DAG as a config file with its jinja templating ready for airflow.
  • Have a render engine on top to be able to make references, modify strings, and pass a function's return value as a param for an operator.
# main_dag_file.py
def decide_executor() -> str:
  environment = os.getenv('EXECUTION_ENVIRONMENT')
  if environment == 'prod':
    executor = 'kubernetes.KubernetesPodOperator'
  else:
    executor = 'bash.BashOperator'
  return 'airflow.operators.' + executor

# pass environment variables to the function
example_call_python_code:
  default_args:
    start_date: 2021-09-01
  schedule_interval: '@daily'
  tasks:
    example_python_task:
      operator: !!python/object/main_dag_file.decide_executor()
      some_param: {{ var.value.my_variable }}

This could also be solved by being able to add python elements to the yaml file. This is a fine line: config files should be language agnostic, I know, which is why I'm opening this issue for the community to weigh in too.

To implement this we should use yaml.load instead of yaml.safe_load. As we are the owners of the code there's no risk in doing so; safe_load is in place to prevent injection attacks.
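To illustrate the difference described above, here is a small sketch using plain PyYAML (outside dag-factory):

import yaml

doc = "execution_delta: !!python/object/apply:datetime.timedelta [0, 300]"

# yaml.safe_load(doc) would raise a ConstructorError on the python-specific tag.
# The unsafe loader (PyYAML >= 5.1) constructs the Python object instead:
parsed = yaml.unsafe_load(doc)
print(parsed["execution_delta"])  # 0:05:00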

Also, this feature would make the current way of attaching a callable to a PythonOperator unnecessary, and it complies with the exact operator params.

What are your thoughts?

Getting rid of long imports

Could we get rid of this long import?
airflow.operators.bash_operator.BashOperator

It would be great to have
BashOperator
instead

template_searchpath seems to not work in YAML file

Hi, first of all, nice job on this project.

I'm trying to configure a DAG pointing to scripts in a path different from the DAG's path, so I'm setting template_searchpath in the YAML config to point to this location, but I'm receiving the error jinja2.exceptions.TemplateNotFound: /opt/airflow/scripts/query.sql, so I guess template_searchpath is not working.
If I place the query.sql in the DAG's path, it works.

Here is my YAML

default:
  default_args:
    owner: "dag-factory"
    start_date: 2 days
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "graph"
  orientation: "LR"
  schedule_interval: "0 1 * * *"
  catchup: False
  tags: ['dag-factory']
  template_searchpath: ["/opt/airflow/scripts"]

example_sql_exec_query:
  description: "DAG to run SQL query"
  tasks:
    begin:
      operator: airflow.operators.dummy_operator.DummyOperator
    exec_sql:
      operator: airflow.operators.postgres_operator.PostgresOperator
      postgres_conn_id: "postgres"
      sql: /opt/airflow/scripts/query.sql # it doesn't work.
      params: {"columns": "dag_id, owners", "table": "public.dag", "limit": 10}
      dependencies: [begin]

Does anybody else have the same issue?

execution_timeout should be converted to timedelta

Hi. I need to use the default execution_timeout parameter, but the task fails because dag-factory doesn't convert it to a timedelta. I checked your code to see how you handle retry_delay and saw the code below. The same conversion should also be done for execution_timeout. Maybe a better solution would be to convert all parameters ending with '_sec' to a timedelta.

    if utils.check_dict_key(dag_params["default_args"], "retry_delay_sec"):
        dag_params["default_args"]["retry_delay"]: timedelta = timedelta(
            seconds=dag_params["default_args"]["retry_delay_sec"]
        )
        del dag_params["default_args"]["retry_delay_sec"]
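A sketch of the analogous handling this issue asks for, mirroring the retry_delay_sec block above (execution_timeout_sec is an assumed key name, not necessarily an existing dag-factory option):

    if utils.check_dict_key(dag_params["default_args"], "execution_timeout_sec"):
        dag_params["default_args"]["execution_timeout"]: timedelta = timedelta(
            seconds=dag_params["default_args"]["execution_timeout_sec"]
        )
        del dag_params["default_args"]["execution_timeout_sec"]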

Does this require passing the YAML to airflow?

Adam,

Great-looking library! How do you deploy the YAML to Airflow with the DAG factory?

I'm just diving into Airflow now, so any guidance is helpful. Airflow distributes the DAG itself, but every node will need to re-run the factory for the DAG, so the YAML has to be available in the Airflow image?

Need help on SparkSubmitOperator

Hi,
It seems the SparkSubmitOperator, the HDFS sensor, and many other operators are not included in dag-factory.

Could you please add them as well?

Also, it would be great if I could get a sample yaml file for the SparkSubmitOperator.

How to delete a DAG?

A DAG created through this tool cannot be deleted (due to the file path), even if I comment out the part of the config that defines the DAG.

Do you have any plans to support that?

Thanks!

Dag factory as a DAG

Hey there,

Thank you for the library, it is awesome. I'm trying to use it, though, and didn't manage to. My case is the following:

I want to offer a way for users to create their own dags using a small UI builder (no-code solution). I already have that working; now I'm trying to configure dag-factory to create those DAGs. Therefore, I can't add the script to the dags folder as described in the readme. Instead, I created the following dag:

from datetime import timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

from dagfactory import DagFactory

default_args = {
    'owner': 'querylayer',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1)
}

def create_dag(**kwargs):
    dag_factory = DagFactory(config=kwargs['dag_run'].conf)

    dag_factory.clean_dags(globals())
    dag_factory.generate_dags(globals())
    print(globals().get('dag_name'))

with DAG('create_workflow', default_args=default_args, schedule_interval=None, start_date=days_ago(0), catchup=False) as dag:
    create_workflow = PythonOperator(
        task_id='create_workflow',
        provide_context=True,
        python_callable=create_dag,
        dag=dag
    )

    create_workflow

The idea is that we pass the config as JSON to this DAG and it will create the dag for you. So far so good: I tried running this DAG with the config below and it runs successfully:

{
    "dag_name": {
      "dag_id": "dag_name",
      "schedule_interval": "None",
      "description": "some description",
      "doc_md": "# Title",
      "catchup": false,
      "concurrence": 2,
      "max_active_runs": 2,
      "task_groups": {},
      "default_args": {
          "start_date": "2021-09-28"
      },
      "tasks": {
        "task_name": {
          "operator": "airflow.operators.bash_operator.BashOperator",
          "bash_command": "echo 1"
        },
  
        "task_name2": {
          "operator": "airflow.operators.bash_operator.BashOperator",
          "bash_command": "echo 2",
          "dependencies": ["task_name"]
        }
      }
    }
  }

Below is the output of the DAG execution; you can see that the line print(globals().get('dag_name')) actually prints the dag that was created:

*** Reading local file: /opt/airflow/logs/create_workflow/create_workflow/2021-09-28T18:46:06.674663+00:00/3.log
[2021-09-28 18:51:49,399] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: create_workflow.create_workflow 2021-09-28T18:46:06.674663+00:00 [queued]>
[2021-09-28 18:51:49,422] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: create_workflow.create_workflow 2021-09-28T18:46:06.674663+00:00 [queued]>
[2021-09-28 18:51:49,422] {taskinstance.py:1067} INFO - 
--------------------------------------------------------------------------------
[2021-09-28 18:51:49,422] {taskinstance.py:1068} INFO - Starting attempt 3 of 5
[2021-09-28 18:51:49,424] {taskinstance.py:1069} INFO - 
--------------------------------------------------------------------------------
[2021-09-28 18:51:49,450] {taskinstance.py:1087} INFO - Executing <Task(PythonOperator): create_workflow> on 2021-09-28T18:46:06.674663+00:00
[2021-09-28 18:51:49,460] {standard_task_runner.py:52} INFO - Started process 58 to run task
[2021-09-28 18:51:49,461] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'create_workflow', 'create_workflow', '2021-09-28T18:46:06.674663+00:00', '--job-id', '225', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/create_workflow.py', '--cfg-path', '/tmp/tmp1z29zp0e', '--error-file', '/tmp/tmp29nl9faf']
[2021-09-28 18:51:49,461] {standard_task_runner.py:77} INFO - Job 225: Subtask create_workflow
[2021-09-28 18:51:49,496] {logging_mixin.py:104} INFO - Running <TaskInstance: create_workflow.create_workflow 2021-09-28T18:46:06.674663+00:00 [running]> on host c1e986c90b97
[2021-09-28 18:51:49,538] {taskinstance.py:1282} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=querylayer
AIRFLOW_CTX_DAG_ID=create_workflow
AIRFLOW_CTX_TASK_ID=create_workflow
AIRFLOW_CTX_EXECUTION_DATE=2021-09-28T18:46:06.674663+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-09-28T18:46:06.674663+00:00
[2021-09-28 18:51:49,540] {logging_mixin.py:104} INFO - <DAG: dag_name>
[2021-09-28 18:51:49,540] {python.py:151} INFO - Done. Returned value was: None
[2021-09-28 18:51:49,558] {taskinstance.py:1191} INFO - Marking task as SUCCESS. dag_id=create_workflow, task_id=create_workflow, execution_date=20210928T184606, start_date=20210928T185149, end_date=20210928T185149
[2021-09-28 18:51:49,599] {taskinstance.py:1245} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-09-28 18:51:49,633] {local_task_job.py:151} INFO - Task exited with return code 0

But even though everything runs successfully, the DAG isn't displayed in the dag list; we can see that using the command line to list all dags:

default@c1e986c90b97:/opt/airflow$ airflow dags list
/home/airflow/.local/lib/python3.6/site-packages/airflow/configuration.py:346 DeprecationWarning: The hide_sensitive_variable_fields option in [admin] has been moved to the hide_sensitive_var_conn_fields option in [core] - the old setting has been used, but please update your config.
/home/airflow/.local/lib/python3.6/site-packages/airflow/configuration.py:346 DeprecationWarning: The default_queue option in [celery] has been moved to the default_queue option in [operators] - the old setting has been used, but please update your config.
/home/airflow/.local/lib/python3.6/site-packages/airflow/configuration.py:346 DeprecationWarning: The default_queue option in [celery] has been moved to the default_queue option in [operators] - the old setting has been used, but please update your config.
dag_id                   | filepath                    | owner      | paused
=========================+=============================+============+=======
create_workflow          | create_workflow.py          | querylayer | False 

Am I missing something? Is this approach viable, or is there a better way to do it?
Another question: are those DAGs stored in the DB somehow, or are they created only in memory? I saw that you register them in the globals variable, but this means that if Airflow restarts it would not start with the previously created dags, am I right?

Airflow 2.2.0 compatibility

I tried to upgrade Airflow to 2.2.0 and got the error below.

Broken DAG: [/opt/airflow/dags/****.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py", line 373, in get
    return self._get_option_from_default_config(section, key, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py", line 383, in _get_option_from_default_config
    raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
airflow.exceptions.AirflowConfigException: section/key [core/dag_concurrency] not found in config

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/dagfactory/dagfactory.py", line 137, in clean_dags
    dags: Dict[str, Any] = self.build_dags()
  File "/home/airflow/.local/lib/python3.8/site-packages/dagfactory/dagfactory.py", line 102, in build_dags
    raise Exception(
Exception: Failed to generate dag ****. verify config is correct

I guess it is because of this change.

not able to import dagfactory

Hi,
first, amazing project,
hope it is still running :)

I'm facing an issue importing the dag-factory package;
see the details below

$ airflow version
2.1.2
$ pip list|grep dag
dag-factory 0.9.1
$ python -V
Python 3.6.13
$ python
Python 3.6.13 (default, May 12 2021, 16:48:24)
[GCC 8.3.0] on linux

>>> from airflow import DAG
>>> import dagfactory
/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:26 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1Volume`.
/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:27 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1VolumeMount`.
>>>

start_date and end_date get their time component removed

While using dag-factory to create dags on Apache Airflow, I noticed a very strange behavior. Suppose that while creating a dag, the input start_date is "2020-04-14T16:20:30.000Z". In the Airflow UI it is shown as "2020-04-14T00:00:00.000Z" (notice how the time component is removed).

I did a little bit of searching and found this behavior to be due to this piece of code in utils.py:

if isinstance(date_value, date):
    return datetime.combine(date=date_value, time=datetime.min.time()).replace(
        tzinfo=local_tz
    )
if isinstance(date_value, datetime):
    return date_value.replace(tzinfo=local_tz)

If I pass a datetime.datetime object as date_value, it will match the first if (since datetime is a subclass of date) and that removes the time component of my input.
Can you suggest a workaround if I am doing something wrong?
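Because datetime is a subclass of date, the first branch matches both types; here is a sketch of the reordering that would preserve the time component (an illustration, not the project's actual code):

if isinstance(date_value, datetime):      # check the subclass first
    return date_value.replace(tzinfo=local_tz)
if isinstance(date_value, date):          # plain dates still get midnight attached
    return datetime.combine(date=date_value, time=datetime.min.time()).replace(
        tzinfo=local_tz
    )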

dag-factory==0.4.0 breaks our use of dag-factory with an airflow templating error

relevant packages:

apache-airflow[crypto]==1.10.2
dag-factory==0.4.0

dag-factory yaml:

registry_of_metrics:
  default_args:
    provide_context: True
    depends_on_past: True
    retries: 15
    start_date: 2018-10-01
  schedule_interval: '0 9 1 * *'
  description: 'registry of metric dag'

  tasks:
    # Paying dealers metric
    staging_paying_dealers:
      operator: operators.DockerRunOperator
      entrypoint: 'bin/generate_report.py'
      task_id: 'paying_dealers'
      arguments:
        date_name: 'as_of_date'
        sql_class: lib.models.reports.staging_paying_dealers.StagingPayingDealersReport
        schema: 'SCORECARD_STAGING'
        table: 'PAYING_DEALERS'
        schedule: 'monthly'
      template_arguments:
        date: 'next_execution_date'
    paying_dealers:
      operator: operators.DockerRunOperator
      entrypoint: 'bin/generate_report.py'
      task_id: 'paying_dealers'
      dependencies: [staging_paying_dealers]
      arguments:
        date_name: 'as_of_date'
        sql_class: lib.models.reports.paying_dealers.PayingDealersReport
        schema: 'SCORECARD'
        table: 'METRIC_MONTH_COUNTRY_DATA'
        schedule: 'monthly'
        metric_id: 3
      template_arguments:
        date: 'next_execution_date'
    # Session uniques metric
    session_uniques:
      operator: operators.DockerRunOperator
      entrypoint: 'bin/generate_report.py'
      task_id: 'session_uniques'
      arguments:
        date_name: 'as_of_date'
        sql_class: lib.models.reports.session_uniques.SessionUniquesReport
        schema: 'SCORECARD'
        table: 'METRIC_MONTH_COUNTRY_DATA'
        schedule: 'monthly'
        metric_id: 9
      template_arguments:
        date: 'next_execution_date'

Traceback (most recent call last):
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/app.py", line 1982, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/app.py", line 1614, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/app.py", line 1517, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/_compat.py", line 33, in reraise
    raise value
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/app.py", line 1612, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/app.py", line 1598, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask_admin/base.py", line 69, in inner
    return self._run_view(f, *args, **kwargs)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask_admin/base.py", line 368, in _run_view
    return fn(self, *args, **kwargs)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask_login/utils.py", line 258, in decorated_view
    return func(*args, **kwargs)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/views.py", line 2224, in index
    auto_complete_data=auto_complete_data)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask_admin/base.py", line 308, in render
    return render_template(template, **kwargs)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/templating.py", line 134, in render_template
    context, ctx.app)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/templating.py", line 116, in _render
    rv = template.render(context)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/jinja2/asyncsupport.py", line 76, in render
    return original_render(self, *args, **kwargs)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/jinja2/environment.py", line 1008, in render
    return self.environment.handle_exception(exc_info, True)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/jinja2/environment.py", line 780, in handle_exception
    reraise(exc_type, exc_value, tb)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/jinja2/_compat.py", line 37, in reraise
    raise value.with_traceback(tb)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/templates/airflow/dags.html", line 18, in top-level template code
    {% extends "airflow/master.html" %}
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/templates/airflow/master.html", line 18, in top-level template code
    {% extends "admin/master.html" %}
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/templates/admin/master.html", line 18, in top-level template code
    {% extends 'admin/base.html' %}
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask_admin/templates/bootstrap3/admin/base.html", line 37, in top-level template code
    {% block page_body %}
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/templates/admin/master.html", line 107, in block "page_body"
    {% block body %}
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/templates/airflow/dags.html", line 88, in block "body"
    <a href="{{ url_for('airflow.'+dag.default_view, dag_id=dag.dag_id) }}" title="{{ dag.description }}">
TypeError: can only concatenate str (not "NoneType") to str

DAG import broken in 0.9.0

The latest release is not working for our DAGs; with 0.9.0 we get this error on DAGs that work fine with 0.8.0:

Broken DAG: [/usr/local/airflow/dags/dcetl/dcetl_dark_dag.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dagfactory/dagbuilder.py", line 118, in get_dag_params
    dag_params["default_args"]["on_success_callback"]
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/module_loading.py", line 28, in import_string
    module_path, class_name = dotted_path.rsplit('.', 1)
AttributeError: 'function' object has no attribute 'rsplit'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dagfactory/dagfactory.py", line 136, in clean_dags
    dags: Dict[str, Any] = self.build_dags()
  File "/usr/local/lib/python3.7/site-packages/dagfactory/dagfactory.py", line 103, in build_dags
    ) from err
Exception: Failed to generate dag dcetl_dag_dark. verify config is correct

Looks like the issue is caused by how callbacks in our customizations are handled - with 0.8.0 we do this:

        config = dagfactory.DagFactory._load_config(config_filepath)
        config["default_args"]["on_success_callback"] = PagerDutyHook().on_success_callback
        config["default_args"]["on_failure_callback"] = PagerDutyHook().on_failure_callback

With 0.9.0 this means the PagerDutyHook callback is passed to the import_string method, so dotted_path in import_string isn't a string; it is:

<bound method PagerDutyHook.on_success_callback of <pagerduty.hooks.pagerduty.PagerDutyHook object at 0x7f2293776750>>
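For context, import_string resolves a dotted-path string to a module-level attribute, while the 0.8.0-era customisation injects an already-bound method; a sketch of the distinction (the PagerDutyHook path is taken from the traceback above and is only illustrative):

from airflow.utils.module_loading import import_string

# what import_string can handle: a dotted-path string
hook_cls = import_string("pagerduty.hooks.pagerduty.PagerDutyHook")

# what the 0.8.0-style customisation passes in instead: a bound method,
# which then reaches import_string in 0.9.0 and fails with the AttributeError above
# config["default_args"]["on_success_callback"] = PagerDutyHook().on_success_callback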

Best practices for deployment

Hey there,
this is an incredible extension to Airflow. I'm very excited about it, but I'm asking myself how to deploy dags properly (I'm fairly new to Airflow). In my tests, a change to the yaml file pushed to Airflow changes a running job instantly (and executes the new tasks, which I expected to start with the next run).

Could you share best practices for how to deploy the yaml files and maybe how to version the dags?

Thanks in advance.

Time taken to create dag in Airflow from yaml file

Namaste @ajbosco. Thank you for this great library.

I have a question.
Is the time taken by dag-factory to create a dag on Airflow configurable? I am seeing a gap of about 3 minutes between placing a yaml file of tasks (and the python configuration) and seeing the dag in the Airflow UI.

Thanks again.

Airflow 2.0.0 compatibility

Hi @ajbosco
Did you have a chance to try dagfactory on Airflow 2.0? I wonder if I will have problems if I upgrade to 2.0. If it is not compatible, are you planning to release a version for Airflow 2.0?

Add json schema validation for yaml files

Hi there 👋

When I write code I always use a linter; my IDE tells me when I'm about to make a mess by not importing a certain library or not using the right type.

This does not apply to config files. They're super nice because config "always compiles" 😉, but the code running those config files does not always like it. So to have a little bit more control I try to implement "schemas" on those config files. They're looser than "classes", also language independent, and easier to maintain and run.

It would be nice to have something like

import yaml
from jsonschema import validate

if __name__ == '__main__':
    schema = """
        type: object
        properties:
          default_args:
            type: object
            properties:
              start_date:
                type: string
          schedule_interval:
            type: string
          catchup:
            type: boolean
          tasks:
            type: array
            items:
              type: object
              properties:
                operator:
                  type: string
                dependencies:
                  type: array
                  items:
                    type: string
    """
    validate(
        instance=yaml.safe_load(open('path/to/my_dag.yml')),
        schema=yaml.safe_load(schema)
    )

But this schema is one I generated; it doesn't distinguish between Airflow versions and it's not up to date with the library. It would be nice to have a check like this built in. PyCharm picks up such schemas to validate things like .gitlab-ci.yaml, so we could as well be writing DAGs the "safest" way with free linting.

Also a thought: this way the codebase could ease up a little bit on the yaml validation. Reducing the core codebase makes it easier to read and maintain.

Regardless of what you think of this proposal, thanks for your work, and for open sourcing this library. It rocks 🚀 !

Unexpected keyword argument 'tags'

When trying the example in this repo as-is on GCP's Cloud Composer with Airflow 1.10.6, it runs into this error when trying to parse the DAG:

verify config is correct. err:__init__() got an unexpected keyword argument 'tags'

When I downgrade dag-factory to 0.4.2, it works as advertised.

Can we separate out generating DAG and setting it into Airflow's global?

Here, generating the DAG and setting it into Airflow's globals are tied together in a single method:
https://github.com/ajbosco/dag-factory/blob/bce607f95d88cbe868f0268abe23d4c34f444454/dagfactory/dagfactory.py#L66-L84

If I want to save the generated DAG into a DB for versioning, or serialise it and send it over the network, this is not possible with this method currently.

I suggest that we return the generated DAG and create another method, set_dag(), which will set the dag in Airflow's global namespace.
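A sketch of the proposed split (set_dag is the hypothetical method suggested here, not an existing API; build_dags appears in the tracebacks quoted elsewhere on this page):

import dagfactory

dag_factory = dagfactory.DagFactory("/path/to/dags/config_file.yml")

dags = dag_factory.build_dags()       # return the generated DAG objects...
# ...serialise, version, or ship `dags` here if desired...
dag_factory.set_dag(dags, globals())  # ...and register them separately (proposed method)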

Read multiple YAML files in the same python file

Hello,

This is not really an issue but more a question: would you recommend the following? Instead of having one python file that reads one yaml file and constructs a DAG, you construct multiple DAGs in the same python file by reading several YAML files.

From what I saw, the bottleneck would be at the worker level, because it also has to parse the DAG to understand what it needs to do, and in this situation the worker would parse all the YAML files when it only needed to parse one. I did not find out whether there is a way to tell the airflow worker not to parse something it doesn't need.
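One way to read several YAML files from a single Python file with the API shown in the Quickstart is to loop over the configs (a sketch; the glob pattern is an assumption). The load_yaml_dags helper shown in the Features section above covers the same pattern with a filename suffix.

import dagfactory
from pathlib import Path

# build DAGs from every matching config file in the dags folder
for config_file in sorted((Path.cwd() / "dags").glob("*_config.yml")):
    dag_factory = dagfactory.DagFactory(config_file)
    dag_factory.clean_dags(globals())
    dag_factory.generate_dags(globals())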

Regards
Ferdinand

Unable to pass date macros

Macros like {{ ds }} and {{ prev_ds }}, which are very critical for backfills, do not work. Is there a way to do it?

CWL ?

Does DAG Factory follow the Common Workflow Language (CWL) in terms of its yaml syntax?

sla_miss_callback

sla_miss_callback functionality should work like the on_success_callback handling below:

    if utils.check_dict_key(
        dag_params, "on_success_callback_name"
    ) and utils.check_dict_key(dag_params, "on_success_callback_file"):
        dag_params["on_success_callback"]: Callable = utils.get_python_callable(
            dag_params["on_success_callback_name"],
            dag_params["on_success_callback_file"],
        )
        dag_params["default_args"]["on_success_callback"]: Callable = utils.get_python_callable(
            dag_params["on_success_callback_name"],
            dag_params["on_success_callback_file"],
        )

Help Required: XComs in YAML file

I am trying to create dynamic DAGs using a YAML template and dag-factory. With a small example I am able to access an MSSQL database and insert data. As a next step I want to perform EL(T)/E(T)L, and from the Airflow side I can perform the extract from DB1 and the load to DB2 with XComs (for small datasets, or S3 as an XCom backend). But how would dag-factory handle the XComs, or how can I pass the extracted data to the load step (is there a keyword in YAML)? I apologize if I missed something obvious in the documentation.

Thank you.

Nested Subgroups

Hello,

I have tried to specify a TaskGroup within a TaskGroup, but it seems that the "make_task_group" method does not support that.

Have I overlooked something, or is it not supported?

Best regards from Austria,
Andreas

Get the error "Broken DAG: [/usr/local/airflow/dags/example_dag_factory.py] No module named 'dagfactory'" in airflow UI

I followed the instructions in the README file to install dag-factory in a local airflow container, in which the airflow version is 1.10.12 and python version is 3.8.9.

I copied the files example_dag_factory.yml, example_dag_factory.py and print_hello.py from https://github.com/ajbosco/dag-factory/blob/master/examples into the folder /usr/local/airflow/dags in the container, expecting the DAGs defined in the yml file to be recognized. However, I see the error from the issue title in the Airflow UI.

Not sure what I missed. Any help is appreciated.

How to access Airflow variables and parameters passed via REST API

I have a use case where I am accessing an Airflow Variable using Variable.get(). How can I achieve this in a yaml file?

Also, how can I access parameters (params.app_name, params.namespace, etc.) which I am passing via the REST API as follows:

{ "conf": { "app_name":"app namet", "region":"", "namespace": "", "default_path": "", } }

env_variables = Variable.get("env_variables", deserialize_json=True)
fetch_yaml = BashOperator(
        bash_command="export {{var.json.env_variables['kube_config'] && kubectl get sparkapplications {{ params.app_name}} -n {{params.namespace}}",
        task_id="fetch_yaml"
    )

dag-factory PythonOperator

Hi Adam,
I'm trying to use the PythonOperator with dagfactory. I tried to define a function within the python file and call this function as python_callable, and I always get the following error message:

make sure config is properly populated. err: Failed to create <class 'airflow.operators.python_operator.PythonOperator'> task. err: python_callable param must be callable

As long as I write the task directly within the python file, it's no problem to call the function.
Am I doing something wrong, or is it not possible to use the PythonOperator this way?

DAG file:

from airflow import DAG
import dagfactory
dag_factory = dagfactory.DagFactory("/path/to/dags/config_file.yml")
dag_factory.generate_dags(globals())

# [START howto_operator_python]
from pprint import pprint

def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

YAML config:

Python_Test:
  default_args:
    owner: 'user'
    start_date: 2019-05-28
  description: 'this is an sample dag which runs every day'
  tasks:
    test_1:
      operator: airflow.operators.python_operator.PythonOperator
      task_id: 'pythonoperator_Testtask'
      python_callable: print_context
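For comparison, the Dynamically Mapped Tasks example earlier in this README references callables via python_callable_name and python_callable_file rather than a bare python_callable; a sketch of the same config using that pattern (the file path is assumed):

Python_Test:
  default_args:
    owner: 'user'
    start_date: 2019-05-28
  description: 'this is a sample dag which runs every day'
  tasks:
    test_1:
      operator: airflow.operators.python_operator.PythonOperator
      python_callable_name: print_context
      python_callable_file: /path/to/dags/callables.py  # assumed file containing print_context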

Support for python callable response_check in HttpSensor

Need to be able to create a callable for response_check as below:

    task_3:
      operator: airflow.providers.http.sensors.http.HttpSensor
      task_id: 'http_sensor'
      http_conn_id: 'test-http'
      endpoint: ''
      method: 'GET'
      headers: 
        header1: 'value1'
      response_check_name: check_sensor
      response_check_file: /opt/airflow/dags/repo/dags/example1/http_conn.py
      poke_interval: 5        
      dependencies: [task_1]

Dependabot can't resolve your Python dependency files

Dependabot can't resolve your Python dependency files.

As a result, Dependabot couldn't update your dependencies.

The error Dependabot encountered was:

ERROR: ERROR: Could not find a version that matches werkzeug<0.15.0,>=0.12,>=0.14.1,>=0.15
Tried: 0.1, 0.2, 0.3, 0.3.1, 0.4, 0.4.1, 0.5, 0.5.1, 0.6, 0.6.1, 0.6.2, 0.7, 0.7.1, 0.7.2, 0.8, 0.8.1, 0.8.2, 0.8.3, 0.9, 0.9.1, 0.9.2, 0.9.3, 0.9.4, 0.9.5, 0.9.6, 0.10, 0.10.1, 0.10.2, 0.10.2, 0.10.4, 0.10.4, 0.11, 0.11, 0.11.1, 0.11.1, 0.11.2, 0.11.2, 0.11.3, 0.11.3, 0.11.4, 0.11.4, 0.11.5, 0.11.5, 0.11.6, 0.11.6, 0.11.7, 0.11.7, 0.11.8, 0.11.8, 0.11.9, 0.11.9, 0.11.10, 0.11.10, 0.11.11, 0.11.11, 0.11.12, 0.11.12, 0.11.13, 0.11.13, 0.11.14, 0.11.14, 0.11.15, 0.11.15, 0.12, 0.12, 0.12.1, 0.12.1, 0.12.2, 0.12.2, 0.13, 0.13, 0.14, 0.14, 0.14.1, 0.14.1, 0.15.0, 0.15.0, 0.15.1, 0.15.1, 0.15.2, 0.15.2, 0.15.3, 0.15.3, 0.15.4, 0.15.4
There are incompatible versions in the resolved dependencies.
[pipenv.exceptions.ResolutionFailure]:       req_dir=requirements_dir
[pipenv.exceptions.ResolutionFailure]:   File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 726, in resolve_deps
[pipenv.exceptions.ResolutionFailure]:       req_dir=req_dir,
[pipenv.exceptions.ResolutionFailure]:   File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 480, in actually_resolve_deps
[pipenv.exceptions.ResolutionFailure]:       resolved_tree = resolver.resolve()
[pipenv.exceptions.ResolutionFailure]:   File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 395, in resolve
[pipenv.exceptions.ResolutionFailure]:       raise ResolutionFailure(message=str(e))
[pipenv.exceptions.ResolutionFailure]:       pipenv.exceptions.ResolutionFailure: ERROR: ERROR: Could not find a version that matches werkzeug<0.15.0,>=0.12,>=0.14.1,>=0.15
[pipenv.exceptions.ResolutionFailure]:       Tried: 0.1, 0.2, 0.3, 0.3.1, 0.4, 0.4.1, 0.5, 0.5.1, 0.6, 0.6.1, 0.6.2, 0.7, 0.7.1, 0.7.2, 0.8, 0.8.1, 0.8.2, 0.8.3, 0.9, 0.9.1, 0.9.2, 0.9.3, 0.9.4, 0.9.5, 0.9.6, 0.10, 0.10.1, 0.10.2, 0.10.2, 0.10.4, 0.10.4, 0.11, 0.11, 0.11.1, 0.11.1, 0.11.2, 0.11.2, 0.11.3, 0.11.3, 0.11.4, 0.11.4, 0.11.5, 0.11.5, 0.11.6, 0.11.6, 0.11.7, 0.11.7, 0.11.8, 0.11.8, 0.11.9, 0.11.9, 0.11.10, 0.11.10, 0.11.11, 0.11.11, 0.11.12, 0.11.12, 0.11.13, 0.11.13, 0.11.14, 0.11.14, 0.11.15, 0.11.15, 0.12, 0.12, 0.12.1, 0.12.1, 0.12.2, 0.12.2, 0.13, 0.13, 0.14, 0.14, 0.14.1, 0.14.1, 0.15.0, 0.15.0, 0.15.1, 0.15.1, 0.15.2, 0.15.2, 0.15.3, 0.15.3, 0.15.4, 0.15.4
[pipenv.exceptions.ResolutionFailure]: Warning: Your dependencies could not be resolved. You likely have a mismatch in your sub-dependencies.
  First try clearing your dependency cache with $ pipenv lock --clear, then try the original command again.
 Alternatively, you can use $ pipenv install --skip-lock to bypass this mechanism, then run $ pipenv graph to inspect the situation.
  Hint: try $ pipenv lock --pre if it is a pre-release dependency.
ERROR: ERROR: Could not find a version that matches werkzeug<0.15.0,>=0.12,>=0.14.1,>=0.15
Tried: 0.1, 0.2, 0.3, 0.3.1, 0.4, 0.4.1, 0.5, 0.5.1, 0.6, 0.6.1, 0.6.2, 0.7, 0.7.1, 0.7.2, 0.8, 0.8.1, 0.8.2, 0.8.3, 0.9, 0.9.1, 0.9.2, 0.9.3, 0.9.4, 0.9.5, 0.9.6, 0.10, 0.10.1, 0.10.2, 0.10.2, 0.10.4, 0.10.4, 0.11, 0.11, 0.11.1, 0.11.1, 0.11.2, 0.11.2, 0.11.3, 0.11.3, 0.11.4, 0.11.4, 0.11.5, 0.11.5, 0.11.6, 0.11.6, 0.11.7, 0.11.7, 0.11.8, 0.11.8, 0.11.9, 0.11.9, 0.11.10, 0.11.10, 0.11.11, 0.11.11, 0.11.12, 0.11.12, 0.11.13, 0.11.13, 0.11.14, 0.11.14, 0.11.15, 0.11.15, 0.12, 0.12, 0.12.1, 0.12.1, 0.12.2, 0.12.2, 0.13, 0.13, 0.14, 0.14, 0.14.1, 0.14.1, 0.15.0, 0.15.0, 0.15.1, 0.15.1, 0.15.2, 0.15.2, 0.15.3, 0.15.3, 0.15.4, 0.15.4
There are incompatible versions in the resolved dependencies.

['Traceback (most recent call last):\n', '  File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 501, in create_spinner\n    yield sp\n', '  File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 649, in venv_resolve_deps\n    c = resolve(cmd, sp)\n', '  File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 539, in resolve\n    sys.exit(c.return_code)\n', 'SystemExit: 1\n']

If you think the above is an error on Dependabot's side please don't hesitate to get in touch - we'll do whatever we can to fix it.

You can mention @dependabot in the comments below to contact the Dependabot team.
