
django-geo-spaas-processing's Introduction

GeoSPaaS processing tools

This package brings processing capabilities to GeoSPaaS.

It is composed of:

  • several processing modules which can be used to perform various operations on the datasets referenced in a GeoSPaaS database.
  • the code necessary to run these operations asynchronously as Celery tasks.

The processing modules can either be run as standalone code or asynchronously as Celery tasks.


Overview of the short-form arguments of each CLI

Argument short form | Download CLI         | Copy CLI
'-d'                | '--destination_path' | '--destination_path'
'-b'                | '--begin' (time)     | '--begin' (time)
'-e'                | '--end' (time)       | '--end' (time)
'-r'                | '--rel_time_flag'    | '--rel_time_flag'
'-g'                | '--geometry'         | '--geometry'
'-q'                | '--query'            | '--query'
'-c'                | '--config_file'      |
'-s'                | '--safety_limit'     |
'-a'                | '--save_path'        |
'-l'                |                      | '--link'
'-t'                |                      | '--type'
'-f'                |                      | '--flag_file'
'-ttl'              |                      | '--time_to_live'

Dependencies

The main requirement is to have a populated GeoSPaaS database (see django-geo-spaas and django-geo-spaas-harvesting).

For standalone usage, the dependencies depend on which processing module is used.

For asynchronous usage, the following is needed (not including the additional dependencies for each processing module):

  • a RabbitMQ instance
  • a Redis instance
  • Python dependencies:
    • celery<5.0
    • django-celery-results
    • redis

Processing modules

downloaders module

The downloaders module provides the ability to download datasets referenced in a GeoSPaaS database.

Usage

The entrypoint for this module is the DownloadManager class. It takes care of downloading the datasets matching the criteria it is given.

The criteria take the same form as those used in Django filters.

In its simplest use case, DownloadManager can be used as follows:

from geospaas_processing.downloaders import DownloadManager

# This will download the dataset whose ID is 1 into the current directory
download_manager = DownloadManager(id=1)
download_manager.download()

The behavior of a DownloadManager can be altered using parameters, as shown below:

# Downloads the dataset in /tmp
download_manager = DownloadManager(download_directory='/tmp', id=1)
download_manager.download()

# Use specific provider settings, like credentials or a limit on parallel downloads
download_manager = DownloadManager(download_directory='/tmp',
                                   provider_settings_path='./provider_settings.yml',
                                   id=1)
download_manager.download()

# If the number of selected datasets exceeds the max_downloads argument,
# an exception will be raised and nothing will be downloaded.
# This is a safety measure to avoid filling a disk if a wrong criterion is given.
download_manager = DownloadManager(max_downloads=10, source__instrument__short_name='SLSTR')
download_manager.download()

Note that when other parameters are given, the dataset selection criteria must be the last arguments.

Credentials

Some providers require authentication to download datasets. The credentials for a particular provider can be defined in the provider settings file (by default the provider_settings.yml file included in the package).

It is a YAML file with the following structure:

---
'<provider_url_prefix>':
  property1: 'value1'
  property2: 'value2'
'<provider2_url_prefix>':
  property1: 'value1'
  property3: 'value3'
...

The provider prefixes will be matched against the URI of the dataset to determine which settings apply. The available settings are the following:

  • username: the user name
  • password: the password
  • max_parallel_downloads: the maximum number of downloads which can run simultaneously for a provider
  • authentication_type: for providers which do not use basic authentication, it is possible to specify an alternative authentication type. For now, only OAuth2 is supported.
  • token_url: for OAuth2, the URL where tokens can be retrieved
  • client_id: for OAuth2, the client ID to use

Enabling limits on the number of parallel downloads

This is only useful if multiple downloaders are run simultaneously.

If necessary, the DownloadManager can use a locking mechanism to avoid downloading too many files from the same provider at once.

This functionality requires a Redis instance and the redis pip package. The connection information to the Redis instance can be specified via the following environment variables:

  • GEOSPAAS_PROCESSING_REDIS_HOST
  • GEOSPAAS_PROCESSING_REDIS_PORT

If these conditions are fulfilled, the locking functionality is activated automatically.

To define a limit for a particular provider, a max_parallel_downloads entry must be added in the provider's configuration section in the provider settings file.
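
The snippet below is a minimal sketch of how this fits together (host, port and paths are placeholders; in practice the environment variables would usually be set in the worker's environment before starting Python):

import os

from geospaas_processing.downloaders import DownloadManager

# Point geospaas_processing at the Redis instance used for locking
os.environ['GEOSPAAS_PROCESSING_REDIS_HOST'] = 'localhost'
os.environ['GEOSPAAS_PROCESSING_REDIS_PORT'] = '6379'

# 'max_parallel_downloads' must also be set for the relevant provider(s)
# in provider_settings.yml for the limit to be enforced
download_manager = DownloadManager(download_directory='/tmp',
                                   provider_settings_path='./provider_settings.yml',
                                   source__instrument__short_name='SLSTR')
download_manager.download()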

converters subpackage

The converters subpackage contains code to convert datasets from one format to another. The conversion process must be adapted to the product and desired output format, which is why the following structure is used.

Base classes for managing conversions are defined in the converters.base module.

The Converter class is the parent of classes which handle the actual conversion process. The ConversionManager class is the parent of classes used to choose which converter to use depending on the dataset. Each converter has a PARAMETER_SELECTORS class attribute. It contains a sequence of ParameterSelector objects which are used by the conversion manager to know in which case the converter can be used and how to instantiate it.

Here is an example of declaration and usage of such classes:

from geospaas_processing.converters.base import (ConversionManager,
                                                 Converter,
                                                 ParameterSelector)


class ExampleConversionManager(ConversionManager):
    """Example conversion manager"""


@ExampleConversionManager.register()
class ExampleConverter(Converter):
    """Example converter"""

    # define the conditions for using this converter and the keyword
    # arguments to pass to its constructor
    PARAMETER_SELECTORS = (
        ParameterSelector(
            matches=lambda d: d.entry_id.startswith('something'),
            param='foo'),
        ParameterSelector(
            matches=lambda d: d.entry_id.startswith('something_else'),
            param='bar'),
    )

    def __init__(self, param):
        self.param = param

    def run(self, in_file, out_dir, **kwargs):
        """Conversion method"""
        # conversion code goes here


@ExampleConversionManager.register()
class SpecialExampleConverter(ExampleConverter):
    """Example converter to be used in a special case"""
    PARAMETER_SELECTORS = (
        ParameterSelector(
            matches=lambda d: d.entry_id.startswith('something_special'),
            param='baz'),
    )

    def run(self, in_file, out_dir, **kwargs):
        """Conversion method for a special case"""
        # conversion code goes here

# Run an actual conversion
conversion_manager = ExampleConversionManager('/working_directory')
conversion_manager.convert(dataset_id=42, file_name='dataset.nc')

converters.idf

The converters.idf.converter module contains the IDFConversionManager and IDFConverter classes which can be used to convert downloaded dataset files to the IDF format for use with SEAScope.
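
Assuming IDFConversionManager follows the same interface as the base ConversionManager shown above, a typical invocation might look like this (the paths and dataset ID are placeholders):

from geospaas_processing.converters.idf.converter import IDFConversionManager

# Convert the file of the dataset whose ID is 42 to IDF,
# writing the result under the given working directory
idf_conversion_manager = IDFConversionManager('/working_directory')
idf_conversion_manager.convert(dataset_id=42, file_name='dataset.nc')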

converters.syntool

The converters.syntool.converter module contains the SyntoolConversionManager and SyntoolConverter classes which can be used to convert downloaded dataset files to a format allowing to display them in a Syntool portal.
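
SyntoolConversionManager is expected to be used the same way; a hedged sketch:

from geospaas_processing.converters.syntool.converter import SyntoolConversionManager

syntool_conversion_manager = SyntoolConversionManager('/working_directory')
syntool_conversion_manager.convert(dataset_id=42, file_name='dataset.nc')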

Tasks queue

Celery is a framework which makes it possible to submit tasks to a queue. The tasks are then processed by one or more workers. For more information, please check out the Celery documentation.

The geospaas_processing package offers the option to run the processing modules as Celery tasks.

Architecture

Here is a description of the architecture in which geospaas_processing is supposed to be used.

The components are:

  • a Celery worker
  • a RabbitMQ instance
  • a Redis instance
  • a Database
  • the client that triggers jobs (for example a REST API)

[Diagram: geospaas_processing architecture]

The workflow represented on the diagram is the following:

  • the client submits tasks to the queue
  • the worker retrieves tasks from the queue and executes them
  • the results of the tasks are stored in the database and can be accessed by the client
  • the Redis instance is used to synchronize the multiple processes spawned by the worker.

Tasks

The geospaas_processing.tasks subpackage provides various Celery tasks divided into separate modules. Each tasks module has its own "theme": one deals with IDF conversions, one with Syntool conversions, one with generic operations such as downloading a dataset's files. This structure makes it possible to run a separate Celery worker for each module, which among other things makes it easier to deal with each group of tasks' requirements.

Most of these tasks are designed to work with datasets which are present in the GeoSPaaS database. They take one argument: a tuple containing a dataset ID as its first element, and other elements depending on the task. This makes it easy to chain the tasks (see the sketch below) and makes it possible to prevent simultaneous operations on the same dataset's files via the lock_dataset_files() decorator.
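
For illustration, here is a hedged sketch of such a chain; it assumes that the tuple returned by download() can be passed as-is to convert_to_idf(), and that workers are consuming both the 'core' and 'idf' queues:

from celery import chain

from geospaas_processing.tasks.core import download
from geospaas_processing.tasks.idf import convert_to_idf

# Download the files of dataset 180, then convert them to IDF.
# The tuple returned by each task is passed as the first argument
# of the next one.
result = chain(download.s((180,)), convert_to_idf.s()).apply_async()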

Example of command to start a worker:

celery -A geospaas_processing.tasks.core worker -l info -Q core -E -c 4 --max-tasks-per-child 4

See the Celery documentation for more details.

tasks.core

Generic tasks.

download()

Downloads a dataset.

Example:

# Asynchronously downloads the dataset whose ID is 180
geospaas_processing.tasks.core.download.delay((180,))

remove_downloaded()

Removes the downloaded files for a dataset.

Example:

# Remove files for the dataset whose ID is 180
geospaas_processing.tasks.core.remove_downloaded.delay((180,))

archive()

Compresses a dataset file into a tar.gz archive.

geospaas_processing.tasks.core.archive.delay((180, './dataset_180.nc'))

publish()

Copies the given file or directory to a remote server using SCP.

This task also requires the following environment variables to be set:

  • GEOSPAAS_PROCESSING_FTP_HOST: the hostname of the server to which the files will be copied
  • GEOSPAAS_PROCESSING_FTP_ROOT: the FTP root folder
  • GEOSPAAS_PROCESSING_FTP_PATH: the path where the files must be copied relative to the FTP root folder.

The variables are named like that because the original purpose of this task is to publish files on an FTP server accessible via SCP.

A little more detail about these variables:

  • they are concatenated with a slash as separator to determine the absolute path to which the files must be copied on the remote server.
  • GEOSPAAS_PROCESSING_FTP_HOST and GEOSPAAS_PROCESSING_FTP_PATH are used to determine the URL of the copied files.

For example, given the following values:

  • GEOSPAAS_PROCESSING_FTP_HOST='ftp.domain.com'
  • GEOSPAAS_PROCESSING_FTP_ROOT='/ftp_root'
  • GEOSPAAS_PROCESSING_FTP_PATH='project'

If the task is called with the following argument: (180, './foo/dataset_180.nc')

  • the file will be copied to ftp.domain.com:/ftp_root/project/foo/dataset_180.nc.
  • the task will return the following tuple: (180, 'ftp://ftp.domain.com/project/foo/dataset_180.nc').
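
For completeness, the corresponding call might look like this (it mirrors the example above):

geospaas_processing.tasks.core.publish.delay((180, './foo/dataset_180.nc'))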

crop()

Crops a dataset file to the given bounding box.

Example:

geospaas_processing.tasks.core.crop.delay((180, ('foo.nc', 'bar.nc')), bounding_box=[0, 20, 20, 0])

tasks.idf

Tasks which deal with converting dataset files to IDF format.

convert_to_idf()

Converts a dataset to the IDF format for usage in Oceandatalab's SEAScope.

# Asynchronously convert the dataset whose ID is 180
geospaas_processing.tasks.idf.convert_to_idf.delay((180, './dataset_180.nc'))

tasks.syntool

Tasks which deal with converting dataset files to Syntool format.

check_ingested()

Checks that the dataset does not already have processing results saved in the database. If results exist, the current task chain is stopped; otherwise the arguments are simply passed along.

Example:

geospaas_processing.tasks.syntool.check_ingested.delay((180, './dataset_180.nc'))

convert()

Converts a dataset's files to a format displayable in Syntool.

Example:

geospaas_processing.tasks.syntool.convert.delay((180, './dataset_180.nc'))

db_insert()

Inserts converted files into a Syntool database to make them accessible through a Syntool portal.

Example:

geospaas_processing.tasks.syntool.db_insert.delay((180, './dataset_180.nc'))

cleanup_ingested()

Removes all ingested dataset files older than a given date.

Example:

geospaas_processing.tasks.syntool.cleanup_ingested.delay('2022-03-04')

tasks.harvesting

Tasks dealing with harvesting metadata. Requires the GEOSPAAS_PROCESSING_HARVEST_CONFIG environment variable to contain the path to the harvesting configuration file.

start_harvest()

Starts the harvesting process using a dictionary which contains the search configuration.

Example:

geospaas_processing.tasks.harvesting.start_harvest.delay({
    'common': {'start_time': '2022-08-01', 'end_time': '2022-08-02'},
    'searches': [{'provider_name': 'creodias', 'collection': 'Sentinel3'}]
})

save_search_results()

Starts the ingestion process from a SearchResults object. It is used by start_harvest(); there should be no reason to call it directly.

update_vocabularies()

Updates the vocabularies according to the harvesting configuration.

Example:

geospaas_processing.tasks.harvesting.update_vocabularies.delay()

retry_ingestion()

Retries failed ingestions which have been dumped during a previous harvesting run.

Example:

geospaas_processing.tasks.harvesting.retry_ingestion.delay()

django-geo-spaas-processing's Issues

Integrate new converter configurations

The following conversion files and their potential auxiliary files must be added.
The bold ones require auxiliary files.

  • cmems_001_024_hourly_mean_surface
  • cmems_001_024_hourly_smoc
  • cmems_008_046
  • cmems_013_048_drifter_0m
  • cmems_013_048_drifter_15m
  • cmems_013_048_radar_total
  • esa_cci_sst
  • cmems_015_003_0m
  • cmems_015_003_15m
  • ghrsst_l2p_modis_a_day
  • ghrsst_l2p_modis_a_night
  • ghrsst_l2p_viirs_jpl_sst
  • ghrsst_l2p_viirs_navo_sst
  • ghrsst_l2p_viirs_ospo_sst
  • ghrsst_l3c_avhrr_metop_b_sst
  • ghrsst_l3c_goes16_sst
  • ghrsst_l3c_goes17_sst
  • ghrsst_l3c_seviri_atlantic_sst
  • ghrsst_l3c_seviri_indian_sst
  • ghrsst_l3u_amsr2_sst
  • ghrsst_l3u_gmi_sst
  • hycom_osu
  • ibi_hourly_mean_surface
  • mfs_med00-cmcc-cur
  • mfs_med00-cmcc-ssh
  • mfs_med00-cmcc-temp
  • rtofs_diagnostic
  • rtofs_prognostic
  • sentinel1_l2_rvl
  • sentinel3_olci_chl
  • sentinel3_olci_l1_efr
  • sentinel3_slstr_sst

Missing address

When files have been downloaded by an external process into the folder the downloader downloads to, the downloader should add the address of those previously downloaded files to the dataset_url field of the database.

Add support for cropping downloaded datasets

Allow selecting a subset of a dataset before further operations, such as conversion to IDF.
Not all types of datasets will be supported, at least not at first.
Priorities:

  • GOES
  • SEVIRI

Wrong time condition in copying CLI

The query built there restricts the results to datasets whose time coverage is fully included within the searched time span.

We should also include datasets whose time coverage intersects the searched time span.
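
As an illustration, here is a hedged sketch of the two kinds of query (field names follow the geospaas catalog models; the time bounds are placeholders):

from datetime import datetime, timezone

from geospaas.catalog.models import Dataset

begin = datetime(2022, 1, 1, tzinfo=timezone.utc)
end = datetime(2022, 1, 2, tzinfo=timezone.utc)

# Current behaviour: only datasets fully contained in [begin, end]
contained = Dataset.objects.filter(
    time_coverage_start__gte=begin,
    time_coverage_end__lte=end,
)

# Desired behaviour: also datasets whose coverage merely intersects [begin, end]
intersecting = Dataset.objects.filter(
    time_coverage_start__lte=end,
    time_coverage_end__gte=begin,
)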

Error 403 when downloading from Scihub

When a download is launched shortly after another one just finished, we receive a 403 error with the following message:

An exception occured while creating a stream: Maximum number of 2 concurrent flows achieved by the user "........"

Maybe the connection is not correctly closed in some cases?
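
If the cause is indeed a connection that is not released, here is a hedged sketch of the kind of fix that could help, using the response as a context manager so the connection is released even if an error occurs (url and target_path are placeholders):

import requests

with requests.get(url, stream=True) as response:
    response.raise_for_status()
    with open(target_path, 'wb') as target_file:
        for chunk in response.iter_content(chunk_size=1024 * 1024):
            target_file.write(chunk)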

Add support for OAuth2 authentication to the HTTP downloader

The HTTP downloader should support OAuth2 authentication.
The type of authentication can be set in the provider_settings.yml file.

Here is a chunk of code which downloads a file from Creodias with OAuth2 authentication:

import requests
from oauthlib.oauth2 import LegacyApplicationClient
from requests_oauthlib import OAuth2Session, OAuth2

url = 'https://zipper.creodias.eu/download/e25d0589-c0af-53c1-8ec1-2995bbcb64c6'
token_url = 'https://auth.creodias.eu/auth/realms/DIAS/protocol/openid-connect/token'

client_id = 'CLOUDFERRO_PUBLIC'
username = 'user'
password = 'passwd'
client = LegacyApplicationClient(client_id=client_id)
token = OAuth2Session(client=client).fetch_token(
    token_url=token_url,
    username=username,
    password=password,
    client_id=client_id,
)
oauth2 = OAuth2(client_id=client_id, client=client, token=token)

r = requests.get(url, auth=oauth2, stream=True)

with open('/tmp/test_creodias.zip', 'bw') as f:
    for chunk in r.iter_content(chunk_size=1024 * 1024):
        f.write(chunk)

r.close()

The default value of the filename should be derived from the server response (header or URL) instead of being empty

Downloaded datasets should be named based on their entry_id in all cases.

For files harvested from Copernicus, the header of the response to the download link provides a 'filename=' part from which the name of the dataset can be determined. But for OSISAF and other future download links, the response header would look something like this:

[screenshot of an example response header]

So no information about the filename can be obtained from such a header. Thus, this line should be corrected to solve this issue.
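
A possible approach is sketched below (a hedged sketch, not the package's actual implementation; the helper name is hypothetical):

import re
from urllib.parse import unquote, urlparse

def guess_filename(response, url):
    """Take the file name from the Content-Disposition header if present,
    otherwise fall back to the last component of the URL path."""
    content_disposition = response.headers.get('Content-Disposition', '')
    match = re.search(r'filename="?([^";]+)"?', content_disposition)
    if match:
        return match.group(1)
    return unquote(urlparse(url).path.rstrip('/').rsplit('/', 1)[-1])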

Integrate synchronization to FTP into the processing

Right now, output files are synchronized to the FTP server by an external service, so there is a delay between the task's completion and the availability of the files on the FTP server.

It would be better to integrate this step into the relevant tasks, so that the files are available as soon as the task completes.

This would probably make #6 irrelevant as well.

Add new IDF conversion configuration files

The following IDF configuration files must be integrated:

cmems_001_024_hourly_mean_surface.idf
cmems_001_024_hourly_smoc.idf
cmems_008_046.idf
cmems_015_003_0m.idf
cmems_015_003_15m.idf
esa_cci_sst.idf
ghrsst_l2p_modis_a_jpl_day.idf
ghrsst_l2p_modis_a_jpl_night.idf
ghrsst_l2p_viirs_jpl.idf
ghrsst_l2p_viirs_navo.idf
ghrsst_l2p_viirs_ospo.idf
sentinel1_l2_rvl.idf
