
hamilton's Issues

Explore Ibis Integration

Is your feature request related to a problem? Please describe.
Ibis could happily replace pandas, and it's more flexible (scalable, etc.).

Describe the solution you'd like
Ibis dataframes instead of pandas dataframes. Perhaps a plugin framework.

Create Hamilton converter for pandas code

Is your feature request related to a problem? Please describe.
With Hamilton you need to restructure your code. This can be too much of a friction point for someone. Wouldn't it be nice if we had a way to help automate this step?

Describe the solution you'd like
We should be able to write some Python code that parses the AST to convert code like:

df['a'] = df['b'] + df['c']

into

def a(b: pd.Series, c: pd.Series) -> pd.Series:
    return b + c

Core to this problem is building code that parses Python code and outputs/prints Hamilton functions. Once we have that, we can think about the places we could provide this, e.g. a CLI, a website, some other means...
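
As a rough sketch of that core piece (assuming Python 3.9+ AST shapes, and handling only the single-binary-operation case above; pandas_assignment_to_hamilton is an illustrative name):

import ast

def pandas_assignment_to_hamilton(line: str) -> str:
    """Converts `df['a'] = df['b'] + df['c']` into Hamilton function source.
    Sketch only: handles a single binary operation between two columns."""
    assign = ast.parse(line).body[0]
    assert isinstance(assign, ast.Assign) and isinstance(assign.value, ast.BinOp)
    output = assign.targets[0].slice.value  # the column being assigned, e.g. 'a'
    left = assign.value.left.slice.value  # e.g. 'b'
    right = assign.value.right.slice.value  # e.g. 'c'
    op = {ast.Add: '+', ast.Sub: '-', ast.Mult: '*', ast.Div: '/'}[type(assign.value.op)]
    return (
        f'def {output}({left}: pd.Series, {right}: pd.Series) -> pd.Series:\n'
        f'    return {left} {op} {right}\n'
    )

print(pandas_assignment_to_hamilton("df['a'] = df['b'] + df['c']"))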

Describe alternatives you've considered
Not doing this.

Additional context
It would enable people to get up and running with Hamilton faster. E.g. if they provided a script, and we "walked" the script and guessed what should be output...

[idea] can we make hamilton run in a distributed manner?

What's the idea?

Hamilton runs locally on data that fits in memory on a single core.

Can we improve speed and data scale by making hamilton run in a parallel and distributed manner?

Why do we think it should be possible?

Hamilton ultimately creates a DAG before executing. The idea would be to distribute various parts of this DAG.

We'd have to figure out initial data loading, but other than that, it seems like we're solving something that other systems have already solved for us. Can we harness them? Perhaps by going from a Hamilton DAG and "compiling" it to the target system of choice?

Ideas to explore

Add docker container(s) to help run examples

Is your feature request related to a problem? Please describe.
The friction to getting the examples up and running is installing the dependencies. A docker container with them already provided would reduce friction for people to get started with Hamilton.

Describe the solution you'd like

  1. A docker container with different Python virtual environments that have the dependencies to run the examples.
  2. The container has the hamilton repository checked out -- so it has the examples folder.
  3. Then using it would be:
  • docker pull image
  • docker start image
  • activate python virtual environment
  • run example

Describe alternatives you've considered
Not doing this.

Additional context
This was a request from a Hamilton talk.

Prototype Row Based Execution of a DAG

Is your feature request related to a problem? Please describe.
Currently Hamilton is set up to operate over entire "columns". This means there is no "incremental" computation that happens.
There are potential use cases where one might not have all the data, or want to conserve memory and chunk over data. Right now this is possible, but you end up having to rebuild the DAG prior to execution each time. This seems a little clunky; can we do better?

Describe the solution you'd like
A possibility is to:

  1. Create another Driver class which allows multiple calls to execute to happen successively, where some input is varied.
  2. Augment the current driver to allow for this type of execution style.
  3. Some other option :)

In terms of technical decision making, you'd need to figure out some assumptions. E.g. do you rerun @config.when each time? Or do you do a first pass, assume a fixed DAG structure, and then all that's changing is the value of some user defined inputs? Can you do multiple rows/inputs at a time? etc. Ideally people would not have to change the shape of their functions, and things would just work...

Describe alternatives you've considered
You could wrap the current Driver class in another driver that would hold the correct state and reinstantiate the DAG correctly before each iteration. However, I think first-class support would enable more use cases for Hamilton, rather than having users write this code themselves.
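
For illustration, the wrapping approach might look roughly like this (IteratingDriver and execute_over are hypothetical names, not a proposed API):

from hamilton import driver

class IteratingDriver:
    """Sketch: rebuilds the Driver (and thus the DAG) with the varied input
    before each iteration -- exactly the clunkiness described above."""
    def __init__(self, base_config: dict, *modules):
        self.base_config = base_config
        self.modules = modules

    def execute_over(self, final_vars: list, input_chunks: list):
        for chunk in input_chunks:
            dr = driver.Driver({**self.base_config, **chunk}, *self.modules)
            yield dr.execute(final_vars)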

Additional context
E.g. could this help with image processing, where you have a loop of images, and want to process them given some configuration?
E.g. could this help with memory consumption by enabling chunking over input data?

Add Ray workflows adapter

what

Ray workflows (https://docs.ray.io/en/latest/workflows/concepts.html) seem like something we could easily add to, given that we now have GraphAdapters.

Task

  • Implement something very similar to the RayGraphAdapter, i.e. a RayWorkflowGraphAdapter. The hypothesis is that we then just need to use Ray's workflow step function to wrap Hamilton functions.
  • Implement an integration test for it.
  • Implement a hello world with it.

Context

Originally posted by @skrawcz in #10 (comment)

Add ability to provide functions directly to driver

Is your feature request related to a problem? Please describe.
Hard to develop with Hamilton on Google Colab.

From Herve M. via discord:

working with Google Colab, as this requires you to have your feature .py scripts imported on the Colab machine, or git cloning them, but with the risk of losing them when the machine is recollected by Google. Having your features in the main notebook would make them safe on Colab.

Describe the solution you'd like
We should enable a way to pass functions into the driver directly.

Option 1 - pass directly

from my_timeseries_package import (
    rolling_mean,
    rolling_stdev
)
import other_func_module

d = Driver({}, rolling_mean, rolling_stdev, other_func_module)

Assuming we go with our current behavior, you would not be able to pass in two functions with the same name, as is the case now -- i.e. no using this to "override" a function definition.

Pros

This is pretty simple to pick up and implement for a user.

Cons

  1. This requires changing code in the driver/function graph.
  2. The long-term software impact leaves the possibility of messy driver code, because functions are not curated.

Option 2 - curate functions into "module" like objects.

E.g.

from my_timeseries_package import (
    rolling_mean,
    rolling_stdev
)
import other_func_module

temp_module = TempModule(rolling_mean, rolling_stdev)
d = Driver({}, temp_module, other_func_module)

Assuming we go with our current behavior, you would not be able to pass in two functions with the same name, as is the case now -- i.e. no using this to "override" a function definition.

Pros

  1. Does not require changes in the driver/function graph code.
  2. Leaves open the possibility of doing option 1 at a later date.
  3. Forces people to curate functions -- which should make refactoring the code easier in the future, versus allowing uncurated groups of functions.

Cons

  1. Introduces a new concept a user needs to know about.
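
For illustration, option 2 could be a tiny helper built on types.ModuleType (create_temp_module is an illustrative name):

import types

def create_temp_module(*functions, name: str = 'temp_module') -> types.ModuleType:
    """Sketch: packages standalone functions into a module-like object that the
    driver can crawl, so notebook-defined functions don't need a .py file."""
    module = types.ModuleType(name)
    for fn in functions:
        setattr(module, fn.__name__, fn)
    return module

temp_module = create_temp_module(rolling_mean, rolling_stdev)
d = Driver({}, temp_module, other_func_module)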

Additional context
See https://discordapp.com/channels/842466558483496997/842467113611821087/982409991547940934
Also from private chatter between @jameslamb and @elijahbenizzy.

Revamp methods of parametrizing the DAG

Is your feature request related to a problem? Please describe.

Currently, we have two ways to pass data into/parametrize the DAG:

  1. By passing in a config -- this takes the form of a constructor argument to the function_graph during instantiation. Note that this is used both as input data to the DAG (on execution) and as configuration for constructing the DAG.
  2. By passing in overrides -- these are intended to override node values that already exist.

The problem is that this does not allow users to build a DAG once and execute it multiple times with different configurations. The function_graph actually allows for this via execute_static, but the driver currently replaces the inputs with the config (inputs=self.config).

Describe the solution you'd like
We should separate this into three different notions:

  1. config -- allows for pre-configuration of the DAG. Used primarily to determine DAG shape, but also passed in as inputs.
  2. inputs -- passes data into a run of the DAG (effectively filling the purpose of a user-defined-node)
  3. overrides -- overrides nodes in the DAG

inputs will become the union of config and inputs, for two reasons:

  1. To maintain backwards compatibility
  2. To reduce cognitive burden when calling the DAG for inputs that are required both in instantiation and execution

This will just require a change to the default driver. TBD what we do for inputs/configs clashing.
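
Under this proposal, usage could look roughly like this (my_functions and the config/input/output names are illustrative):

from hamilton import driver
import my_functions  # hypothetical module of Hamilton functions

# config shapes the DAG once, at construction time...
dr = driver.Driver({'region': 'US'}, my_functions)

# ...while inputs (and overrides) vary per run, without rebuilding the DAG.
for date in ['2022-01-01', '2022-01-02']:
    df = dr.execute(['spend', 'signups'], inputs={'date': date})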

Describe alternatives you've considered
We could forego overrides entirely -- allowing inputs to override the DAG. Personally I think this is an easy way to shoot oneself in the foot, but it does simplify the API.

Additional context
Writing the docs and realizing the original design is not great.

Prototype integration with an LLVM or similar tech.

The following assumes "numba", but we could really replace "numba" with "jax", or any other framework that optimizes Python code to execute faster.

Is your feature request related to a problem? Please describe.
Numba is a way to accelerate Python functions. To use it, you annotate your Python functions to be "compiled" with the JIT, which then generates faster code from them.

Currently the speed up only materializes on the second invocation of a function -- the first time it compiles it. So to work with Hamilton, we'd have to compile ahead of time (AOT) if people only run a DAG once. Otherwise we could use the jit for DAGs that people execute over and over again.
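
For reference, the JIT behavior described above looks like this in plain numba (rolling_sum is just an illustrative function):

import numba
import numpy as np

@numba.jit(nopython=True)
def rolling_sum(xs: np.ndarray, window: int) -> np.ndarray:
    out = np.empty(len(xs))
    acc = 0.0
    for i in range(len(xs)):
        acc += xs[i]
        if i >= window:
            acc -= xs[i - window]  # drop the element that left the window
        out[i] = acc
    return out

xs = np.arange(1_000_000, dtype=np.float64)
rolling_sum(xs, 10)  # first call: compiles, so no speed up
rolling_sum(xs, 10)  # second call: runs the compiled machine code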

Describe the solution you'd like
Two solutions:

  1. Prototype the ability to compile a hamilton graph ahead of time with Numba. You could use how we get Hamilton to run on Dask as a starting point (TODO: link to code). See these numba docs for ahead of time compilation.
  2. Prototype the ability to use the jit compiler with Numba. That way the first time someone runs execute things are compiled (no speed up), but the second time, things are lightning quick! See these docs.

Things to think about with prototype (1):

  1. Since compiling ahead of time requires types -- we might need some better way to specify them? Or perhaps we can have numba infer them?
  2. The output of compilation is another set of python module(s) -- this is what we'd then want to use for computation.
  3. What is therefore the correct order of operations? Build the function graph, compile it, then somehow build the graph again with the new functions (?), and use that for execution?
  4. What are the limitations of this approach in terms of use cases, etc.? We could limit it to numpy and Python primitive code only, for instance.

Things to think about with prototype (2):

  1. What use cases does this make sense for?
  2. What are the limitations of this approach?

Describe alternatives you've considered
Haven't.

explore extracting columns with validations

Is your feature request related to a problem? Please describe.

When we extract columns, it would be very handy to be able to run checks against those columns. pandera is a great, lightweight tool for validating dtypes, nullability, uniqueness, and any arbitrary Check callable.

Describe the solution you'd like

Ideally this would be a decorator that works similarly to extract_columns: it would ingest a DataFrame, return the same DataFrame, and expand the nodes to include a dataframe validation node. This could be specific to pandera, or could be made more general, so something like:

import pandas as pd
from pandera import Column, Check

@validate_columns({
    "user_id": Column(str, unique=True),
    "age": Column(int, Check.in_range(18, 150)),
    "shirt_size": Column(float, Check.greater_than(10), description="arm length in inches"),
    "favorite_apparel": Column(str, Check.isin(["pants", "shirts", "hats"])),
})
def users(input_file: str) -> pd.DataFrame:
    return pd.read_csv(input_file)

or more generically

import abc
from typing import Any, Dict

import pandas as pd

class Schema(abc.ABC):
    @abc.abstractmethod
    def validate(self, df: pd.DataFrame):
        pass

class SimpleColumnChecker(Schema):
    def __init__(self, columns: Dict[str, Any]):
        self.columns = columns

    def validate(self, df: pd.DataFrame):
        for column, col_schema in self.columns.items():
            assert column in df.columns
            if col_schema.get("unique"):
                assert df[column].shape[0] == df[column].drop_duplicates().shape[0]
            if col_schema.get("min"):
                assert df[column].min() > col_schema.get("min")
            if col_schema.get("max"):
                assert df[column].max() < col_schema.get("max")
            if col_schema.get("isin"):
                assert set(df[column]) <= set(col_schema.get("isin"))

@validate_columns({
    "user_id": {"unique": True},
    "age": {"min": 18, "max": 150},
    "shirt_size": {"min": 10},
    "favorite_apparel": {"isin": ["pants", "shirts", "hats"]},
})
def users(input_file: str) -> pd.DataFrame:
    return pd.read_csv(input_file)

Describe alternatives you've considered

Certainly you can have a splitting node where you validate the data yourself, but I think this is a common enough pattern (or it really should be common enough, and made a first-class citizen of any dataframe manipulation) that it would benefit from being easy to plug directly into a node.


Ability to Profile nodes

Is your feature request related to a problem? Please describe.

I have found it useful to be able to profile code execution for purposes of debugging and for enhancing performance.

Describe the solution you'd like

A solution that I have implemented in the past has been to create a stateful decorator so that you can capture execution context. This would capture a list of calls with their function names and simple stats; while this was done with dataframes in mind, it can also be used on arbitrary functions and class methods, so extending to any sort of hamilton (callable) node should be straightforward. The original would look something like:

my_data.csv:

a  b
1  3
2  4
3  5

profile = Tracer(**tracer_options)

@profile
def load_data(input_csv: str) -> pd.DataFrame:
    return pd.read_csv(input_csv)
    
@profile
def half_everything(load_data: pd.DataFrame) -> pd.DataFrame:
    return load_data / 2
    
if __name__ == "__main__":
    data = load_data("my_data.csv")
    transformed_data = half_everything(data)

print(profile)
[
    Trace(
         func=<function load_data at 0x7f9fc7ff8d30>,
         args=('my_data.csv', ),
         kwargs={},
         profile=Profile(
             cpu=Resource(start=3.9, end=4.4),
             memory=Resource(start=75.7, end=75.7)
          )
    ),
    Trace(
        func=<function transform at 0x7f9fd4782dd0>,
        args=(   a  b
             0  1  3
             1  2  4
             2  3  5, ),
        kwargs={},
        profile=Profile(
            cpu=Resource(start=0.0, end=3.8), 
            memory=Resource(start=75.7, end=75.7)
        )
    )
]

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

Chatted with @elijahbenizzy; think it could be done either as a node extender similar to data quality, or as an adapter that wraps current adapter types.

Add caching for hamilton

The problem

We want to enable caching of functions and their downstream results.

Say we want to alter a function and rerun the entire DAG. The function that we want to alter runs late enough that we'd be redoing a significant amount of computation. While iterating could often be solved by executing individual nodes, it's completely reasonable to iterate on the entire DAG.

In our internal use of Hamilton, we actually have a decorator called @cache that runs entirely separate from Hamilton -- this allows us to cache the results of individual functions. This decorator uses (a) the code a function runs and (b) the hash of the parameters. That said, it's not foolproof -- changes in external libraries referenced within functions can get ignored, it's not DAG-aware (it doesn't care about downstream functions you might also not want to rerun), and it depends on the hashability of parameters.
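
A minimal sketch of the idea behind such a decorator (disk-backed, and subject to the same caveats -- it assumes deterministic reprs and won't notice changes in external libraries):

import functools
import hashlib
import inspect
import os
import pickle

def cache(fn):
    """Keys the cache on (a) the function's source code and (b) its arguments."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        key_material = inspect.getsource(fn) + repr(args) + repr(sorted(kwargs.items()))
        key = hashlib.sha256(key_material.encode()).hexdigest()
        path = os.path.join('.cache', f'{fn.__name__}_{key}.pkl')
        if os.path.exists(path):
            with open(path, 'rb') as f:
                return pickle.load(f)
        result = fn(*args, **kwargs)
        os.makedirs('.cache', exist_ok=True)
        with open(path, 'wb') as f:
            pickle.dump(result, f)
        return result
    return wrapper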

I envision this as useful for:

  1. Rerunning/iterating on a DAG locally
  2. Running expensive DAGs in production ETLs that all use the same cache but change minimal parts

Some options

  1. Automatically cache functions, have a clear_cache or use_cache in the execute function
  2. Use the @cache decorator we have internally
  3. Manage the cache externally -- pass in as an override to the driver's execute function. Then have a method on the driver to manipulate the cache as needed.

I'm partial to (1), although we need to make it visible to the user and easy to override, e.g. to mark things as changed. We could also have a dont_cache decorator if needed.

Probably a few other things we can do -- welcome feedback! Might want to think about making it pluggable -- saving to disk is nice, but saving it to a backing store could be even nicer. Shouldn't get locked in.

Expose tags and function metadata to decorators

Is your feature request related to a problem? Please describe.
With tagging, we can annotate functions with metadata. It would be useful to allow decorators access to this, and other metadata accumulated.

E.g. tags could be used as a means to help inform what should happen if a check_output decorator runs a test and it fails. That is, if we standardize on tag keys, then decorators could assume them and make use of them.

Describe the solution you'd like
Enable decorators access to a context or some variable that would allow them to get at this information.

Describe alternatives you've considered
N/A

Additional context
Taken from the discussion with whylabs folks on what would be useful.

Capturing wide to long transformations of entire dataframes using Hamilton

Is your feature request related to a problem? Please describe.
Is there a way to transform a dataframe from wide to long (or vice versa) using Hamilton to track this transformation? I concluded no since all of the input/output columns would need to be specified, which could be a lot of typing.

Describe the solution you'd like
It would be nice if I could define a function that accepts df_wide and outputs df_long with pd.melt.
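
For example, something like this (a sketch; assuming 'date' is the identifier column to keep):

import pandas as pd

def df_long(df_wide: pd.DataFrame) -> pd.DataFrame:
    # Captures the wide-to-long transformation as a single whole-dataframe node.
    return pd.melt(df_wide, id_vars=['date'], var_name='column', value_name='value')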

Describe alternatives you've considered
I performed the melt operation outside of Hamilton so this operation is not directly captured through the DAG.

Hamilton should probably handle a circular dependency better -- and allow overrides to work for computation.

Current behavior

Stack overflow on circular DAG -- we should error more gracefully/handle things better.

Library & System Information

Latest, mac osx.

Steps to replicate behavior

# circular.py
def a(b: int, c: int) -> int:
    return b + c


def b(a: int, c: int) -> int:
    return a - c


def c(a: int, b: int) -> int:
    return a - b
# run.py
from hamilton import driver
import circular
dr = driver.Driver({}, circular)
print(dr.has_cycles(['b']))
result = dr.execute(['b'], overrides={'a': 1, 'c': 2})
result = dr.execute(['b'], inputs={'a': 1, 'c': 2})

Stack Traces

  File "hamilton/hamilton/driver.py", line 174, in has_cycles
    nodes, user_nodes = self.graph.get_required_functions(final_vars)
  File "hamilton/hamilton/graph.py", line 286, in get_required_functions
    return self.directional_dfs_traverse(lambda n: n.dependencies, starting_nodes=final_vars)
  File "hamilton/hamilton/graph.py", line 305, in directional_dfs_traverse
    dfs_traverse(self.nodes[var])
  File "hamilton/hamilton/graph.py", line 295, in dfs_traverse
    dfs_traverse(n)
  File "hamilton/hamilton/graph.py", line 295, in dfs_traverse
    dfs_traverse(n)
  File "hamilton/hamilton/graph.py", line 295, in dfs_traverse
    dfs_traverse(n)
  [Previous line repeated 991 more times]
  File "hamilton/hamilton/graph.py", line 293, in dfs_traverse
    for n in next_nodes_fn(node):
  File "hamilton/hamilton/graph.py", line 286, in <lambda>
    return self.directional_dfs_traverse(lambda n: n.dependencies, starting_nodes=final_vars)
RecursionError: maximum recursion depth exceeded

Expected behavior

  1. We error gracefully.
  2. If the DAG being walked doesn't result in a cycle, then we should handle it.
    E.g.
print(dr.has_cycles(['b'])) # True
result = dr.execute(['b'], overrides={'a': 1, 'c': 2}) # -1
result = dr.execute(['b'], inputs={'a': 1, 'c': 2}) # -1

Additional context

https://www.reddit.com/r/programming/comments/tfarht/comment/i0vdud3/?utm_source=share&utm_medium=web2x&context=3

Create HOW TOs for integration with popular ETL frameworks/methodology

Is your feature request related to a problem? Please describe.
Hamilton has a small footprint. It can be run inside existing ETLs very easily. We should have documentation to reflect
that fact to help users understand how simple it is to get it working.

Describe the solution you'd like
We should have examples/documentation to cover:

  1. Metaflow
  2. Airflow
  3. Dagster
  4. Flyte
  5. Your custom scheduler

Describe alternatives you've considered
N/A

Additional context
We want people to be able to cut and paste code easily. Also, having examples/documentation would help people gauge what it would take to get Hamilton into their ETL.

config.when decorator with numeric logic input

Is your feature request related to a problem? Please describe.
I would like to avoid if-else statements by using the config.when decorator on columns with numeric data, e.g. being able to produce new columns based on a variety of comparisons like whether inventory column is greater than, equal to or less than a certain threshold value.

Describe the solution you'd like

I would like to use the config.when decorator with a numeric column, e.g. inventory > 300, instead of string comparisons using when_in, when_not_in. It would be great if I could do something like the below:

import pandas as pd
from hamilton.function_modifiers import config

# returns inventory values exceeding threshold
@config.when(inventory>=1000)
def inventory_greater_eq_than__1000(inventory: pd.Series) -> pd.Series:
    """TODO:
        :param inventory:
        :return: inventory values exceeding threshold
    """
    pass

@config.when(inventory>=260)
def inventory_greater_eq_than__260(inventory: pd.Series) -> pd.Series:
    """TODO:
        :param inventory:
        :return: inventory values exceeding threshold
    """
    pass

Support Generics

Is your feature request related to a problem? Please describe.
Return/input types of Tuple[str, str], for example, don't work.

Describe the solution you'd like
We should have sophisticated subclass checking.
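
A rough sketch of what generic-aware checking could look like, using typing.get_origin/get_args (this ignores special forms like Union, Ellipsis, and TypeVars):

import typing

def custom_subclass_check(requested_type, param_type) -> bool:
    requested_origin = typing.get_origin(requested_type)
    param_origin = typing.get_origin(param_type)
    if requested_origin is None and param_origin is None:
        return issubclass(requested_type, param_type)  # plain classes: current behavior
    if requested_origin != param_origin:  # e.g. tuple for typing.Tuple[str, str]
        return False
    requested_args = typing.get_args(requested_type)
    param_args = typing.get_args(param_type)
    return len(requested_args) == len(param_args) and all(
        custom_subclass_check(r, p) for r, p in zip(requested_args, param_args)
    )

assert custom_subclass_check(typing.Tuple[str, str], typing.Tuple[str, str])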

Describe alternatives you've considered
Adding an option to bypass compile-time type checking (🤮 )

Additional context
I'm creating Metaflow examples, and they'd be a lot more self-documenting this way.

Verify `extract_columns` support for distributed dataframes

Is your feature request related to a problem? Please describe.
This is only supported for pandas.

Describe the solution you'd like
TBD, but it should just work. Some options:

  1. We have extract_columns be a class that's subclassable for each type and they add it in as a plugin. extract_columns then knows the mapping from DF type (from the function signature) to implementation
  2. The executor passes in the types for this
  3. We have some sort of transform-writing-friendly dataframe representation (although this is tricky for many reasons.)
  4. Additional decorators for dask, spark, etc...


Add the ability to mark/tag nodes

Think @pytest.mark.parametrize -- a way to attach metadata to a set of nodes.

Use-case -- the FED team wants to be able to mark nodes/data products.

Example:

@tag('actuals')
def my_actuals_column(...) -> pd.Series:
    pass

@tag('actuals', 'full_product')
def column_for_both_actuals_and_full_product() -> pd.Series:
    pass


def intermediate_column() -> pd.Series:
    pass

Then list_available_variables would also allow us to have these tags in the Variable object.

driver = Driver(...)
vars = [var for var in driver.list_available_variables() if 'actuals' in var.tags]

This would then allow users to build decorators on top of this:

@actual_column
# The same as
@tag('actuals')

Enable automated testing for non-stitch-fix accounts

Is your feature request related to a problem? Please describe.
Currently we use Stitch Fix's internal testing system. Thus, due to security concerns, we are limited to only allowing SFIX employees to run the tests.

Describe the solution you'd like
We should use a public CI plan -- the free one has 6k build minutes/month, which should be enough.

Describe alternatives you've considered
We could switch vendors, but that would be more work.

Additional context
Want to make it easier for others to contribute!

Extract columns executes functions twice (!!)


From discord conversation.

Current behavior

This test fails:

def test_extract_columns_executed_once():
    dr = hamilton.driver.Driver({}, tests.resources.extract_columns_execution_count)
    unique_id = uuid.uuid4()
    dr.execute(['col_1', 'col_2', 'col_3'], inputs={'unique_id': unique_id})
    assert len(tests.resources.extract_columns_execution_count.outputs) == 1  # It should only be called once

Expected behavior

It should succeed -- i.e. the function should only be called once.

Steps to replicate behavior

  1. See unit tests for attached PR

Passing multiple inputs (scalar and columns) to the parametrized_input decorator

Is your feature request related to a problem? Please describe.
I would like to pass multiple inputs (scalar and columns) to the parametrized_input decorator.

Describe the solution you'd like
I would like to create a series of rules for different columns, with all parameters in one place for easier management and readability.
An example of a rule is whether any values in the column have exceeded a threshold value; however, this could be extended to include any numeric comparisons such as <, >, = or combinations thereof, e.g. between value_1 and value_2 (inclusive).

import pandas as pd
from hamilton.function_modifiers import parametrized_input

INV_PARAMS = {
    # input var: [(output var, threshold value, description of new outputs), ...]
    'inventory': [
        ('inventory_geq_1000', 1000, 'inventory greater than or equal to 1000'),
        ('inventory_leq_215', 215, 'inventory less than or equal to 215'),
    ],
}

@parametrized_input(parameter='inventory', assigned_inputs=INV_PARAMS)
def inventory_geq(inventory: pd.Series) -> pd.Series:
    pass

Thanks so much!

Redo documentation setup

Is your feature request related to a problem? Please describe.
Currently, gitbooks syncs with a specific documentation branch. This does not guarantee that it syncs with releases, though.

Describe the solution you'd like
If gitbooks.io doesn't allow for this, let's look at a statically hosted page using github pages or some other hosting service. Ideally we'd even be able to version it with the release version so you can see old docs.


We should consider read the docs for hosting -- since it seems to be the default for python projects of scale.

Tasks:

  • Scope what's required to set read the docs up.
  • Determine how it would function and be rebuilt.
  • #297
  • Plan migration from gitbooks.

Some examples of docs to emulate:

Prototype Data Quality Feature

Is your feature request related to a problem? Please describe.
When creating pipelines, data issues can silently wreak havoc; your code didn't change but the data did and now things are wonky...

To combat that, there are projects like pandera that allow you to annotate functions with expectations, and at runtime, have those expectations checked and appropriately exposed.

Hamilton should have some support for runtime data quality checks, so that we not only support clean code bases, but clean data as well.

Describe the solution you'd like
We should prototype an approach where there is:

  1. A way to set expectations on the output of a function, i.e. what the data should look like.
  2. Use those expectations either right after function execution, or on conclusion of a Hamilton DAG, or some other way.
  3. A way to specify what should happen when an expectation is not met -- e.g. log warnings, surface warnings, or stop execution.
  4. A way to bootstrap these expectations from a dataset -- so that users can update/change expectations easily as time goes on.

Directionally https://pandera.readthedocs.io/en/stable/ seems like a good first approach to try, i.e. via decorators.
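
For a sense of what that could look like with pandera directly (the schema and function here are illustrative; assuming a recent pandera, its check_output decorator validates a function's return value at runtime):

import pandas as pd
import pandera as pa

users_schema = pa.DataFrameSchema({
    'user_id': pa.Column(str, unique=True),
    'age': pa.Column(int, pa.Check.in_range(18, 150)),
})

@pa.check_output(users_schema)
def users(input_file: str) -> pd.DataFrame:
    return pd.read_csv(input_file)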

Describe alternatives you've considered
As the prototype is being built out, we're thinking about alternatives too.

Additional context
Some ideas on approaches:

Metadata emission

Is your feature request related to a problem? Please describe.
Hamilton encodes a lot of metadata that lives in code. It also creates some at execution time. There are projects such as https://datahubproject.io/ and https://openlineage.io/ that capture this metadata across a wide array of tooling to create a central view in a heterogeneous environment. Hamilton should be able to emit metadata/execution information to them.

Describe the solution you'd like
A user should be able to specify whether their Hamilton DAG should emit metadata.
This should play nicely with graph adapters, e.g. spark, ray, dask.

UX questions:

  1. Should this be something in the graph adapter universe? E.g. a mixin?
  2. Or should this be on the driver side, so you change drivers for functionality, but change graph adapters for scale...

TODO:

  • find motivating use case to develop for

Improve Documentation

Is your feature request related to a problem? Please describe.
Docs are good for what we have now, but they will make it tougher to onboard users. We need a stronger narrative/prettier format.

Describe the solution you'd like
@elijahbenizzy has started this on gitbook -- this is a WIP. hamilton-docs.gitbook.io.

Describe alternatives you've considered
Beefing up READMEs, using another platform.

Additional context
Looking good so far! Really excited about feedback on docs, and potentially getting contributors. If you want to contribute, post here and I can figure out adding you to the gitbook org. For now we have a free "community plan" meant for OSS.

Prototype Lineage Analysis Tooling

Is your feature request related to a problem? Please describe.
Currently, when given a Hamilton DAG, we don't expose ways to ask questions about it.

E.g. for GDPR, data provenance, etc.

E.g.

  1. What if I remove this input, what function(s) will I impact?
  2. What uses some PII data and what is the surface area?
  3. If someone requests to be forgotten, what data do I need to delete?
  4. Who should I talk to when I want to make a change that impacts these functions? (e.g. use git blame to surface the function owner?)
  5. What has changed about the DAG since these two commits?
  6. Are there any cycles?
  7. Are there clusters of disjoint nodes? If so, what are they, maybe I can delete them?
  8. etc

Describe the solution you'd like
This could be a specific "driver class", or something added to the base driver.

Without an end user workflow in mind, it's a bit hard to specify the API.

Also, perhaps this would work well with #4 -- e.g. tagging what is PII, and what isn't?

Describe alternatives you've considered
N/A

Additional context
There are a lot of start ups and organizations trying to get a handle on their data and where it is used. Hamilton can help provide a way to get at this easily...

Show pyspark dataframe support

Is your feature request related to a problem? Please describe.
A common question we get is: does Hamilton support Spark dataframes? The answer is yes, but it's not ideal at the moment, and we don't have a vanilla example to point to.

It's not ideal because joins are a bit of a pain -- you need to know the index to join on. In the pandas world, we got away with
this because everything had an index associated with it. In spark, you need to provide it, and know when to provide it.

Describe the solution you'd like
(1) Provide a vanilla pyspark example.
(2) Provide a pattern to show how to handle multiple spark data sources. Perhaps implement a graph adapter to do so.

Describe alternatives you've considered
N/A

Do we want to support iterators for data loading?

What?

If we want to chunk over data, a natural way to do that is via an iterator.

Example: Enable "input" functions to be iterators

def my_loading_funct(...) -> Iterator[pd.Series]:
    ...
    yield some_chunk

This is fraught with some edge cases. But could be a more natural way to chunk over large data sets? This perhaps requires a new driver -- as we'd want some next() type semantic logic on the output of execute...

Originally posted by @skrawcz in #43 (comment)

Things to think through whether this is something useful to have:

  1. Where would we allow this? Only on "input" nodes?
  2. How would we exercise them in a deterministic fashion? i.e. does execute() care? and we iterate over them until they're exhausted? Or does execute() only do one iteration?
  3. How do we coordinate multiple inputs that are iterators? What if they're of different lengths?
  4. How would we ensure people don't create a mess that's hard to debug?
  5. Would this work for the distributed graph adapters?

[RFC] consolidate CI code into shell scripts

Is your feature request related to a problem? Please describe.

All of this project's CI configuration currently lives in a YAML file, https://github.com/stitchfix/hamilton/blob/main/.circleci/config.yml.

This introduces some friction to development in the following ways:

  • adding a new CI job involves adding a new block of YAML that is mostly the same as the others (see, for example, e2ad136)
  • running tests locally involves copying and pasting commands from that YAML file
  • duplication of code across jobs makes it a bit more difficult to understand what is different between them

Describe the solution you'd like

I'd like to propose the following refactoring of this project's CI jobs:

  • put CI code into one or more shell scripts in a directory like .ci/
    • using environment variables to handle the fact that some jobs are slightly different from others (e.g. the dask jobs don't require installing ray)
  • change .circleci/config.yml so that it runs those shell scripts
  • document in CONTRIBUTING.md how to run the tests in Docker locally, with commands that can just directly be copied and run by contributors, like this:
    • docker run \
          --rm \
          -v $(pwd):/app \
          --workdir /app \
          --env BUILD_TYPE="dask" \
          -it circleci/python:3.7 \
          bash .ci/test.sh

Describe alternatives you've considered

Using a Makefile instead of shell scripts could also work for this purpose, but in my experience shell scripts are understood by a broader range of people and have fewer surprises around quoting, interpolation, and exit codes.

Additional context

A similar pattern to the one I'm proposing has been very very useful for us in LightGBM.

Consider, for example, how many different job configurations the CI for that project's R package uses (https://github.com/microsoft/LightGBM/blob/3ad26a499614cf0af075ce4ea93b880bcc69b6bb/.github/workflows/r_package.yml) and how little repetition there is across jobs.

If you all are interested in trying this out, I'd be happy to propose some PRs.

Thanks for considering it!

Prototype Compile Hamilton on to an Orchestration Framework

Is your feature request related to a problem? Please describe.
Another way to scale a Hamilton DAG is to break it up into stages and have some other orchestrator handle execution. Hamilton need not implement these functions itself -- it could just compile and delegate execution to these frameworks.

E.g. I have a Hamilton DAG, but I want to use my in house Metaflow system -- the user should be able to generate code to run on Metaflow.

Describe the solution you'd like
A prototype to show how you could go from a Hamilton DAG to a DAG/Pipeline of some orchestration framework.

E.g.:

You'd have to think through the flow to do this:
e.g. define Hamilton DAG -> Compile to X Framework -> Commit code -> Run code on Framework X

We should prototype at least two implementations and see how we'd need to structure the code to make it manageable to maintain.

Describe alternatives you've considered
Hamilton could implement something like these other orchestration frameworks do, but that seems like a heavy lift. Better to try compiling to an existing framework.

Additional context
N/A

Specify path to save .gv files

Current & Expected behavior

I wish there was a way of specifying the path for saving the DAG (.gv). I had two sets of DAGs and the file gets overwritten as test_output/execute.gv.

Library & System Information

python version= 3.7.11, hamilton library version =1.1.1, linux= yes, Ubuntu 20 installed via WSL 1

Enhancement: Add capability to use a DataFrame as a template for the target output.

Currently, if the caller has a DataFrame structure that they are targeting, then they need to ensure they match the names of the columns correctly and manually convert the Series types. If the output_columns or another parameter of the execute function took a DataFrame as a template, then the output columns would match the template's columns, and each series could be delivered using an astype conversion.

You will probably need something like a DictionaryError for the scenario where there is a column in the DataFrame template that is not in the data columns available.

There is also the option to be able to process compound column names from the DataFrame to map into a more structured DataFrame. This would involve having a join character e.g. _.

enable connecting model node to metric node

slack conversation

Current issue:

  1. model_p_something (the “model” node) and prob_something (the “metric” node) are a complementary pair,
  2. the create_database driver (and therefore its crawler) contains both nodes,
  3. the simulate driver (and therefore its crawler) only contains the “metric” node and not the “model” node,
  4. the simulate driver’s crawler can find the feature dependency of the metric node (presumably through the model coefficient configs),
  5. the crawlers (and therefore the drivers) are unable to find the complementary connection between the model and metric nodes.

Item 5 specifically means that a person needs to know the complementary pairs (= domain knowledge; or hard-coded somewhere?) instead of the DAG containing this info. A complementary pair is identified via the @model decorator.

@model(GLM, 'model_p_demand_manual_by_formerautoship')
def prob_demand_manual_existing_former_autoship() -> pd.Series:
    pass

We would like there to be a systematic mapping between the complementary pairs.

Support typing.TypeVar annotations

Hamilton fails on TypeVar annotations.

Current behavior

If you define functions like the following:

import typing

StrSeries = typing.TypeVar("StrSeries", bound="pd.core.series.Series[str]")

def normalize_situs(situs_addr: StrSeries) -> StrSeries:
    normalized = situs_addr.map(str.upper)
    return normalized

def foo_bar(normalize_situs: StrSeries) -> StrSeries:
    return normalize_situs

and then try to create a DAG it breaks.

Stack Traces

Traceback (most recent call last):
  File "sf/hamilton/examples/temp/run.py", line 21, in <module>
    dr = driver.Driver({'situs_addr': pd.Series(['a', 'b', 'c'])}, repo)
  File "sf/hamilton/hamilton/driver.py", line 47, in __init__
    self.graph = graph.FunctionGraph(*modules, config=config, adapter=adapter)
  File "sf/hamilton/hamilton/graph.py", line 183, in __init__
    self.nodes = create_function_graph(*modules, config=self._config, adapter=adapter)
  File "sf/hamilton/hamilton/graph.py", line 117, in create_function_graph
    add_dependency(n, node_name, nodes, param_name, param_type, adapter)
  File "sf/hamilton/hamilton/graph.py", line 78, in add_dependency
    if custom_subclass_check(required_node.type, param_type):
  File "sf/hamilton/hamilton/graph.py", line 41, in custom_subclass_check
    if issubclass(requested_type, param_type):
TypeError: issubclass() arg 1 must be a class

Expected behavior

Hamilton should be happy to handle TypeVar-defined types.


Steps to replicate behavior

  1. define functions above into module.
  2. Run
import pandas as pd
from hamilton import driver
import repo

dr = driver.Driver({'situs_addr': pd.Series(['a', 'b', 'c'])}, repo)

Additional context

Flagged by discord user https://discord.com/channels/842466558483496997/842467113611821087/955688991749980200

Reusable subDAG components

Is your feature request related to a problem? Please describe.
Reusing functions, helpers, etc. is all nice and good. However, sometimes you want to be able to reuse large subcomponents of the DAG.

Describe the solution you'd like

Two ideas:

  1. Use the Driver to stitch bigger DAGs together
  2. Two decorators (still need to fully think this through)

This uses prefixes, but some actual namespace notion could be nice here.

@hamilton.subdag_split(
    inputs={
        'subdag_1': {'source': 'source_for_dag_1'},  # subdag 1 gets its source from a different place than subdag 2
        'subdag_2': {'source': 'source_for_dag_2'},
    })
def data(source: str) -> pd.DataFrame:
    return _load_data(source)

def foo(data: pd.DataFrame) -> pd.Series:
    return _do_something(data)

@hamilton.subdag_join(subdag='subdag_1')
def process_subdag_1(foo: pd.Series) -> pd.Series:
    return _process_some_way(foo)

The framework would then compile this in a fancy way -- ensuring that every node between the splits and the joins is turned into one for each subdag, under a separate namespace. TBD on how to access it.

Describe alternatives you've considered
Not allowing this -- I don't have a concrete use-case blocking anyone, but we have observed one at Stitch Fix.

Additional context
Thinking ahead here.

Have `graphviz` be an optional dependency?

Is your feature request related to a problem? Please describe.

This library is already nice and light, but I think it could be lighter!

Describe the solution you'd like

Could graphviz be an optional dependency? It's only required for one function (display), which I don't think is really core to the functionality of the package.

Describe alternatives you've considered

Could keep it in. It is fairly light on its own, though it is a compiled dependency.

Dynamic data loading from PostgreSQL database

Problem description
My data is stored in several PostgreSQL tables with millions of rows and time indices at different resolutions. When building new features (new columns), I would like to:

  1. Load only the subset of required columns from the table
  2. Avoid reloading the same table

Draft solution
By default, PostgreSQL generates the information_schema schema with the columns view, which contains details about the existing schemas in the database. For my purpose, I store this information in a pandas DataFrame named catalog with the columns ["schema", "table", "column"]. Here's some pseudocode of my current implementation (code will follow):

1. connect to db
2. get the `db_catalog` as a pandas DataFrame
3. instantiate the Hamilton Driver with a feature module
4. pass a list `features_to_build` with the feature names as strings (matches Hamilton function definition)
5. filter `features_to_build` to keep only features that are not in the `db_catalog` column named "column" (each table column name is unique as in Hamilton)
6. for the remaining features, traverse the Hamilton DAG upstream to get their `top_level_dependencies`
7. search for the `top_level_dependencies` in the `db_catalog`
8. iterate through `top_level_dependencies` and load tables from db (here, I avoid redundant loading with pandas groupby)
9. from the `top_level_dependencies`, traverse the Hamilton DAG downstream to get their `impacted_node`
10. filter `impacted_node` to keep only features that are in `features_to_build`
11. pass to Hamilton Driver the filtered `impacted_node` as `final_vars`, and the loaded tables as `inputs`

Code pieces:

step 3

hamilton_driver = Driver({}, hamilton_features)

step 4, 5, and 6

from typing import List

def get_dependencies(driver, features: List[str], index: List[str]) -> List[str]:
    filtered = list(filter(lambda v: v.name in features, driver.list_available_variables()))
    if len(filtered) != len(features):
        raise KeyError("some dependencies functions are not defined")

    _, node_dependencies = driver.graph.get_upstream_nodes(final_vars=features)
    dependencies = [d._name for d in node_dependencies if d._name not in index]  # index are rows that are not Hamilton features
    return dependencies

step 7 and 8

dependencies_catalog = catalog.query(features=dependencies)
for (schema, table), top_level_dependencies in dependencies_catalog.groupby(["schema", "table"])["column"]:
    # compose SQL query string
    query = db_engine.compose_sql_table_query(
        schema=schema,
        table=table,
        columns=top_level_dependencies.to_list() + list(catalog.index),  # ensure index is loaded + feature columns
    )
    # get DataFrame from db
    table_df = db_engine.get_table_df(query=query)

step 9, 10, 11

# within the for loop of step 7 and 8
    impacted_node = hamilton_driver.graph.get_impacted_nodes(top_level_dependencies)
    output_df = hamilton_driver.execute(
        final_vars=[n._name for n in impacted_node if n._name in features_to_build],
        inputs=table_df.to_dict(orient="series")
    )

    do_something(output_df)

Additional context
For now this seems to solve the problem, but it could probably be more efficient in the filtering/selection of upstream and downstream nodes. Also, for heavy computation workflows, there should be ways to truncate the DAG to find intermediary results that are stored in PostgreSQL, instead of always selecting the top_level_dependencies.

I was given tips on Discord to use decorators. I will look into it!

Rename `model` to `dynamic_node`

Is your feature request related to a problem? Please describe.
Model comes from a specific use-case at Stitch Fix, and is not applicable (in the same way) to the outside world.

Describe the solution you'd like
While it potentially applies more generally, dynamic_node would be a better name. We can leave model around for now, deprecating it in version 2.0.0.

Describe alternatives you've considered
Pretty simple, no need.

Additional context
Getting ready for more users/writing docs.

Add pandas result builder that converts to long format

Is your feature request related to a problem? Please describe.
Hamilton works on "wide" columns -- not "long" ones. However, the "tidy" data ethos holds that data should be in a long format -- it does make some things easier to do.

Describe the solution you'd like
Add a ResultBuilder variant that takes in how you'd want to collapse the resulting pandas dataframe.
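
A minimal sketch of such a variant, assuming the ResultMixin base class lives in hamilton.base (LongFormatResultBuilder and the column names are illustrative; a configurable version would need instance state rather than a staticmethod):

import typing

import pandas as pd

from hamilton import base

class LongFormatResultBuilder(base.ResultMixin):
    """Sketch: builds the usual wide dataframe, then melts it to long format."""
    @staticmethod
    def build_result(**outputs: typing.Any) -> pd.DataFrame:
        wide = pd.DataFrame(outputs)
        return wide.reset_index().melt(id_vars=['index'], var_name='variable', value_name='value')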

Describe alternatives you've considered
People do this manually -- but perhaps it makes more sense in the result builder.

Additional context
Prerequisites for someone picking this up:

  • know Pandas.
  • know python.
  • can write the pandas code to go from wide to long.
  • can read the Hamilton code base to figure out where to add it.

Modin integration

Is your feature request related to a problem? Please describe.
Modin - https://github.com/modin-project/modin - also enables scaling pandas computation. Since we have ray, dask, and koalas, why not add Modin?

Describe the solution you'd like
Modin requires a replacement of the pandas import in user code to work.
We would need to think how to do this:

  1. Do we get people to import "pandas" from hamilton, and we can then control which pandas is actually imported?
  2. Do we require users then to assume modin, by changing the pandas import themselves when defining their hamilton python functions?
  3. Or is there some other way to integrate? E.g. a graph adapter

Additional context
N/A

Requirements to open source hamilton

  • examples to run/use hamilton
  • onboarding documentation
  • contributor documentation
  • contributor guidelines
  • scrub documentation
  • scrub commit history
  • Determine license
  • Legal sign off -- yes, went with the BSD-3 Clause Clear License
  • [ ] push to pypi
  • [ ] make repo public
  • [ ] notify interested persons
  • [ ] publish blog post

Slightly leaky abstraction -- figure out the best way to extend this

This is for an extension of hamilton called hamiltime. Currently it's perfectly abstracted away except for this enum. Let's figure out how to make it a completely separate product.

PRIOR_RUN = 3  # This node's value should be taken from a prior run. This is not used in a standard function graph, but it comes in handy for repeatedly running the same one.

Some ideas:

  1. Release hamilton with hamiltime
  2. Have this be an extensible class that hamiltime can overwrite
  3. Have the function_graph, not the node type know about the node sources
  4. ...

Flytekit integration

Is your feature request related to a problem? Please describe.
We should be able to create a flyte workflow from Hamilton functions. i.e. recreate:

from flytekit import task, workflow

@task
def sum(x: int, y: int) -> int:
   return x + y

@task
def square(z: int) -> int:
   return z * z

@workflow
def my_workflow(x: int, y: int) -> int:
   return sum(x=square(z=x), y=square(z=y))

print(f"my_workflow output: {my_workflow(x=1, y=2)}")

in a Hamiltonesque way.

Describe the solution you'd like
We should be able to recreate the above by doing something like this:

# my_funcs.py
def square_x(x: int) -> int:
    return x * x

def square_y(y: int) -> int:
    return y * y

def sum_square_x_y(square_x: int, square_y: int) -> int:
    return square_x + square_y

# run.py
from hamilton import driver
from hamilton.experimental import h_flyte
import my_funcs

fga = h_flyte.FlyteGraphAdapter(...)
dr = driver.Driver({}, my_funcs, adapter=fga)
result = dr.execute(["sum_square_x_y"], inputs={"x": 1, "y": 2})
print(result)

Describe alternatives you've considered
TBD.

Additional context
Docs:

This feels very similar to https://www.prefect.io/ v2.0 -- so maybe whatever pattern we come up with here would also help provide integration there.

1.3.0 release

Tracking here. Goal to do this tomorrow. Order of business is:

  • Delete augment from decorator-refactor
  • Merge distributed_prototype (@skrawcz) (#47)
  • Rebase, merge decorator-refactor (@elijahbenizzy) (#28)
  • Rebase, merge fix-inputs (#56)
  • Rebase, merge stefan/refactor_visualization (@skrawcz) (#58)
  • Create a candidate release
  • Run some tests in internal tooling
  • Add any documentation required (@stefan)
  • Announce on discord
  • Add 1.3.0 commit
  • Publish library

Add ResultMixin implementations for Dask native types

Is your feature request related to a problem? Please describe.

We should implement useful implementations of:

class ResultMixin(object):
    """Base class housing the static function.

    Why a static function? That's because certain frameworks can only pickle a static function, not an entire
    object.
    """
    @staticmethod
    @abc.abstractmethod
    def build_result(**outputs: typing.Dict[str, typing.Any]) -> typing.Any:
        """This function builds the result given the computed values."""
        pass

for use with Dask. E.g. returning a Dask native array, dataframe, bag, etc. Currently the default is to return a pandas dataframe.

See the build_result function in DaskGraphAdapter for a reference point on how it could be used.
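
For example, a mixin returning a dask dataframe might look roughly like this (DaskDataFrameResult is an illustrative name, assuming ResultMixin lives in hamilton.base; a real version would also need to handle scalars and set proper column names):

import typing

import dask.dataframe as dd

from hamilton import base

class DaskDataFrameResult(base.ResultMixin):
    """Sketch: concatenates the computed dask series column-wise into one
    dask dataframe instead of materializing a pandas dataframe."""
    @staticmethod
    def build_result(**outputs: typing.Any) -> dd.DataFrame:
        return dd.concat(list(outputs.values()), axis=1)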

Describe the solution you'd like
These should probably be placed in the h_dask.py module for now. Otherwise open to naming.

Alternatively, we could include more options in DaskGraphAdapter. Open to thinking what way is the most user friendly solution going forward.

Additional context
The addition of these ResultMixins should enable a user who is using Dask to not have to implement their own version; instead they can use the ones that come with Hamilton.
