
boundary-layer's Introduction


boundary-layer

boundary-layer is a tool for building Airflow DAGs from human-friendly, structured, maintainable yaml configuration. It includes first-class support for various usability enhancements that are not built into Airflow itself:

  • Managed resources created and destroyed by Airflow within a DAG: for example, ephemeral DAG-scoped hadoop clusters on Dataproc
  • Type checking and automatic preprocessing on all arguments to all operators, based on flexible schemas
  • Automatic imports of required classes
  • Distinct before and after operator groups, to make it easier to manage actions taken at the beginning or end of workflows
  • DAG pruning, for extracting or eliminating sections of the graph while maintaining dependency relationships

boundary-layer also performs various checks to find errors that would only be made visible upon deployment to an Airflow instance, such as cycles in the DAG, duplicate task names, etc.

boundary-layer is used heavily on the Etsy Data Platform. Every DAG on our platform is defined by a boundary-layer configuration instead of in raw python, which greatly reduces the barrier to entry for our data scientists and engineers to develop DAGs, while ensuring that best practices are always observed in the generated python code. boundary-layer is the core of our fully self-service deployment process, in which DAGs are tested by our CI tools and errors are surfaced prior to allowing DAGs to be merged and deployed to our Airflow instances.

In addition, our migration from Oozie to Airflow relied heavily on boundary-layer's included conversion tool.

boundary-layer is pluggable, supporting custom configuration and extensions via plugins that are installed using pip. The core package does not contain any Etsy-specific customizations; instead, those are all defined in an internally-distributed Etsy plugin package.

For more information, see our article on Etsy's Code as Craft blog.

Supported operators and Airflow versions

boundary-layer requires that each operator have a configuration file to define its schema, the python class it corresponds to, etc. These configuration files are stored in the boundary-layer-default-plugin. We currently include configurations for a number of common Airflow operators (sufficient to support our needs at Etsy, plus a few more), but we know that we are missing quite a few operators that may be needed to satisfy common Airflow use cases. We are committed to continuing to add support for more operators, and we also commit to supporting a quick turn-around time for any contributed pull requests that only add support for additional operators. So please, submit a pull request if something is missing, or at least drop an issue to let us know.

Furthermore, due to some differences in the operators and sensors between Airflow release versions, there may be incompatibilities between boundary-layer and some Airflow versions. All of our operators are known to work with Airflow release versions 1.9 and 1.10 (although our schemas validate against the operator arguments for 1.10, which is a superset of those for 1.9 --- there could be some parameters that we allow but that 1.9 will not properly use).

Installation

boundary-layer is distributed via PyPI and can be installed using pip.

pip install boundary-layer --upgrade

We recommend installing into a virtual environment, but that's up to you.

You should now be able to run boundary-layer and view its help message:

$ boundary-layer --help

If the installation was successful, you should see output like:

usage: boundary-layer [-h] {build-dag,prune-dag,parse-oozie} ...

positional arguments:
  {build-dag,prune-dag,parse-oozie}

optional arguments:
  -h, --help            show this help message and exit

Publishing updates to PyPI (admins only)

boundary-layer is distributed via PyPI. We rely on an automated GitHub Actions build to publish updates. The build runs every time a tag is pushed to the repository. We have a script that automates the creation of these tags, ensuring that they are versioned correctly and created for the intended commits.

The recommended process for publishing a relatively minor boundary-layer update is to simply run

./release.py

which will bump the patch version.

For bigger changes, you can bump the minor (or major) versions, or you can force a specific version string, via one of the following commands:

./release.py --bump minor
./release.py --bump major
./release.py --force-version a.b.c

There are a few other options supported by the release.py command, as described by the usage string:

$ ./release.py --help
usage: release.py [-h]
                  [--bump {major,minor,patch} | --force-version FORCE_VERSION]
                  [--git-remote-name GIT_REMOTE_NAME]
                  [--remote-branch-name REMOTE_BRANCH_NAME]

optional arguments:
  -h, --help            show this help message and exit
  --bump {major,minor,patch}
                        Select the portion of the version string to bump.
                        default: `patch`
  --force-version FORCE_VERSION
                        Force the new version to this value. Must be a valid
                        semver.
  --git-remote-name GIT_REMOTE_NAME
                        Name of the git remote from which to release. default:
                        `origin`
  --remote-branch-name REMOTE_BRANCH_NAME
                        Name of the remote branch to use as the basis for the
                        release. default: `master`

boundary-layer YAML configs

The primary feature of boundary-layer is its ability to build python DAGs from simple, structured YAML files.

Below is a simple boundary-layer yaml config, used for running a Hadoop job on Google Cloud Dataproc:

name: my_dag
dag_args:
  schedule_interval: '@daily'
resources:
- name: dataproc-cluster
  type: dataproc_cluster
  properties:
    cluster_name: my-cluster-{{ execution_date.strftime('%s') }}
    num_workers: 10
    region: us-central1
default_task_args:
  owner: etsy-data-platform
  project_id: my-project-id
  retries: 2
  start_date: '2018-10-31'
  dataproc_hadoop_jars:
  - gs://my-bucket/my/path/to/my.jar
before:
- name: data-sensor
  type: gcs_object_sensor
  properties:
    bucket: my-bucket
    object: my/object
operators:
- name: my-job
  type: dataproc_hadoop
  requires_resources:
  - dataproc-cluster
  properties:
    main_class: com.etsy.my.job.ClassName
    dataproc_hadoop_properties:
      mapreduce.map.output.compress: 'true'
    arguments: [ '--date', '{{ ds }}' ]

A few interesting features:

  • The resources section of the configuration defines a transient DataProc cluster resource that is required by the hadoop job. boundary-layer will automatically insert the operators to create and delete this cluster, as well as the dependencies between the jobs and the cluster, when the DAG is created.
  • The before section of the configuration defines sensors that will be inserted by boundary-layer as prerequisites for all downstream operations in the DAG, including the creation of the transient DataProc cluster.

To convert the above YAML config into a python DAG, save it to a file (for convenience, this DAG is already stored in the examples directory) and run

$ boundary-layer build-dag readme_example.yaml > readme_example.py

and, if all goes well, this will write a valid Airflow DAG into the file readme_example.py. You should open this file up and look at its contents, to get a feel for what boundary-layer is doing. In particular, after some comments at the top of the file, you should see something like this:

import os
from airflow import DAG

import datetime

from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor
from airflow.contrib.operators.dataproc_operator import DataprocClusterDeleteOperator, DataProcHadoopOperator, DataprocClusterCreateOperator

DEFAULT_TASK_ARGS = {
        'owner': 'etsy-data-platform',
        'retries': 2,
        'project_id': 'my-project-id',
        'start_date': '2018-10-31',
        'dataproc_hadoop_jars': ['gs://my-bucket/my/path/to/my.jar'],
    }

dag = DAG(
        schedule_interval = '@daily',
        catchup = True,
        max_active_runs = 1,
        dag_id = 'my_dag',
        default_args = DEFAULT_TASK_ARGS,
    )

data_sensor = GoogleCloudStorageObjectSensor(
        dag = (dag),
        task_id = 'data_sensor',
        object = 'my/object',
        bucket = 'my-bucket',
        start_date = (datetime.datetime(2018, 10, 31, 0, 0)),
    )


dataproc_cluster_create = DataprocClusterCreateOperator(
        dag = (dag),
        task_id = 'dataproc_cluster_create',
        num_workers = 10,
        region = 'us-central1',
        cluster_name = "my-cluster-{{ execution_date.strftime('%s') }}",
        start_date = (datetime.datetime(2018, 10, 31, 0, 0)),
    )

dataproc_cluster_create.set_upstream(data_sensor)

my_job = DataProcHadoopOperator(
        dag = (dag),
        task_id = 'my_job',
        dataproc_hadoop_properties = { 'mapreduce.map.output.compress': 'true' },
        region = 'us-central1',
        start_date = (datetime.datetime(2018, 10, 31, 0, 0)),
        cluster_name = "my-cluster-{{ execution_date.strftime('%s') }}",
        arguments = ['--date','{{ ds }}'],
        main_class = 'com.etsy.my.job.ClassName',
    )

my_job.set_upstream(dataproc_cluster_create)

dataproc_cluster_destroy_sentinel = DummyOperator(
        dag = (dag),
        start_date = (datetime.datetime(2018, 10, 31, 0, 0)),
        task_id = 'dataproc_cluster_destroy_sentinel',
    )

dataproc_cluster_destroy_sentinel.set_upstream(my_job)

dataproc_cluster_destroy = DataprocClusterDeleteOperator(
        dag = (dag),
        task_id = 'dataproc_cluster_destroy',
        trigger_rule = 'all_done',
        region = 'us-central1',
        cluster_name = "my-cluster-{{ execution_date.strftime('%s') }}",
        priority_weight = 50,
        start_date = (datetime.datetime(2018, 10, 31, 0, 0)),
    )

dataproc_cluster_destroy.set_upstream(my_job)

This python DAG is now ready for ingestion directly into a running Airflow instance, following whatever procedure is appropriate for your Airflow deployments.

A few things to note:

  • boundary-layer converted the start_date parameter from a string to a python datetime object. This is an example of the boundary-layer argument-preprocessor feature, which allows config parameters to be specified as user-friendly strings and converted to the necessary python data structures automatically (see the sketch after this list).
  • boundary-layer added a sentinel node in parallel with the cluster-destroy node, which serves as an indicator to Airflow itself regarding the ultimate outcome of the Dag Run. Airflow determines the Dag Run status from the leaf nodes of the DAG, and normally the cluster-destroy node will always execute (irrespective of upstream failures) and will likely succeed. This would cause DAGs with failures in critical nodes to be marked as successes, if not for the sentinel node. The sentinel node will only trigger if all of its upstream dependencies succeed --- otherwise it will be marked as upstream-failed, which induces a failure state for the Dag Run.
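
As an illustration of the kind of transformation the argument preprocessor performs, here is a simplified sketch of the start_date conversion shown above (not boundary-layer's actual implementation, which may accept additional formats):

import datetime

# Simplified sketch: turn a 'YYYY-MM-DD' config string into a python datetime,
# mirroring how '2018-10-31' became datetime.datetime(2018, 10, 31, 0, 0) above.
def date_string_to_datetime(arg):
    return datetime.datetime.strptime(arg, '%Y-%m-%d')

date_string_to_datetime('2018-10-31')  # -> datetime.datetime(2018, 10, 31, 0, 0)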

Oozie Migration tools

In addition to allowing us to define Airflow workflows using YAML configurations, boundary-layer also provides a module for converting Oozie XML configuration files into boundary-layer YAML configurations, which can then be used to create Airflow DAGs.

Admittedly, boundary-layer's Oozie support is currently limited: it is only capable of building DAGs that submit their Hadoop jobs to Dataproc (it does not support stand-alone Hadoop clusters, for example), and it does not support Oozie coordinators. We are open to working on improved Oozie support if there is community demand for it, and of course, we are open to community contributions toward this goal.

The following command will translate an example Oozie workflow to a boundary-layer DAG that will execute on a 64-node Dataproc cluster in GCP's us-east1 region, for the GCP project my-project-id:

boundary-layer parse-oozie example \
  --local-workflow-base-path test/data/oozie-workflows/ \
  --cluster-project-id my-project-id \
  --cluster-region us-east1 \
  --cluster-num-workers 64

boundary-layer's People

Contributors

agomez-etsy, andpol, antjw, brighton1101, cbthompson1, dcheno, dianeschulze, dossett, etc-jibdugaw, everglory99, gpetroski-etsy, hyoung87, hyungoos, jcraver1021, jmchen28, juanfbages, kzvezdarov, m-lce, mchalek, nickmoorman, onetonfoot, peleyal, rfan-debug, sahilkhanna129, vchiapaikeo, vkhuat


boundary-layer's Issues

Kubernetes properties - SecurityContext and ServiceAccount

Hello,
I am trying to convert an existing DAG to YAML, but I don't see options to add security_context and service_account_name details in the YAML file. Is there any workaround for adding these options? I don't see them among the available Kubernetes properties.

Thank You,
Syam

Registering a custom boundary-layer plugin

I wrote some boundary-layer schemas for my custom Airflow operators. Following the same convention as boundary_layer_default_plugin, I bundled all the configs, plugin.py, and preprocessor.py files into a single folder.

Now I was wondering how boundary-layer would pick up my custom plugin and generate python code that employs those custom operators.

For a POC, I used a hack where I manually updated the entry_points.txt and top_level.txt files in pip's site-packages/boundary_layer-1.6.16.dist-info directory. And it worked: I was able to generate a python DAG-definition file that employed my custom operators.

But I still don't understand the right way to deploy my boundary-layer plugin. I could do it by modifying boundary-layer's setup.py file, but that defeats the purpose of a plugin: I would be modifying the source code of boundary-layer itself.
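
For what it's worth, below is a sketch of what a separately pip-installable plugin package's setup.py might look like. The entry-point group name 'boundary_layer_plugins' is an assumption made for illustration; check the group name that boundary-layer's own setup.py registers (or the entry_points.txt you edited) for the real value.

# Hypothetical setup.py for a standalone plugin package, installed with pip next to
# boundary-layer; all names here are placeholders.
from setuptools import setup, find_packages

setup(
    name='my-boundary-layer-plugin',
    version='0.1.0',
    packages=find_packages(),
    install_requires=['boundary-layer'],
    entry_points={
        # assumed group name; verify against boundary-layer's own setup.py
        'boundary_layer_plugins': [
            'my_plugin = my_plugin.plugin:MyPlugin',
        ],
    },
)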

Support for Airflow HTTP operator

The Airflow HTTP operator has an optional parameter named response_check that takes a function (docs).

Adding a corresponding schema named http.yaml for a boundary-layer http operator resulted in the following error:
boundary_layer.exceptions.InvalidConfig: Invalid config spec in file c:\path_to_venv\site-packages\boundary_layer_default_plugin\config\operators\http.yaml: {'parameters_jsonschema': ["Invalid JSON schema: 'function' is not valid under any of the given schemas\n\nFailed validating 'anyOf' in schema['properties']['properties']['additionalProperties']['properties']['type']:\n {'anyOf': [{'$ref': '#/definitions/simpleTypes'},\n {'items': {'$ref': '#/definitions/simpleTypes'},\n 'minItems': 1,\n 'type': 'array',\n 'uniqueItems': True}]}\n\nOn instance['properties']['response_check']['type']:\n 'function'"]}

The content of http.yaml is as follows (with the problematic parameter commented out):

name: http
operator_class: SimpleHttpOperator
operator_class_module: airflow.operators.http_operator
schema_extends: base
parameters_jsonschema:
  properties:
    http_conn_id:
      type: string
    endpoint:
      type: string
    method:
      type: object
    data:
      type: object
    headers:
      type: object
    # response_check:
    #   type: function
    extra_options:
      type: object
    xcom_push:
      type: boolean
    log_response:
      type: boolean
  required:
    - http_conn_id
    - endpoint
  additionalProperties: false

`EnsureRenderedStringPattern` preprocessor should warn without rejecting strings containing verbatim directives

Right now, if you have a parameter that uses the EnsureRenderedStringPattern pre-processor, and if you pass a verbatim string to this parameter, there is a chance that the parameter value will be rejected as invalid because the characters < and > are not supported by the provided regular expression. An example is the dataproc cluster_name parameter. We should recognize this condition and generate a warning, like we do when we encounter a jinja template that we do not know how to render.
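
A standalone sketch of the proposed behavior follows. It assumes that verbatim directives are delimited by << and >> (as in the generator configs shown elsewhere in this document) and is not tied to the actual EnsureRenderedStringPattern implementation.

import logging
import re

# Sketch only: warn and skip validation when the rendered value still contains
# verbatim directives; otherwise enforce the configured regular expression.
def check_rendered_pattern(rendered_value, pattern):
    if '<<' in rendered_value or '>>' in rendered_value:
        logging.warning(
            'Value %r contains verbatim directives; skipping pattern check',
            rendered_value)
        return
    if not re.match(r'(?:{})\Z'.format(pattern), rendered_value):
        raise ValueError(
            'Value {!r} does not match pattern {!r}'.format(rendered_value, pattern))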

Adding more parameters in dag_args

Hi,
From my testing, as well as this schema in the source, it seems like only catchup, max_active_runs, concurrency, and schedule_interval are allowed in the dag_args section. However, there are many more parameters available in the DAG constructor (see this). In most of our DAGs in production, we use dagrun_timeout and on_failure_callback. I am wondering whether it is possible to enable more parameters in the dag_args schema without changing the source code, maybe through a plugin? Thanks for the help!

Parameterized preprocessor

The signature of the process_arg(..) method of the preprocessor base class hints that there is provision for passing and using arguments to a preprocessor:

@abc.abstractmethod
def process_arg(self, arg, node, raw_args):
    pass

More precisely, I want to supply some arguments to my preprocessor from within my dag-definition YAML file.

  • Is that even possible?
  • I couldn't find any example detailing how to achieve this.
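
For reference, a minimal sketch of a preprocessor implementing that signature is below. The class is hypothetical and, notably, does not show how to receive per-DAG arguments, which is exactly the open question here.

# Hypothetical preprocessor following the process_arg(self, arg, node, raw_args)
# signature quoted above; it simply upper-cases the configured string value.
class UpperCaseString(object):
    def process_arg(self, arg, node, raw_args):
        # arg is the raw property value from the YAML config; node and raw_args
        # carry context about the operator being built
        return str(arg).upper()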

Support TriggerDagRunOperator

  • Looking into the default plugin, it becomes apparent that TriggerDagRunOperator is presently not supported.
  • That is understandable, since supporting it would require validation not just within a single DAG / YAML file (which is presently done via the networkx library), but across DAGs.
  • While ExternalTaskSensor does manage to capture dependencies across DAGs, it fails to provide the same reactive / eager triggering that TriggerDagRunOperator does.
  • Moreover, there are cases where a sensor simply cannot replace a triggering operator without disrupting the schedules of entire DAGs.
  • Is there any plan to support it in the future?
  • Is there a neat approach by which this could be implemented?

Quote escaping issues lead to lost backslash `\` characters and syntax errors

Hi there, I ran into this situation:

Given yaml like:

arguments:
    - --textproto_content
    - 'query: "\''foo\'' = \"foo\"" x: "bar"'

boundary-layer produces Python code like the following (whitespace added for clarity):

arguments = [
          '--textproto_content',
          """query: "\'foo\' = \"foo\"" x: "bar"""",

There are two problems with the generated """query:... code above:

  • The string ending with 4 " characters: """" causes a syntax error
>>> print("""query: "\'foo\' = \"foo\"" x: "bar"""")
  File "<stdin>", line 1
    print("""query: "\'foo\' = \"foo\"" x: "bar"""")
                                                  ^
SyntaxError: unterminated string literal (detected at line 1)
  • Even if I fix that syntax error, we lose the \ characters that were part of the original string (which we need so that the resulting string passed to the operator is valid textproto)
>>> print("""query: "\'foo\' = \"foo\"" x: "bar" """)
query: "'foo' = "foo"" x: "bar" 

Expected:

query: "\'foo\' = \"foo\"" x: "bar"

Support Airflow Variable?

Hi,
Thanks for open-sourcing boundary-layer, this is really useful! While prototyping some plugins for other open-source operators and internal operators, I can't find any support for Variables (https://airflow.apache.org/concepts.html#variables). Am I missing something? Is there a plan to support Variables? The use case is that we want to maintain one YAML file across different environments. Thanks for the help!

Inject Python code that will be used in PythonOperator

I would like to figure out whether I can inject some Python code at the beginning of my DAG.
The idea is that I want to use the python_callable field on a PythonOperator to call my code; however, my code is going to be 5-10 lines long and I cannot really represent it with a lambda (or I would rather not, because it's going to be ugly!).

Currently I see two options, but I would like to get your feedback please :) Thanks!

  1. Define a plugin that creates a new operator that inherits from PythonOperator and does the magic. Then Boundary Layer will load the YAML file I configure in the plugin, so the output DAG.py is valid. I'll also have to include the implementation itself (not the YAML) in the DAGs folder in Airflow itself (a rough sketch of this option is included after this list).
  2. A better option (which I'm not sure is supported) would be some way to paste code at the beginning of the DAG.py file. I guess this could be done using a pre- (or post-) processor, but I'm not sure whether Boundary Layer supports something like that.

While exploring these options, I preferred to file a bug and see what you guys think about it.
Thanks!
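
A rough sketch of option 1, assuming the custom callable ships as an ordinary python module alongside the DAGs (all names below are hypothetical):

# Hypothetical operator that hard-wires its python_callable, so a boundary-layer
# operator config only needs to reference this class.
from airflow.operators.python_operator import PythonOperator

def my_callable(**context):
    # the 5-10 lines of custom logic would live here
    pass

class MyCallableOperator(PythonOperator):
    def __init__(self, **kwargs):
        kwargs.setdefault('python_callable', my_callable)
        super(MyCallableOperator, self).__init__(**kwargs)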

GCP Pubsub Publish Operator / Preprocessor Output - Bytes not supported when generating DAGs

With the addition of the GCP Pubsub Publish Operator, the preprocessor for this operator converts the message data property to bytes. However, bytes are not supported when generating the actual Python DAGs. Below is the error seen:

...
  File "/home/jenkins/workspace/bigdataconf-test-pr/venv/lib/python3.7/site-packages/boundary_layer/builders/util.py", line 132, in format_value
    type(value)))
Exception: Cannot format value `b'U1VDQ0VTUw=='`: no handler for type <class 'bytes'>

The solution for this likely lies in boundary_layer/builders/util.py in the function format_value. However, I know that byte literals are treated differently in Python 2 and Python 3, so this might be a little more involved of a change. Regardless, there may be future instances when generating a DAG with byte literals is required as well, so it might be worth looking into.
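
As a standalone illustration (not an actual patch to format_value), one way a bytes value could be rendered as a python source literal during code generation:

# Sketch only: render a bytes value as literal source text. Under Python 2, repr()
# of a bytes/str value omits the b prefix, which is part of why the issue flags
# this as a cross-version concern.
def format_bytes_value(value):
    if not isinstance(value, bytes):
        raise TypeError('expected bytes, got {}'.format(type(value)))
    return repr(value)  # e.g. "b'U1VDQ0VTUw=='" on Python 3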

Resource sentinel nodes inside generators use incorrect task ids

We have found that under the following conditions, an error occurs:

  • A DAG uses a generator with at least two elements
  • The generator node has downstream dependencies
  • A resource is created inside the generator workflow
  • At least one node in the generator workflow both requires the resource and has no downstream dependencies that do not require the resource.

Under these conditions, boundary-layer by default inserts a sentinel node downstream of the node(s) matching these conditions, in order to propagate errors past the resource-destroy step. However, inside the generator, the sentinel node that is created does not have <<item_name>> appended to its task_id. This causes errors when that node is connected to the generator's downstream dependencies, because the Airflow set_upstream() method will be called repeatedly (once per generator element) for the (sentinel_node, generator_downstream_dependency) pairs. Airflow is proactive about alerting on this, which is nice (although in theory it's not really a problem...).

Anyway this is a bug that we should fix.

Upstream dependencies with generator

Hello,

I need to define upstream dependencies from an upstream key in the generator's items.
Something like this:

generators:
  - name: bq-jobs
    type: list_object_generator
    target: bq-job
    properties:
      items:
        - name: job1
          upstream: []
        - name: job2
          upstream:
            - job1
---
name: bq-job

operators:   
- name: << item['name'] >>
  type: kubernetes
  upstream_dependencies: << item['upstream']>>

When I try this I have this error

Found errors in sub dag: {'operators': {0: {'upstream_dependencies': ['Not a valid list.']}}}

Do you know how I can make it work?

Thanks !

boundary-layer does not process properties that are not in the operator schema

This issue manifests in two ways:

  1. For an operator schema that does not allow additional properties, boundary-layer will not reject configurations containing invalid properties
  2. For an operator schema that does allow additional properties, boundary-layer will not pass any properties to the operator that are not explicitly part of the schema.

Limitation to create more than 3 operator groups

Can we create more than 3 operator groups?
As far as I understand, we can only create 3 groups, via the 'before', 'operators', and 'after' sections. What if we want to create, say, 4 groups? And can we route each operator specifically?

These are just some examples of the use cases:

[example images]

Add support for batching in the generator framework

In some cases, it may be desired to use a generator but handle the output in batches instead of individually. For example, maybe you want to list the files in a Google Cloud Storage bucket and then create BigQuery load jobs with them in batches of 10 (instead of a single load job per file). It should be possible to specify batching parameters in a generator config to achieve this result.
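
Illustrative only (this is not an existing boundary-layer feature or API): the kind of chunking such a batching option would perform on a generator's output.

# Split a generator's item list into fixed-size batches, e.g. ten GCS paths per
# BigQuery load job instead of one job per file.
def batches(items, batch_size=10):
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

# list(batches(['gs://b/f1', 'gs://b/f2', 'gs://b/f3'], batch_size=2))
# -> [['gs://b/f1', 'gs://b/f2'], ['gs://b/f3']]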

Does boundary-layer have active support?

Hi all!

In the last year we have been using this wonderful library.

We love it a lot, and as one of our engineers keeps saying, "Boundary Layer helps us to not be in the business of generating python code". We definitely prefer YAML.

However, in the last weeks we have seen decreased support for PRs that we need, such as #110 and #112.

Before we take the approach of forking and applying the changes we need on our own fork, I thought it would be better to reach out to you guys, and specifically add some of the folks that created the last PRs (such as @vchiapaikeo, @dossett, @gpetroski-etsy and @mchalek).

It would be really great if we could get the minimal support we need (updating configurations, supporting more operators...), so other Boundary Layer users can enjoy more of the operators that exist out there in Airflow...

Thoughts?
Thank you!
Eyal

@eap, @bthomee and @jcraver1021 FYI.

Resources passed to generators (and maybe subdags?) should only depend on sub-workflow operators that use the resource

We have seen behavior in which a resource passed to a generator containing some operators that use the resource, and other operators that do not, is not destroyed until every operator in the generator sub-workflow is complete. This could mean keeping clusters around much longer than necessary, if the generator workflow contains long-running resource-independent operations.

This probably also applies to sub-dags, because resources are attached to them using the same mechanisms.

Sort operator arguments on DAG generation

First off, thank you so much for making this tool available! I am very happy using this library at my work.

For historical reasons, I version-control my generated DAGs and have found it easier to parse the diffs when I sort the operator arguments by name. I have tried this out locally by modifying the DagBuilderBase.render_operator method. Would there be any reason not to have the builder sort the arguments for each operator?
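
For illustration, the change being described reduces to something like the following sketch (not the actual DagBuilderBase.render_operator code):

# Emit operator keyword arguments in name order so regenerated DAG files diff cleanly.
def render_sorted_args(formatted_args):
    # formatted_args: dict mapping argument name -> already-formatted value string
    return ',\n'.join(
        '        {} = {}'.format(name, value)
        for name, value in sorted(formatted_args.items()))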

Import plugins with Airflow 2.0

Hi Etsy Engineers!

I just wanted to start by commending your effort in putting together this project. I had the pleasure of trying it out in our airflow project, and we are very pleased by the result!

Our Airflow project has many custom operators/hooks/etc. But with Airflow 2.0's removal of support for importing operators, sensors, and hooks via plugins, how do you suggest we register our custom plugins for use by boundary-layer?
https://airflow.apache.org/docs/apache-airflow/stable/plugins.html#plugins

Alana

Upgrade to marshmallow 3

👋 Just dropping by to let you know that marshmallow v3 is released.

Upgrading will provide a few benefits for boundary-layer:

  • Serialization performance is significantly improved compared to v2

Existing boundary-layer code that the migration would touch includes, for example:

before = fields.List(fields.Nested(OperatorSchema))
operators = fields.List(fields.Nested(OperatorSchema))
after = fields.List(fields.Nested(OperatorSchema))

load_from='property',
dump_to='property',

I've only skimmed the boundary-layer code, but it looks like the migration should be relatively straightforward.

  1. Use data_key instead of load_from and dump_to.
  2. Change usages of .load and .dump to handle ValidationError and expect the (de)serialized data dictionary to be returned.
        try:
            data = OozieWorkflowSchema(context={
                'cluster_config': cluster_config,
                'oozie_plugin': oozie_config,
                'macro_translator': JspMacroTranslator(oozie_config.jsp_macros()),
                'production': self.production,
            }).load(parsed)
        except ma.ValidationError as err:
            raise Exception('Errors parsing file {}: {}'.format(
                filename,
                err.messages))

        data_copy = data.copy()
  3. Add **kwargs to decorated methods.
    @validates_schema
    def validate_template_undefined(self, data, **kwargs):
        # ...

    @post_dump
    def dagrun_timeout_to_timedelta(self, data, **kwargs):
        # ...

A full upgrading guide is here: https://marshmallow.readthedocs.io/en/latest/upgrading.html

It's worth knowing that marshmallow 3 only supports Python 3. So you'd need to drop support for 2.7 in order to upgrade. Join the party https://python3statement.org/ ! 😄

airflow 1.10.6 jsonschema version conflict with boundary-layer 1.7.24

airflow 1.10.6 Requirement.parse('jsonschema>=3.0.1<4'), {'flask-appbuilder'})

boundary-layer 1.7.24 has requirement jsonschema<3.0,>=2.6.0, but you'll have jsonschema 3.2.0 which is incompatible.

Any suggestions for bypassing this by relaxing the version requirement, or for which version of Airflow to use with boundary-layer?

Using preprocessors to cast into a custom type

I took a hint from the existing date_string_to_datetime preprocessor and wrote my own preprocessor that converts an input string argument into an Enum.

While the conversion worked, during the dag-build phase (python code generation) I ended up hitting this block and got Cannot format value {}: no handler for type {}.

  • Was this expected, or am I doing something wrong?
  • Is there a reason why this is restricted?
  • Is there a workaround? (A possible workaround is sketched below.)
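
One possible workaround, sketched under the assumption that the restriction is simply that format_value only knows how to emit primitive python types: have the preprocessor return the enum's underlying primitive value rather than the Enum member itself. The enum below is hypothetical.

import enum

class Mode(enum.Enum):
    FAST = 'fast'
    SAFE = 'safe'

# Return the primitive .value (a plain string) so the code generator can format it;
# the Enum member itself has no handler in format_value.
def to_mode_value(arg):
    return Mode[str(arg).upper()].value  # e.g. 'fast'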
