
airflow-dbt

This is a collection of Airflow operators to provide easy integration with dbt.

from airflow import DAG
from airflow_dbt.operators.dbt_operator import (
    DbtSeedOperator,
    DbtSnapshotOperator,
    DbtRunOperator,
    DbtTestOperator,
    DbtCleanOperator,
)
from airflow.utils.dates import days_ago

default_args = {
  'dir': '/srv/app/dbt',
  'start_date': days_ago(0)
}

with DAG(dag_id='dbt', default_args=default_args, schedule_interval='@daily') as dag:

  dbt_seed = DbtSeedOperator(
    task_id='dbt_seed',
  )

  dbt_snapshot = DbtSnapshotOperator(
    task_id='dbt_snapshot',
  )

  dbt_run = DbtRunOperator(
    task_id='dbt_run',
  )

  dbt_test = DbtTestOperator(
    task_id='dbt_test',
    retries=0,  # Failing tests would fail the task, and we don't want Airflow to try again
  )

  dbt_clean = DbtCleanOperator(
    task_id='dbt_clean',
  )

  dbt_seed >> dbt_snapshot >> dbt_run >> dbt_test >> dbt_clean

Installation

Install from PyPI:

pip install airflow-dbt

It will also need access to the dbt CLI, which should either be on your PATH or can be set with the dbt_bin argument in each operator.
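
For example, to point an operator at a specific dbt executable (a minimal sketch; the executable path below is only illustrative):

from airflow_dbt.operators.dbt_operator import DbtRunOperator

dbt_run = DbtRunOperator(
    task_id='dbt_run',
    # Only needed if `dbt` is not on the worker's PATH; adjust to wherever dbt is installed.
    dbt_bin='/usr/local/airflow/.local/bin/dbt',
)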

Usage

There are five operators currently implemented:

  • DbtSeedOperator
  • DbtSnapshotOperator
  • DbtRunOperator
  • DbtTestOperator
  • DbtCleanOperator

Each of the above operators accepts the following arguments (see the example after this list):

  • env
    • If set as a kwarg dict, the given environment variables are passed to the dbt process
  • profiles_dir
    • If set, passed as the --profiles-dir argument to the dbt command
  • target
    • If set, passed as the --target argument to the dbt command
  • dir
    • The directory to run the dbt command in
  • full_refresh
    • If set to True, passes --full-refresh
  • vars
    • If set, passed as the --vars argument to the dbt command. Should be set as a Python dictionary, as it will be passed to the dbt command as YAML
  • models
    • If set, passed as the --models argument to the dbt command
  • exclude
    • If set, passed as the --exclude argument to the dbt command
  • select
    • If set, passed as the --select argument to the dbt command
  • selector
    • If set, passed as the --selector argument to the dbt command
  • dbt_bin
    • The path to the dbt CLI executable. Defaults to dbt, so assumes it's on your PATH
  • verbose
    • If set to True, the operator logs verbosely to the Airflow logs
  • warn_error
    • If set to True, passes --warn-error to the dbt command, treating warnings as errors
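
A sketch combining several of these arguments (the model name and var key below are purely illustrative):

dbt_run = DbtRunOperator(
    task_id='dbt_run_orders',
    models='orders',                    # passed as --models
    full_refresh=True,                  # passes --full-refresh
    vars={'run_date': '2021-01-01'},    # passed as --vars, rendered as YAML
    warn_error=True,                    # passes --warn-error
)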

Typically you will want to use the DbtRunOperator, followed by the DbtTestOperator, as shown earlier.

You can also use the hook directly. Typically this is useful when you need to combine the dbt command with other work in the same operator, for example running dbt docs and uploading the docs to somewhere they can be served from.
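
A minimal sketch of this pattern, assuming the hook class is DbtCliHook from airflow_dbt.hooks.dbt_hook (its arguments mirror the operator arguments above) and that upload_docs_to_bucket is a hypothetical helper for publishing the generated site:

from airflow.operators.python import PythonOperator  # airflow.operators.python_operator on older Airflow
from airflow_dbt.hooks.dbt_hook import DbtCliHook


def generate_and_publish_docs():
    # Run `dbt docs generate` via the hook, then publish the generated site.
    hook = DbtCliHook(dir='/srv/app/dbt')
    hook.run_cli('docs', 'generate')
    upload_docs_to_bucket('/srv/app/dbt/target')  # hypothetical helper, not part of airflow-dbt


dbt_docs = PythonOperator(
    task_id='dbt_docs',
    python_callable=generate_and_publish_docs,
)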

Building Locally

To install from the repository, it's recommended to first create a virtual environment:

python3 -m venv .venv

source .venv/bin/activate

Install using pip:

pip install .

Testing

To run tests locally, first create a virtual environment (see Building Locally section)

Install dependencies:

pip install . pytest

Run the tests:

pytest tests/

Code style

This project uses flake8.

To check your code, first create a virtual environment (see Building Locally section):

pip install flake8
flake8 airflow_dbt/ tests/ setup.py

Package management

If you use dbt's package manager, you should include all dependencies before deploying your dbt project.

For Docker users, packages specified in packages.yml should be included as part of your Docker image by calling dbt deps in your Dockerfile.

Amazon Managed Workflows for Apache Airflow (MWAA)

If you use MWAA, you just need to update the requirements.txt file and add airflow-dbt and dbt to it.

Then you can have your dbt code inside a folder {DBT_FOLDER} in the dags folder on S3 and configure the dbt task like below:

dbt_run = DbtRunOperator(
  task_id='dbt_run',
  dbt_bin='/usr/local/airflow/.local/bin/dbt',
  profiles_dir='/usr/local/airflow/dags/{DBT_FOLDER}/',
  dir='/usr/local/airflow/dags/{DBT_FOLDER}/'
)

Templating and parsing environment variables

If you would like to run dbt using a custom profile definition template with environment-specific variables, for example a profiles.yml using Jinja:

<profile_name>:
  outputs:
    <source>:
      database: "{{ env_var('DBT_ENV_SECRET_DATABASE') }}"
      password: "{{ env_var('DBT_ENV_SECRET_PASSWORD') }}"
      schema: "{{ env_var('DBT_ENV_SECRET_SCHEMA') }}"
      threads: "{{ env_var('DBT_THREADS') }}"
      type: <type>
      user: "{{ env_var('USER_NAME') }}_{{ env_var('ENV_NAME') }}"
  target: <source>

You can pass the environment variables via the env kwarg parameter:

import os
...

dbt_run = DbtRunOperator(
  task_id='dbt_run',
  env={
    'DBT_ENV_SECRET_DATABASE': '<DATABASE>',
    'DBT_ENV_SECRET_PASSWORD': '<PASSWORD>',
    'DBT_ENV_SECRET_SCHEMA': '<SCHEMA>',
    'USER_NAME': '<USER_NAME>',
    'DBT_THREADS': os.getenv('<DBT_THREADS_ENV_VARIABLE_NAME>'),
    'ENV_NAME': os.getenv('ENV_NAME')
  }
)

License & Contributing

GoCardless ♥ open source. If you do too, come join us.

airflow-dbt's People

Contributors

abamisileaje-tc, andrewrjones, ayobamshy, cnarvaa, d-swift, danielcmessias, dependabot-preview[bot], dmasip-rxd, etoulas, falydoor, fenimore, irfanhabib, isaacseymour, kimia84, lovenishgoyal, rafalsiwek, rliddler, timmycarbone


airflow-dbt's Issues

Use Airflow's own connections in dbt profiles

It would be a great feature if we could use Airflow's own "Connections" section instead of having to provide usernames/passwords/keyfiles in profiles.yml.

I'm thinking specifically of using dbt in Google Cloud Composer, which provides a "bigquery_default" connection that other Operators can use. This is just an example, and could definitely be applied to every Airflow installation.

It would also help with maintaining the DRY principle, by not having to provide the same credentials in two separate parts of the code (airflow connections and profiles.yml).

Run DBT on Google Cloud Composer: Issue Permission Denied

Hi Gocardless team,
thanks for this amazing package, it's truly great to have Airflow and dbt combined like this.

I have an issue: I'm running airflow-dbt within Cloud Composer, and got this error:

ERROR - [Errno 13] Permission denied: '/home/airflow/gcs/dags/dbt_cx'

Can you guys help on this?
Thanks again

MWAA integration looking for dbt_project.yml in subdirectories

Hello all!

I am trying to integrate MWAA with airflow-dbt, and for some reason when using the DbtRunOperator, dbt initially looks for the dbt_project.yml file in the dbt directory defined by the dir variable (/usr/local/airflow/dags/dbt/dbt_project.yml). However, it then proceeds to search all subdirectories for the dbt_project.yml file, resulting in the following error:

[2022-02-13 15:42:52,016] {{dbt_hook.py:130}} INFO - Failed to read package: Runtime Error
[2022-02-13 15:42:52,016] {{dbt_hook.py:130}} INFO - no dbt_project.yml found at expected path /usr/local/airflow/dags/dbt/snapshots/dbt_project.yml

Any idea why airflow-dbt searches for dbt_project.yml in the snapshots folder after having found it in the parent folder?

Thank you for all your help!!

Release a version with DbtDocsGenerateOperator included

Hi, I saw there's a merged PR (#28) regarding DbtDocsGenerateOperator, but it's not available in the latest pip version yet. I know it's been a while, but I'm just curious if it's possible to have a new version released with DbtDocsGenerateOperator included?

import airflow.hooks.base_hook is deprecated

/lib/python3.9/site-packages/airflow_dbt/hooks/dbt_hook.py:7 DeprecationWarning: The airflow.hooks.base_hook.BaseHook class is deprecated. Please use 'airflow.hooks.base.BaseHook'.
/lib/python3.9/site-packages/airflow_dbt/operators/__init__.py:1 RemovedInAirflow3Warning: This decorator is deprecated.

In previous versions, all subclasses of BaseOperator must use apply_default decorator for the default_args feature to work properly.

In current version, it is optional. The decorator is applied automatically using the metaclass.

Ability to run dbt deps in every operator

This package assumes that dbt deps has been executed at the deployment stage, and doesn't need to be rerun in any subsequent operators. This implies dependencies don't change between deployment and task runs.

Scenario: Airflow was deployed 5 days ago and I'm running a DbtRunOperator task today. If a package has updated since then, those changes won't be reflected in my task because dbt deps hasn't run for the last 5 days. This is relevant in the case where the dbt repo and airflow repo are separate, and the dbt repo gets imported into airflow as a dbt package.

Create "API" tag for those issues requesting flags and operators, and dbt version strategy

Hi, there's a number of issues asking for this and that new flag or param in new dbt versions. It'd be nice to tag them as such, they're nice "first contributions".

There's still the pending issue of supporting newer dbt versions. Do we want to support all dbt versions? Do we want to be tied to the Airflow version (see #54) or to the dbt version (see #59), or to both, using a compat matrix like dbt-utils?

Maybe the render function should also receive the airflow version it is rendering against.

Another option would be to do just like the Airflow providers do: they do NOT support old Airflow versions. I think it makes sense to support dbt v1.x officially in airflow-dbt. And also look into adding this as a provider at some point.

Error in GCP Composer

When I run in the composer it returns that.

[2020-04-16 21:27:07,642] {taskinstance.py:862} INFO - Executing <Task(DbtRunOperator): dbt_run> on 2020-04-16T21:26:19.688956+00:00
[2020-04-16 21:27:07,643] {base_task_runner.py:133} INFO - Running: ['airflow', 'run', 'dw_dbt', 'dbt_run', '2020-04-16T21:26:19.688956+00:00', '--job_id', '1004', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/dw_dbt/dw_dbt_airflow.py', '--cfg_path', '/tmp/tmph61y8y0e']
[2020-04-16 21:27:09,486] {base_task_runner.py:115} INFO - Job 1004: Subtask dbt_run [2020-04-16 21:27:09,486] {settings.py:254} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=1044
[2020-04-16 21:27:09,800] {base_task_runner.py:115} INFO - Job 1004: Subtask dbt_run [2020-04-16 21:27:09,800] {app.py:53} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-16 21:27:09,803] {base_task_runner.py:115} INFO - Job 1004: Subtask dbt_run [2020-04-16 21:27:09,802] {configuration.py:593} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-16 21:27:09,818] {base_task_runner.py:115} INFO - Job 1004: Subtask dbt_run [2020-04-16 21:27:09,818] {settings.py:254} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=1044
[2020-04-16 21:27:10,185] {base_task_runner.py:115} INFO - Job 1004: Subtask dbt_run [2020-04-16 21:27:10,184] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluster.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-16 21:27:10,187] {base_task_runner.py:115} INFO - Job 1004: Subtask dbt_run [2020-04-16 21:27:10,187] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-16 21:27:10,188] {base_task_runner.py:115} INFO - Job 1004: Subtask dbt_run [2020-04-16 21:27:10,188] {dagbag.py:407} INFO - Filling up the DagBag from /home/airflow/gcs/dags/dw_dbt/dw_dbt_airflow.py
[2020-04-16 21:27:10,194] {base_task_runner.py:115} INFO - Job 1004: Subtask dbt_run /opt/python3.6/lib/python3.6/site-packages/airflow_dbt/operators/dbt_operator.py:43: PendingDeprecationWarning: Invalid arguments were passed to DbtRunOperator (task_id: dbt_run). Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
[2020-04-16 21:27:10,194] {base_task_runner.py:115} INFO - Job 1004: Subtask dbt_run *args: ()
[2020-04-16 21:27:10,194] {base_task_runner.py:115} INFO - Job 1004: Subtask dbt_run **kwargs: {'profile_dir': './dw-dbt/profiles'}
[2020-04-16 21:27:10,195] {base_task_runner.py:115} INFO - Job 1004: Subtask dbt_run   super(DbtBaseOperator, self).__init__(*args, **kwargs)
[2020-04-16 21:27:10,619] {base_task_runner.py:115} INFO - Job 1004: Subtask dbt_run [2020-04-16 21:27:10,619] {cli.py:545} INFO - Running <TaskInstance: dw_dbt.dbt_run 2020-04-16T21:26:19.688956+00:00 [running]> on host airflow-worker-67465d97c9-v7q2l
[2020-04-16 21:27:10,685] {base_task_runner.py:115} INFO - Job 1004: Subtask dbt_run [2020-04-16 21:27:10,684] {dbt_hook.py:86} INFO - dbt run --models l1_autorizacoes
[2020-04-16 21:27:10,835] {taskinstance.py:1059} ERROR - [Errno 2] No such file or directory: 'dbt': 'dbt'
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 930, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/opt/python3.6/lib/python3.6/site-packages/airflow_dbt/operators/dbt_operator.py", line 61, in execute
    self.hook.run_cli('run')
  File "/opt/python3.6/lib/python3.6/site-packages/airflow_dbt/hooks/dbt_hook.py", line 93, in run_cli
    close_fds=True)
  File "/opt/python3.6/lib/python3.6/subprocess.py", line 729, in __init__
    restore_signals, start_new_session)
  File "/opt/python3.6/lib/python3.6/subprocess.py", line 1364, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'dbt': 'dbt'

Dbt Operators are no longer working after migrating to Airflow running in Kubernetes

Apache Airflow version
2.7.2

What happened

Since migrating from the local environment to the develop environment, where Airflow runs in Kubernetes via Helm, DbtRunOperator -- and every dbt-related operator -- is no longer working in the develop branch. dbt is not recognized as a command, even with the dependency packages installed. I have a dbt project already initialized, and am also specifying the path to the dbt directory.

[2024-07-18, 10:06:13 UTC] {dbt_hook.py:117} INFO - dbt run --profiles-dir /opt/airflow/dags/repo/dags/dbt/mysql_dbt/profiles --target dev
[2024-07-18, 10:06:13 UTC] {dbt_hook.py:126} INFO - Output:
[2024-07-18, 10:06:17 UTC] {dbt_hook.py:132} INFO - Command exited with return code 2
[2024-07-18, 10:06:17 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/airflow_dbt/operators/dbt_operator.py", line 98, in execute
self.create_hook().run_cli('run')
File "/home/airflow/.local/lib/python3.9/site-packages/airflow_dbt/hooks/dbt_hook.py", line 138, in run_cli
raise AirflowException("dbt command failed")
airflow.exceptions.AirflowException: dbt command failed
[2024-07-18, 10:06:17 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=airflow_dbt, task_id=dbt_run, execution_date=20240718T100611, start_date=20240718T100613, end_date=20240718T100617
[2024-07-18, 10:06:17 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 27286 for task dbt_run (dbt command failed; 18254)

CurrentDagDirectory = os.path.dirname(os.path.abspath(__file__))

dbt_run = DbtRunOperator(
    task_id='dbt_run',
    target='dev',
    profiles_dir=os.path.join(CurrentDagDirectory, 'mysql_dbt', 'profiles'),
    dir=os.path.join(CurrentDagDirectory, 'mysql_dbt'),
    full_refresh=False
)

Invalid arguments were passed to DbtSeedOperator

I am using dbt + MWAA v2.2. Following your instructions https://github.com/gocardless/airflow-dbt#templating-and-parsing-environments-variables, I set up the following dag:

dbt_run = DbtSeedOperator(
   task_id="dbt_seed",
   dbt_bin='/usr/local/airflow/.local/bin/dbt',
   profiles_dir='/usr/local/airflow/dags/{DBT_FOLDER}/',
   dir='/usr/local/airflow/dags/{DBT_FOLDER}/',
   env={
    'DBT_ENV_SECRET_DATABASE': '<DATABASE>',
    'DBT_ENV_SECRET_PASSWORD': '<PASSWORD>',
    'DBT_ENV_SECRET_SCHEMA': '<SCHEMA>',
    'USER_NAME': '<USER_NAME>',
  }
)

And for some reason I get such an error which I do not know how to debug, any help you could provide?
"""
airflow.exceptions.AirflowException: Invalid arguments were passed to DbtSeedOperator (task_id: dbt_seed).
"""
My env is fine since I have another dag (without airflow-dbt) that works, so I assume it's something to do with the args. Moreover, I don't see support for the env arg here: https://pypi.org/project/airflow-dbt/#description.

So I assume it has not been released yet? If not, when is it planned? And is there any workaround?

Thanks in advance!

Ability to send environment variables for use by profiles.yml

I'm running airflow-dbt on AWS MWAA (managed airflow). It does not let me directly configure the runtime environment of the airflow worker nodes; I can't install wrapper scripts or anything, and I don't want secrets in plain text in dag source code/yml. So I'm just calling dbt directly.

I would like to pass in secrets (mainly database username/password for connections) at runtime. The obvious way to do this is to have a profiles.yml with an env var reference, and then somehow get the DbtSeed/etc operator steps to accept an environment variables dictionary which are set just before the dbt binary is started. It would be helpful to have a clean way to do this. I have tried creating environment variables in a previous dag step, but they're already lost before the DbtOperator runs. Because these are for profiles.yml, not for the project yml, the --vars option is insufficient.

Any plans integrating directly with dbt api

Are there any plans to integrate directly with the dbt API, i.e. using Python instead of a subprocess?

something like

import dbt.main as dbt
args = ["run"]
args.extend(["--project-dir", 'project_dir'])
args.extend(["--profiles-dir", 'profiles_dir'])
args.extend(["--profile", 'profile_name'])
args.append('--log-cache-events')
res, success = dbt.handle_and_check(args)

New release to PyPI?

Hello,

It looks like the latest release on PyPI is quite outdated. Are there any plans to tag and release a newer version some time soon? We can currently work around this by installing a specific commit hash, but it'd be nice to just rely on a simple pin in requirements.txt

Thanks for a very useful piece of kit!

Abstract execution environment

As many out there might know every project is a whole different world. For this reason we find ourselves with a plethora of ways for running DBT. Though airflow-dbt is an elegant solution we sometimes want to run this on a k8s pod, on cloud build or some other task execution environment. This is motivated by decoupling, security, different dbt versions (yes, dbt SEMVER is not consistent and minor versions break backwards compatibility so this is a thing), or simply centralising different orchestration tasks in a task-runner because it is already configured.

To accomplish this it would be interesting to:

  1. Abstract the command generation from the execution:
  2. Allow for dependency injection, passing the hook we want to use and run accordingly, pass a python function that receives the command (ala GCSFileTransformOperator) ... open for discussion

Add models to template fields

This will allow us to create an ad-hoc dbt pipeline that accepts models as a parameter

load_data = DbtRunOperator(
    task_id="load_data",
    models="{{ dag_run.conf['models'] }}",
)

Happy to create the PR but for some reason I am being denied access when attempting to push a branch.

Dbt Operators no longer working after migrating to Airflow 2.3.0

Apache Airflow version

2.3.0 (latest released)

What happened

Since migrating to Airflow 2.3.0, DbtRunOperator -- and every dbt-related operator -- is no longer working in production. dbt is not recognized as a command, even with dependency packages installed. I have a dbt project already initialized, and am also specifying the path to the dbt directory.

[2022-05-11, 13:13:37 MDT] {dbt_hook.py:117} INFO - dbt run --profiles-dir /usr/local/airflow/dags/dbt/
[2022-05-11, 13:13:37 MDT] {taskinstance.py:1888} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/astro/.local/lib/python3.9/site-packages/airflow_dbt/operators/dbt_operator.py", line 98, in execute
    self.create_hook().run_cli('run')
  File "/home/astro/.local/lib/python3.9/site-packages/airflow_dbt/hooks/dbt_hook.py", line 119, in run_cli
    sp = subprocess.Popen(
  File "/usr/local/lib/python3.9/subprocess.py", line 951, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "/usr/local/lib/python3.9/subprocess.py", line 1821, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'dbt'
[2022-05-11, 13:13:37 MDT] {taskinstance.py:1394} INFO - Marking task as UP_FOR_RETRY. dag_id=dags_to_use, task_id=dbt_run, execution_date=20220511T191332, start_date=20220511T191337, end_date=20220511T191337
[2022-05-11, 13:13:37 MDT] {standard_task_runner.py:92} ERROR - Failed to execute job 47 for task dbt_run ([Errno 2] No such file or directory: 'dbt'; 9219)

I have also tried using a BashOperator to run dbt commands, but am getting the same error.

What you think should happen instead

Expected behavior: The DbtRunOperator runs the models currently in the dbt directory.

Sample dbt operator. This was working fine in previous Airflow releases

dbt_run = DbtRunOperator(
        task_id="dbt_run",
        dir="/usr/local/airflow/dags/dbt/",
        profiles_dir='/usr/local/airflow/dags/dbt/',
    )

How to reproduce

  1. Install the Astronomer CLI
  2. astrocloud dev init
  3. Initialize a dbt project inside the /dags directory
    • pip install dbt-snowflake
    • dbt init
  4. Add the following to the Airflow requirements.txt:

     dbt-snowflake==1.0.0
     airflow-dbt==0.4.0

  5. astrocloud dev start
  6. Add the DbtRunOperator code block to the DAG file:

     dbt_run = DbtRunOperator(
         task_id="dbt_run",
         dir="/usr/local/airflow/dags/dbt/",
         profiles_dir='/usr/local/airflow/dags/dbt/',
     )

  7. Trigger the dag from the UI

Operating System

OS: Microsoft Windows 10 Enterprise; Version: 10.0.19042 Build 19042

Versions of Apache Airflow Providers

airflow-dbt==0.4.0

Deployment

Astronomer

Deployment details

Local Astronomer deployment via Docker

FROM quay.io/astronomer/ap-airflow:2.3.0

Anything else

This occurs every time on dag run

Use --no-use-colors in dbt_hook.py

My little advice is to use the --no-use-colors switch in dbt_hook.py:

    def run_cli(self, *command):
        """
        Run the dbt cli

        :param command: The dbt command to run
        :type command: str
        """

        dbt_cmd = [self.dbt_bin, "--no-use-colors", *command]

        if self.profiles_dir is not None:
            dbt_cmd.extend(['--profiles-dir', self.profiles_dir])

The log will be cleaner and not full of ANSI escape chars.

Add DbtSourceFreshnessOperator

Hello, I'd like to add the DbtSourceFreshnessOperator, but I can't push my branch to the repo to open a PR. What should I do?
Thanks :)

Deprecation warnings

I get these sort of warnings when using airflow-dbt:

  /home/virgilp/.local/lib/python3.9/site-packages/airflow_dbt/hooks/dbt_hook.py:7: DeprecationWarning: This module is deprecated. Please use `airflow.hooks.base`.
    from airflow.hooks.base_hook import BaseHook

And indeed, in airflow_dbt/hooks/dbt_hook.py we have the deprecated import even in the latest version of airflow-dbt.

Where to reference/install dbt packages with MWAA?

Where should I be installing (or referencing, if deployed during a CI step) dbt packages while using MWAA? I'm currently installing to /tmp/dbt/packages but am experiencing about a 50/50 success rate when running the test operator. Here is what the graph view of my dags looks like:

[screenshot: DAG graph view]

This is the error log for when the runs fail:

[screenshot: error log]

The deps operator works great, then dbt run, but the test operator fails about 50% of the time. Should I be deploying package files to a folder within the MWAA S3 folder and then updating profiles.yml to reference that location?

Environment context:
MWAA v2.0.2
dbt-core>=1.0
dbt-postgres>=1.0
airflow-dbt==0.4.0

use of vars results in invalid dbt run command

Reading the source, vars is used like this:

dbt_run = DbtRunOperator(
    task_id="dbt_run",
    models="MY_MODEL",
    vars={"EXECUTION_DATE": "{{ ds }}"},
)

This generates an invalid dbt command where vars is formatted like:

--vars {"EXECUTION_DATE": "2020-12-22"}

What is valid is:

--vars '{"EXECUTION_DATE": "2020-12-22" }'

noting the surrounding single quotes.

I believe this is a bug.

dbt=0.18
airflow=1.10.14

new `env` functionality throws `dbt command not found` errors

I have been running this package in Composer Airflow using a separate PyPi release I made for my team to use: (airflow-dbt-cta). I hope this will be only a temporary solution to avoid cluttering PyPi with confusing rogue packages, but it has been over a year since the latest PyPi release of airflow-dbt, and over 6 months since a different user requested a new release (issue 65).

The new functionality that supports passing environment variables (eg to use variables in sources.yaml - see PR 60 from May 2020) was causing errors when I attempted to run it in Google Composer. This seems to be because the new addition of env causes dbt_hook.py to spin up a subprocess with an empty environment, when the behavior should instead be to just use the existing environment and only export new variables if the user specifies them. At least, when I made the change I suggested in PR 75, the operators began working as intended.

My team is trying to run this package in Composer Airflow, which might introduce quirks not present in other Airflow deployments... I'm not sure. Composer do be weird sometimes.

It would be amazing if we could get this fix in there and also have a new PyPi release? Pretty please?

Enable dbt run-operation

Hey,

I'd like to suggest adding support for the dbt run-operation command, which executes macros.

Expose DbtBaseOperator

Please expose DbtBaseOperator in __init__.py as I would like to extend it in my own custom operators.

Bump coverage to 100%

Yep. test all the things, at least once :)

Now that we have a small codebase it should be all tested.

Potentially to be implemented after #48 , since it adds new testing and changes some things

Add support for dbt-build-operator

Hi GoCardless team! Would love to submit a PR to extend airflow-dbt to add a build operator to support functionality added in dbt v0.21

I've already updated the readme, tests, and airflow_dbt/operators/dbt_operator.py, but am unable to make a push to the remote repo so I can submit a PR.

$ git push origin dbt-build-operator
ERROR: Permission to gocardless/airflow-dbt.git denied to wwells.

If the maintaining team is interested in this augmentation, what should I do to participate?
