teehr's Introduction

Funding for this project was provided by the National Oceanic & Atmospheric Administration (NOAA), awarded to the Cooperative Institute for Research to Operations in Hydrology (CIROH) through the NOAA Cooperative Agreement with The University of Alabama (NA22NWS4320003).

TEEHR - Tools for Exploratory Evaluation in Hydrologic Research

TEEHR (pronounced "tier") is a Python tool set for loading, storing, processing, and visualizing hydrologic data, particularly National Water Model data, for the purpose of exploring and evaluating datasets to assess their skill and performance.

NOTE: THIS PROJECT IS UNDER DEVELOPMENT - EXPECT TO FIND BROKEN AND INCOMPLETE CODE.

Documentation

TEEHR Documentation

How to Install TEEHR

Install poetry

$ pipx install poetry

Install from source

# Create and activate python environment, requires python >= 3.10
$ poetry shell

# Install from source
$ poetry install

Install from GitHub

# Using pip
$ pip install 'teehr @ git+https://github.com/RTIInternational/teehr@[BRANCH_TAG]'

# Using poetry
$ poetry add git+https://github.com/RTIInternational/teehr.git#[BRANCH_TAG]

Use Docker

$ docker build -t teehr:v0.3.28 .
$ docker run -it --rm --volume $HOME:$HOME -p 8888:8888 teehr:v0.3.28 jupyter lab --ip 0.0.0.0 $HOME

Examples

For examples of how to use TEEHR, see the examples. We will maintain a basic set of example Jupyter Notebooks demonstrating how to use the TEEHR tools.

Resources

In May of 2023 we put on a workshop at the CIROH 1st Annual Training and Developers Conference. The workshop materials and presentation are available in the workshop GitHub repository: teehr-may-2023-workshop. This workshop was based on version 0.1.0.

Versioning

The TEEHR project follows semantic versioning as described here: https://semver.org/. Note, per the specification, "Major version zero (0.y.z) is for initial development. Anything MAY change at any time. The public API SHOULD NOT be considered stable.". We are solidly in "major version zero" territory, and trying to move fast, so expect breaking changes often.

teehr's People

Contributors

kordal3, manjilasingh, mavocado4, mgdenno, samlamont


teehr's Issues

Forcing data units and variable

The current forcing data ingest uses the units and variable name from the NetCDF files, which are mm s-1 and RAINRATE. We tentatively agreed we would use mm/hr and precipitation_rate in TEEHR. We should discuss this and, if appropriate, add some additional unit conversions to the loading (a possible conversion is sketched below).
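The conversion step could be as simple as the following sketch. The column names (value, variable_name, measurement_unit) are assumptions based on the parquet fields used elsewhere in this repo, not the exact loader code:

import pandas as pd

MM_PER_S_TO_MM_PER_HR = 3600.0  # 1 mm s-1 = 3600 mm/hr

def rainrate_to_precipitation_rate(df: pd.DataFrame) -> pd.DataFrame:
    """Convert NWM RAINRATE (mm s-1) to TEEHR precipitation_rate (mm/hr)."""
    df = df.copy()
    df["value"] = df["value"] * MM_PER_S_TO_MM_PER_HR
    df["variable_name"] = "precipitation_rate"
    df["measurement_unit"] = "mm/hr"
    return df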

Improve the weights file generation

It looks like we can improve the weights file generation by vectorizing the raster cells and intersecting them with the catchments, which would give an exact solution and be faster. Win, win.
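A rough sketch of the idea, assuming rasterio/geopandas/shapely are available and that the catchments GeoDataFrame has a catchment_id column (names are illustrative, not the current TEEHR code):

import geopandas as gpd
import numpy as np
import rasterio
from rasterio.features import shapes
from shapely.geometry import shape

def grid_cell_weights(grid_path: str, catchments: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """Polygonize grid cells, intersect with catchments, and compute areal weights."""
    with rasterio.open(grid_path) as src:
        # Give every cell a unique value so polygonize returns one polygon per cell.
        cell_ids = np.arange(src.width * src.height, dtype=np.int32).reshape(
            src.height, src.width
        )
        polys = [(shape(geom), int(val)) for geom, val in shapes(cell_ids, transform=src.transform)]
        cells = gpd.GeoDataFrame(
            {"cell_id": [v for _, v in polys]},
            geometry=[g for g, _ in polys],
            crs=src.crs,
        )
    # Exact intersection of cell polygons with catchment polygons.
    pieces = gpd.overlay(cells, catchments.to_crs(cells.crs), how="intersection")
    pieces["area"] = pieces.geometry.area
    pieces["weight"] = pieces["area"] / pieces.groupby("catchment_id")["area"].transform("sum")
    return pieces[["cell_id", "catchment_id", "weight"]]

Here weight is the fraction of each catchment's intersected area contributed by each cell, i.e., the factor needed for an area-weighted mean.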

joined timeseries query returning multiple rows with same ref-time, value-time combination

Due to overlapping timesteps in the Extended AnA output, timeseries queries that use ExtAnA as the primary (e.g., forcing) return duplicate rows (same reference time and value time). The values can differ between duplicates (e.g., Stage IV data can be adjusted/corrected before the next day's run), so we cannot simply drop duplicates; we need to check the reference time of the ExtAnA and keep only the most recent. Maybe there needs to be a flag to indicate which row to keep if duplicates are found? If the same behavior is happening behind the metric queries, the metrics will be skewed/wrong, so we need to deal with it within those queries too.
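One possible de-duplication pattern, sketched with DuckDB (the parquet path is a placeholder and the column names follow the joined timeseries schema used elsewhere in these issues):

import duckdb

deduped = duckdb.query("""
    SELECT * EXCLUDE (rn) FROM (
        SELECT *,
            row_number() OVER (
                PARTITION BY location_id, value_time
                ORDER BY reference_time DESC
            ) AS rn
        FROM read_parquet('primary/*.parquet')
    )
    WHERE rn = 1
""").to_df()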

Add capability to handle list of pre-built json objects from *remote* store

Goal
Add feature to permit use of the pre-built json headers for the NWM retrospective and operational archives. This will add a significant speedup to this already zippy library.

Building the JSON takes about 30 seconds per file (+/- 50%), and the goal here is to avoid the cost of building the JSON in general. The current script builds the .json and stores it locally. I understand there is (and I would welcome a link to) an issue addressing the ability to use the stored .jsons (instead of rebuilding each time). This issue is similar, except we would be pulling the pre-built .json from an AWS bucket.

Below is a code snippet demonstrating access to those buckets with no download -- this is extremely efficient for one-time access. An alternate method might be to download the .json from the bucket and then use the workflow as already considered in the above-referenced issue. The tradeoff one way or the other is having a location (or not) to store the .json.

Explanation
There are a couple of ways this could be handled. My suggestion is that we begin by permitting two types of input (either a list of strings or list of json objects) at this point in the script --

json_paths: List[str],

For reference, we have been experimenting with the function of this script using the example notebook here:
https://github.com/RTIInternational/teehr/blob/main/examples/loading/grid_loading_example.ipynb

The idea would be that the user could choose

  1. to work completely without download (pulling the json objects and data values entirely from buckets, and perhaps even storing the result in an output bucket)
  2. to work from local cache
    a) with a json object out in the cloud
    b) with a stored json object (downloaded from the cloud or generated)
    c) with a json object generated by this script during the run. (The current default)

Partial Example
The following script demonstrates use of fsspec to obtain a series of json objects from the AWS buckets of json headers for the native NWM output. The json objects are then used to open a dataset from the file. This is one way we could access the pre-built files with no download or local storage.
Retrospective output is here: https://ciroh-nwm-zarr-retrospective-data-copy.s3.amazonaws.com/index.html
Operational output (in progress): https://ciroh-nwm-zarr-copy.s3.amazonaws.com/index.html

!pip install ujson
!pip install zarr xarray

import joblib
import xarray as xr
import fsspec
import ujson
import psutil

# Get the number of available CPU cores
num_cores = psutil.cpu_count(logical=False)  # Use logical=True for hyperthreading

# Calculate the available memory in GB
available_memory_gb = psutil.virtual_memory().available / (1024 ** 3)  # Bytes to GB

# Calculate a memory limit per worker based on available memory
# Adjust this factor based on your memory usage requirements
memory_per_worker_gb = available_memory_gb / num_cores

# Define a function to load remote JSON content
def load_remote_json(file_url):
    of = fsspec.open(file_url)
    with of as f:
        return ujson.load(f)

# Define a function to load a remote dataset from JSON content
def load_remote_ds(json_obj):
    backend_args = {
        "consolidated": False,
        "storage_options": {
            "fo": json_obj,
        },
    }
    # On this side, "fo" points to a json object (dict type).
    # On the original script side, "fo" receives a string file path.
    # We need to modify the library (and make a pull request) to
    # use the loading method in this script with the json object.
    # The original script assumes a json is built at run time and
    # stored in a temp/cache directory, so it only passes the path string.
    
    return xr.open_dataset("reference://", engine="zarr", backend_kwargs=backend_args)

# Define a function to select streamflow data from a dataset
def select_flow(ds, feature_id):
    flow = ds.streamflow.sel(feature_id=feature_id)
    return flow.values

# Define a function to process a single file for a given feature ID
def process_file(file_url, feature_ids):
    json_obj = load_remote_json(file_url)
    ds = load_remote_ds(json_obj)
    streamflow_value = select_flow(ds, feature_ids)
    return streamflow_value

# Define a list of feature IDs
feature_ids = [8153461, 8153027, 18210860]

# Define a list of URLs pointing to JSON files
files = []
files.append("https://ciroh-nwm-zarr-copy.s3.amazonaws.com/national-water-model/nwm.20220112/short_range/nwm.t00z.short_range.channel_rt.f001.conus.nc.json")
files.append("https://ciroh-nwm-zarr-copy.s3.amazonaws.com/national-water-model/nwm.20220112/short_range/nwm.t00z.short_range.channel_rt.f002.conus.nc.json")

# Create a dictionary to store the extracted values for each feature ID
extracted_values_dict = {}

# Create a Parallel processing pool using joblib with dynamic settings
with joblib.parallel_backend("threading", n_jobs=num_cores):  # Use threads for parallel processing
    streamflow_values = joblib.Parallel()(joblib.delayed(process_file)(file_url, feature_ids) for file_url in files)

@pclemins
@TrupeshKumarPatel
@hariteja-jajula
@samlamont

Add more complex metrics

Dask data frame metrics...

Concept: use DuckDB queries to get the joined timeseries, then calculate metrics that are more difficult to express in SQL.

Filtering can be done in SQL, grouping and metrics in Pandas/Dask.
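A minimal sketch of the concept (the parquet path, column names, and example metric are illustrative; Dask could stand in for pandas for larger datasets):

import duckdb

# Filter/join in SQL...
joined = duckdb.query("""
    SELECT primary_location_id, primary_value, secondary_value
    FROM read_parquet('joined/*.parquet')
    WHERE value_time >= TIMESTAMP '2023-08-01'
""").to_df()

# ...then compute metrics that are awkward in SQL using pandas.
def kling_gupta_efficiency(df):
    r = df["primary_value"].corr(df["secondary_value"])
    alpha = df["secondary_value"].std() / df["primary_value"].std()
    beta = df["secondary_value"].mean() / df["primary_value"].mean()
    return 1.0 - ((r - 1.0) ** 2 + (alpha - 1.0) ** 2 + (beta - 1.0) ** 2) ** 0.5

metrics = joined.groupby("primary_location_id").apply(kling_gupta_efficiency)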

Build NGEN (NWM v4.x) output parser to Parquet

For now, I think a good place to start is parsing out the example inputs and outputs that come with ngen. Inputs are in the data directory in the NGEN repo. Outputs will need to be generated. This will be a good starting point, but the actual NGEN output can be messy.
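A starting-point sketch, assuming ngen's per-catchment CSV output (the cat-*.csv file naming and the Time column name are assumptions and vary by formulation):

from pathlib import Path
import pandas as pd

def ngen_output_to_parquet(output_dir: str, parquet_path: str, time_column: str = "Time"):
    """Concatenate per-catchment ngen CSV output into a single parquet file."""
    frames = []
    for csv_file in sorted(Path(output_dir).glob("cat-*.csv")):
        df = pd.read_csv(csv_file, parse_dates=[time_column])
        df["location_id"] = csv_file.stem  # e.g. "cat-67"
        frames.append(df)
    pd.concat(frames, ignore_index=True).to_parquet(parquet_path)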

Add flag to control behavior when files are missing

Currently, if the ingest process tries to process a file that does not exist, it fails. Sometimes this may be the desired behavior so that, for example, a workflow will recognize that a file was missing and try again later. This might be common when downloading recently posted data, some of which hasn't been posted yet. However, sometimes, particularly with older data, the file may not exist and never will. In those cases the user probably wants to note that a file was missed and move on to the next one without failing.
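A sketch of the flag (the function and argument names are hypothetical, not the current API):

import logging

import fsspec

logger = logging.getLogger(__name__)

def open_remote_file(fs: fsspec.AbstractFileSystem, path: str, ignore_missing_file: bool = False):
    """Open a file, either skipping missing files with a warning or raising, per the flag."""
    if not fs.exists(path):
        if ignore_missing_file:
            logger.warning("File not found, skipping: %s", path)
            return None
        raise FileNotFoundError(path)
    return fs.open(path)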

Set up CI/CD

Not 100% sure how far we want to go here. Brainstorm:

  • Need to decide what triggers actions (tag, push to main, PR to main, etc.)
  • What should happen?
  • Run tests and auto linter
  • Build Docker container and push to (GH, DockerHub), which account?
  • Build Python package and push to PyPi, which account?
  • Track release notes
  • Auto tagging? With some approach, such as version + number of commits (e.g., v0.1.0+10)? Something else?
  • What else?

Investigate how to use S3 buckets

DuckDB supports querying directly from cloud storage. I think we need to refactor the queries to do this. Some things I'm fairly sure we need to do (see the sketch after this list):

  • Create a connection and then conn.execute() instead of duckdb.query. This will allow us to install and enable the S3 extension.
  • Need to allow the user to specify the region, access key and access secret.
  • It may make sense to make the queries a class where we can specify the required info once. For example, on init the user could specify AWS credentials, timeseries paths, etc. Then there could be query methods; need to think this through.
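A minimal sketch of the connection-based approach using DuckDB's httpfs extension (the bucket path and credentials are placeholders):

import duckdb

conn = duckdb.connect()
conn.execute("INSTALL httpfs;")
conn.execute("LOAD httpfs;")
conn.execute("SET s3_region = 'us-east-1';")
conn.execute("SET s3_access_key_id = 'YOUR_ACCESS_KEY';")
conn.execute("SET s3_secret_access_key = 'YOUR_SECRET_KEY';")

df = conn.execute(
    "SELECT * FROM read_parquet('s3://example-bucket/timeseries/*.parquet') LIMIT 10"
).fetch_df()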

Investigate building a data pre-processor

Summary
Brainstorming notes on building a pre-processor to convert raw data to the TEEHR data model (de-coupling the processes of downloading raw data and importing it into TEEHR). This would allow us to optimize each process independently.

Benefits
• By skipping the steps to format to the TEEHR data model, NWM point data can be downloaded much more efficiently (e.g., 2 days of medium-range forecasts for all reaches in < 10 minutes on a large 2i2c instance; single JSONs --> MultiZarrToZarr --> Dask dataframe --> partitioned parquet)
• The pre-processor can import raw data directly into a DuckDB database, potentially improving query performance (as opposed to querying parquet files)
• The pre-processor can include methods to calculate additional fields (e.g., binary exceedance flags) to further optimize query performance
• A standalone tool will provide more flexibility for users to import raw data to the TEEHR data model (likely necessary for testbeds)
• Components:
-- Field mapping to connect raw data attributes to the TEEHR data model
-- Methods to convert local files (Parquet, CSV, etc.) to the TEEHR data model
-- Pydantic models of the TEEHR data model tables (see the sketch below)
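For the Pydantic piece, a rough sketch of what a timeseries-table model could look like (field names are based on the columns used elsewhere in this repo and are assumptions, not the actual TEEHR models):

from datetime import datetime
from typing import Optional

from pydantic import BaseModel

class TimeseriesRecord(BaseModel):
    reference_time: Optional[datetime] = None
    value_time: datetime
    location_id: str
    value: float
    variable_name: str
    measurement_unit: str
    configuration: str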

cc. @mgdenno

Adds NWM v3.0 to loading utils

NWM 3.0 is live. Looks like the first v3 outputs in GCP are 9/19 12z.

We need to add loading utilities for v3. I suspect that it is largely backwards compatible with our v2.2 code, but I understand there are additional data available that our v2.2 code likely cannot fetch.

tzdata error when loading data on local machine

Getting an error when trying to load data on my local machine due to missing 'tzdata' file:

File ~\Anaconda3\envs\postsm\lib\site-packages\teehr\loading\nwm22\nwm_point_data.py:100, in process_chunk_of_files(df, location_ids, configuration, variable_name, output_parquet_dir)
     98     filename = f"{min_ref_str}_{max_ref_str}.parquet"
     99 else:
--> 100     min_ref_str = pa.compute.strftime(min_ref, format="%Y%m%dT%HZ")
    101     filename = f"{min_ref_str}.parquet"
    103 pq.write_table(output_table, Path(output_parquet_dir, filename))

File ~\Anaconda3\envs\postsm\lib\site-packages\pyarrow\compute.py:256, in _make_generic_wrapper.<locals>.wrapper(memory_pool, options, *args, **kwargs)
    254 if args and isinstance(args[0], Expression):
    255     return Expression._call(func_name, list(args), options)
--> 256 return func.call(args, options, memory_pool)

File ~\Anaconda3\envs\postsm\lib\site-packages\pyarrow\_compute.pyx:355, in pyarrow._compute.Function.call()

File ~\Anaconda3\envs\postsm\lib\site-packages\pyarrow\error.pxi:144, in pyarrow.lib.pyarrow_internal_check_status()

File ~\Anaconda3\envs\postsm\lib\site-packages\pyarrow\error.pxi:100, in pyarrow.lib.check_status()

ArrowInvalid: Cannot locate timezone 'UTC': Timezone database not found at "C:\Users\kvanwerkhoven\Downloads\tzdata"

Seems to be a known issue:
https://stackoverflow.com/questions/76629191/arrowinvalid-cannot-locate-timezone-utc-timezone-database-not-found

But the fix isn't obvious.
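One commonly suggested workaround is to place a copy of the IANA timezone database where Arrow looks for it on Windows (%USERPROFILE%\Downloads\tzdata). A sketch, where the tzdata release and URLs are assumptions -- check the PyArrow docs for the current recommendation:

import tarfile
import urllib.request
from pathlib import Path

tz_dir = Path.home() / "Downloads" / "tzdata"
tz_dir.mkdir(parents=True, exist_ok=True)

# Download and extract the IANA timezone database.
tarball = tz_dir / "tzdata.tar.gz"
urllib.request.urlretrieve("https://data.iana.org/time-zones/releases/tzdata2023c.tar.gz", tarball)
with tarfile.open(tarball) as tf:
    tf.extractall(tz_dir)

# Arrow also needs the Windows zone-name mapping.
urllib.request.urlretrieve(
    "https://raw.githubusercontent.com/unicode-org/cldr/main/common/supplemental/windowsZones.xml",
    tz_dir / "windowsZones.xml",
)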

USGS loading function not pulling last requested date of range

The USGS loading function is skipping the last requested day of data; e.g., the example below pulls data from Aug 1-4 but nothing on the 5th. I tried different dates, period lengths, and ending hours, with the same result every time: the last day does not load even though the data exist on NWIS.

tlu.usgs_to_parquet(
    ['02137727'],
    datetime(2023, 8, 1, 0),
    datetime(2023, 8, 5, 23),
    Path('/data/post-event/events/test/timeseries/usgs'),
    chunk_by='day'
)

Investigate the use of a query builder

Our current approach to building queries uses string formatting to construct the required queries. This is not a great approach. While we validate user input using Pydantic models, this pattern could allow user input to make it to the query un-validated. Using a query builder to construct queries may be a better approach.
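Short of adopting a full query builder, DuckDB's prepared-statement parameters would at least keep user-supplied values out of the SQL string. A sketch (the parquet path is a placeholder):

import duckdb

conn = duckdb.connect()
location_id = "usgs-02137727"  # user-supplied value
df = conn.execute(
    "SELECT * FROM read_parquet('timeseries/*.parquet') WHERE location_id = ?",
    [location_id],
).fetch_df()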

Integrate poetry

Integrate poetry for dependency management and packaging. Targeted for v0.3.0 release

  • Docker image builds and tests pass in container
  • Confirm pip install from github works
  • Update pangeo image version
  • Update documentation

`nwm_to_parquet` silently overwrites existing files

Running the last cell of this example twice:

https://github.com/RTIInternational/teehr/blob/main/examples/loading/point_loading_example.ipynb

seems to result in complete re-execution. Comparing the timestamps on the parquet and JSON files, it looks like existing files are overwritten with no warning to the user. This presents challenges to the use of TEEHR in a scripting context, or even just blind re-execution of a notebook. A warning, plus an option to overwrite or skip existing files, would be useful.

Add max field to gdf returned from get_metrics

For post-event dashboard 2, I need the maximum value of the primary and secondary data for all value_times included in the query: primary_max and secondary_max.

I also need the sum of all values, but I can calculate that from the average. If for any reason the sum would NOT reliably equal the average x count, then also include primary_sum and secondary_sum.

Exclude negative values in metrics calculations

Missing data (-999) are being included in metrics calculations (e.g., primary average flow = -23000). The user can include a filter for "value >= 0" to avoid this, but there should probably be a default that excludes impossible (negative) values (e.g., an argument like "exclude_negative" that defaults to True).
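A sketch of what that default could look like (the function and argument names are hypothetical, not the current API):

import pandas as pd

def drop_missing_values(df: pd.DataFrame, exclude_negative: bool = True) -> pd.DataFrame:
    """Drop impossible (negative) values such as -999 fill values before computing metrics."""
    if exclude_negative:
        return df[df["value"] >= 0]
    return df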

Filtering duplicates from primary impacts performance

These lines from get_metrics() appear to negatively impact performance pretty significantly, especially when the query has filters applied. I'm guessing this forces a read of the entire primary dataset even when only some of it is needed.

WITH filtered_primary AS (
    SELECT * FROM(
        SELECT *,
            row_number()
        OVER(
            PARTITION BY value_time, location_id
            ORDER BY reference_time desc
            ) AS rn
        FROM read_parquet("/home/jovyan/data/protocols/retro/timeseries/usgs/*.parquet")
        )
),

34 seconds vs 4 seconds when filtering 3 years of retrospective data at all USGS stations to a single USGS station.

We might need a flag to only include this when the user requests that the primary data be "de-duped".

Check for Zarr JSON before creating

Loading code should check if a zarr JSON file exists before creating it. This process is fairly time consuming and only really needs to be done once per file.
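A sketch of the check, using kerchunk's SingleHdf5ToZarr to stand in for the existing JSON-building step (names and cache layout are illustrative):

import json
from pathlib import Path

import fsspec
from kerchunk.hdf import SingleHdf5ToZarr

def get_or_build_zarr_json(nc_url: str, json_dir: Path) -> dict:
    """Return cached kerchunk references if present, otherwise build and cache them."""
    json_path = json_dir / (Path(nc_url).name + ".json")
    if json_path.exists():
        # Re-use the cached reference file instead of rebuilding it.
        return json.loads(json_path.read_text())
    with fsspec.open(nc_url) as f:
        refs = SingleHdf5ToZarr(f, nc_url).translate()
    json_path.write_text(json.dumps(refs))
    return refs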

Add unit conversion option to queries

Not sure how complex we want to make this, but the ability to choose what units are returned for a query would be a good feature to have. Need to think about how that would look in the query function arguments.

update retrospective loading to handle multiple versions

Right now the src/teehr/loading/nwm21/nwm21_retrospective.py script has the version (2.0 or 2.1) hard coded and also incorrectly labels the location_id as nwm22-*. This should be a relatively easy upgrade to fix the scripts to accommodate both v2.0 and v2.1.

Add a simple webservice API

The idea of this issue is to add the ability to serve out the data that is currently available via the library functions (e.g. get_timeseries() or get_metrics()) via a web API such that it can more easily be used to provide data to a webpage.

Ultimately I think this could have three components:

  1. Query an existing dataset to get the data currently available via the library functions. We will start here.
  2. Create a new evaluation - this would involve describing what datasets you want to evaluate, and then kick off a job to fetch and process that data. Currently this is done via the library.
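A very rough sketch of component 1 using FastAPI (illustrative only; in practice the endpoint would call the existing get_timeseries()/get_metrics() library functions, and the parquet path here is a placeholder):

import duckdb
from fastapi import FastAPI

app = FastAPI(title="TEEHR API")

TIMESERIES_PATH = "/data/timeseries/*.parquet"  # placeholder

@app.get("/timeseries")
def timeseries(location_id: str):
    """Return the timeseries for one location as JSON records."""
    df = duckdb.execute(
        f"SELECT * FROM read_parquet('{TIMESERIES_PATH}') "
        "WHERE location_id = ? ORDER BY value_time",
        [location_id],
    ).fetch_df()
    return df.to_dict(orient="records")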
