mongres's Introduction

Mongres ETL System for Node.js

Synchronize PostgreSQL and MongoDB using Node.js

Mongres uses configurable modules containing database and operation definitions to perform arbitrary ETL (extract, transform, load) procedures. This project is still in development, so core behavior may change without notice. Contributions, comments, and issue reports are always welcome.

Installation

npm install mongres

Command Line Usage

Usage: node mongres.js [options] <file1 [file2 ...]>

  -h, --help           Display usage help
  -v, --version        Display version number
  -d, --debug          Enable debug mode
  -q, --quiet          Disable verbose output
  -p, --period         Specify periodic execution ( in sec )
  -f, --file           Specify module file to load ( default )

Database Definitions

Databases can be defined and used almost interchangeably, with only minor differences. The database abstraction layer provides the following methods:

  • queryArray (query, [options], callback)
    • Execute query using options if given and pass the result set as an array to callback function.
    • query format for MongoDB is {collectionName: {field: value}}
    • query format for PostgreSQL is {text: 'select foo from bar where foo between $1 and $2', values: [1,10]}
    • callback signature is function (error, array)
    • Caution: all records will be loaded into memory. Do not use for very large data sets.
  • queryStream (query, [options], callback)
    • Execute query using options if given and pass a readable result stream to the callback function.
    • query format for MongoDB is {collectionName: {field: value}}
    • query format for PostgreSQL is {text: 'select foo from bar where foo between $1 and $2', values: [1,10]}
    • callback signature is function (error, stream)
  • insert (insert, [options], callback)
    • Inserts insert into database using options if given.
    • insert format is {collectionOrTableName: {field: value}}
    • callback signature is function (error, result) where result is the number of affected records.
  • update (query, update, [options], callback)
    • Applies update to records matching query using options if given.
    • query format is {collectionOrTableName: {field: value}}
    • update format is {collectionOrTableName: {field: value}}
    • callback signature is function (error, result) where result is the number of affected records.
  • upsert (query, update, [options], callback)
    • Applies update to records matching query using options if given.
    • A new record will be inserted if no matching records are found.
    • query format is {collectionOrTableName: {field: value}}
    • update format is {collectionOrTableName: {field: value}}
    • callback signature is function (error, result) where result is the number of affected records.
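
The calling convention above, with its optional options argument, can be tried without a running database. The following is a minimal sketch that assumes an in-memory stand-in: createStubDb is a hypothetical helper, not part of Mongres, and it only supports equality matching on the documented {collectionOrTableName: {field: value}} format. Its callbacks fire synchronously, whereas a real database would call back asynchronously.

```javascript
// In-memory stand-in for illustration only; this is NOT Mongres code.
// It mimics the documented signatures of queryArray and upsert.
function createStubDb() {
  const store = {}; // { collectionName: [records] }

  // True when every field in criteria equals the same field in record.
  function matches(record, criteria) {
    return Object.keys(criteria).every(function (key) {
      return record[key] === criteria[key];
    });
  }

  // Split {collectionName: {field: value}} into its two parts.
  function split(obj) {
    const name = Object.keys(obj)[0];
    return { name: name, body: obj[name] };
  }

  return {
    queryArray: function (query, options, callback) {
      // options is optional: shift arguments when it is omitted
      if (typeof options === 'function') { callback = options; options = {}; }
      const q = split(query);
      const rows = (store[q.name] || []).filter(function (record) {
        return matches(record, q.body);
      });
      callback(null, rows);
    },

    upsert: function (query, update, options, callback) {
      if (typeof options === 'function') { callback = options; options = {}; }
      const q = split(query);
      const u = split(update);
      const rows = store[q.name] = store[q.name] || [];
      const hits = rows.filter(function (record) {
        return matches(record, q.body);
      });
      if (hits.length === 0) {
        // no match: insert a new record built from query and update fields
        rows.push(Object.assign({}, q.body, u.body));
        return callback(null, 1);
      }
      hits.forEach(function (record) { Object.assign(record, u.body); });
      callback(null, hits.length);
    }
  };
}

const db = createStubDb();
const results = [];

// First call inserts, second call updates (this time with options given).
db.upsert({ test: { _id: 1 } }, { test: { name: 'a' } }, function (err, count) {
  results.push(count);
});
db.upsert({ test: { _id: 1 } }, { test: { name: 'b' } }, { w: 1 }, function (err, count) {
  results.push(count);
});
db.queryArray({ test: { _id: 1 } }, function (err, rows) {
  results.push(rows[0].name);
});

console.log(results); // [ 1, 1, 'b' ]
```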

Operation Definitions

The db parameter will be a Database instance configured for the specified database. The registry parameter is a storage object that is shared among all functions for the entire operation. The operation functions are executed in this order:

  • init (db, registry, callback) (optional)
    • Executed once and must call callback method to proceed with operation.
    • Useful for getting delta starting points or populating initial registry values.
  • extract (db, registry, process, done)
    • Executed once, used to extract data from source database.
    • process method should be called once for each record
    • done method should be called after all records have been processed
  • transform (db, registry, data) (optional)
    • Executed once for each record emitted by extract functions.
    • Used to reshape data before insertion into target database.
    • Called synchronously, so just return the transformed data.
  • load (db, registry, data, callback)
    • Executed once for each record emitted by extract and transform functions.
    • Can be used to populate registry with aggregated or incremental data.
  • interval (db, registry, callback) (optional)
    • Executed at regular intervals after load function
    • Useful for recording incremental progress in case of failures.
  • exit (db, registry, callback) (optional)
    • Executed once after all other functions have finished.
    • Useful for cleaning up after operation, and for recording summary data.
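
The order described above can be illustrated with a small stand-alone runner. This is only a sketch: runOperation and the every property are hypothetical names chosen for the example, each step is a single function rather than a map keyed by database name, and Mongres' real scheduler may behave differently.

```javascript
// Hypothetical runner for illustration; not the Mongres implementation.
function runOperation(op, db, done) {
  const registry = {}; // shared by all functions for the whole operation
  let loaded = 0;

  // Called by extract once all records have been handed over.
  function finish(err) {
    if (err || !op.exit) return done(err, registry);
    op.exit(db, registry, function (exitErr) { done(exitErr, registry); });
  }

  // Called by extract once per record: transform, then load, then interval.
  function handleRecord(record, next) {
    const data = op.transform ? op.transform(db, registry, record) : record;
    op.load(db, registry, data, function (err) {
      if (err) return next(err);
      loaded++;
      if (op.interval && loaded % op.every === 0) {
        return op.interval(db, registry, next);
      }
      next();
    });
  }

  function start(err) {
    if (err) return done(err, registry);
    op.extract(db, registry, handleRecord, finish);
  }

  if (op.init) op.init(db, registry, start); else start();
}

const log = [];
const op = {
  every: 2, // assumption: run interval after every 2 loaded records
  init: function (db, registry, cb) { log.push('init'); registry.total = 0; cb(); },
  extract: function (db, registry, process, done) {
    [1, 2, 3].forEach(function (n) { process(n, function () {}); });
    done();
  },
  transform: function (db, registry, data) { return data * 10; },
  load: function (db, registry, data, cb) {
    log.push('load ' + data);
    registry.total += data;
    cb();
  },
  interval: function (db, registry, cb) { log.push('interval'); cb(); },
  exit: function (db, registry, cb) { log.push('exit'); cb(); }
};

let finalRegistry = null;
runOperation(op, null, function (err, registry) { finalRegistry = registry; });

console.log(log.join(' > '));
// init > load 10 > load 20 > interval > load 30 > exit
```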

Modules

A Mongres module must export a JavaScript object like this:

module.exports = {

  db: {

    mongo: {
      type: 'mongodb',
      name: 'test',
      host: 'localhost',
      port: 27017,
      user: 'username',
      pass: 'password'
    },

    postgres: {
      type: 'postgresql',
      name: 'tnt2',
      host: 'localhost',
      port: 5432,
      user: 'username',
      pass: 'password'
    }

  },

  op: {

    name: 'Sample Operation',

    init: { // init is optional
      // read data from "mongo" database into registry
      mongo: [
        function (db, registry, cb) {
          // truncate the collection
          db.remove(
            { // query
              test: {} // matches all documents
            },
            cb // callback
          );
        },

        function (db, registry, cb) {
          db.queryArray(
            { // query
              mongres: {
                _id: "test"
              }
            },
            { // options
              limit: 1,
              fields: {
                _id: 0,
                lastDate: 1
              }
            },
            function (err, docs) { // callback
              if (err) return cb(err);

              var doc = docs && docs.shift() || {};
              registry.lastDate = doc.lastDate || new Date(0);

              return cb(); // continue to "extract" step
            }
          );
        }
      ]
    },

    extract: { // extract is required
      // stream data from "postgres" database to load function(s)
      postgres: function (db, registry, load, cb) {
        db.queryStream(
          { // query
            text: "                                                            \
              SELECT                                                           \
                GENERATE_SERIES($1::int, $2::int) AS series, $3::timestamp +   \
                (RANDOM()::NUMERIC(3,2) || ' days')::INTERVAL AS date,         \
                ARRAY['a','b','c'] AS arr,'{\"a\":{\"b\":\"c\"}}'::json AS obj \
              ORDER BY date                                                    \
            ",
            values: [1, 1000, registry.lastDate]
          },
          // options parameter is not required
          function (err, stream) { // callback
            if (err) return cb(err);

            var error = null, procs = 0, limit = 10;

            // call "load" method for each record
            stream.on('data', function (data) {
              procs++; // increment process counter

              // pause stream if procs over limit
              if (procs >= limit) stream.pause();

              load(data, function (err) {
                procs--; // decrement process counter

                if (err) { // pass the error
                  stream.emit('error', err);
                  return stream.emit('end');
                }

                // resume stream when procs are under limit
                if (!err && procs < limit) stream.resume();
              });
            });

            // record errors for 'end' event
            stream.on('error', function (err) {
              error = err;
            });

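            // execute callback when ended, reading the recorded error at that
            // time (cb.bind(this, error) would capture the initial null value)
            return stream.on('end', function () {
              cb(error);
            });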
          }
        );
      }
    },

    transform: { // transform is optional
      // transform data from "postgres" database into a different format
      postgres: function (db, registry, data) {
        return { // transformed data
          _id: data.series,
          date: data.date,
          arr: data.arr,
          obj: data.obj
        };
      }
    },

    load: { // load is required
      // load data from all sources into "mongo" database
      mongo: function (db, registry, data, cb) {
        if (!data) return cb(new Error('No data was given.'));

        db.upsert(
          { // query
            test: {
              _id: data._id
            }
          },
          { // update
            test: data
          },
          { // options
            w: 1,
            journal: true
          },
          function (err, result) { // callback
            if (err) return cb(err);

            // determine most recent changed date
            if (!registry.lastDate || registry.lastDate < data.date) {
              registry.lastDate = data.date;
            }

            // proceed to next load function(s) or exit
            return cb(null, result);
          }
        );
      }
    },

    interval: { // intervals are optional
      100: { // execute every 100 records
        // write progress data to "mongo" database
        mongo: function (db, registry, cb) {
          db.upsert(
            { // query
              mongres: {
                _id: 'test'
              }
            },
            { // update
              mongres: {
                $set: {
                  lastDate: registry.lastDate
                }
              }
            },
            { // options
              w: 1,
              journal: true
            },
            cb // callback
          );
        }
      }
    },

    exit: { // exit is optional
      // write result data to "mongo" database
      mongo: function (db, registry, cb) {
        db.upsert(
          { // query
            mongres: {
              _id: 'test'
            }
          },
          { // update
            mongres: {
              $set: {
                lastDate: registry.lastDate
              }
            }
          },
          { // options
            w: 1,
            journal: true
          },
          cb // callback
        );
      }
    }

  }

};
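
Before running a module like the one above, it can help to check its shape. The sketch below is a hypothetical validateModule helper, not something Mongres provides. It assumes only what the sample's comments state: extract and load are required, the other steps are optional, and every step is keyed by a database name defined under db.

```javascript
// Hypothetical validator for illustration; Mongres may check modules differently.
function validateModule(mod) {
  if (!mod || typeof mod !== 'object') {
    return ['module must export an object'];
  }

  const problems = [];
  const db = mod.db || {};
  const op = mod.op || {};

  if (Object.keys(db).length === 0) {
    problems.push('db: at least one database definition is expected');
  }

  // The sample module marks extract and load as required.
  ['extract', 'load'].forEach(function (step) {
    if (!op[step]) problems.push('op.' + step + ' is required');
  });

  // Each step is keyed by database name; every name must exist under db.
  function checkNames(step, section) {
    Object.keys(section || {}).forEach(function (name) {
      if (!db[name]) {
        problems.push('op.' + step + ' refers to undefined database "' + name + '"');
      }
    });
  }

  ['init', 'extract', 'transform', 'load', 'exit'].forEach(function (step) {
    checkNames(step, op[step]);
  });

  // interval has one extra level: { recordCount: { databaseName: fn } }
  Object.keys(op.interval || {}).forEach(function (count) {
    checkNames('interval[' + count + ']', op.interval[count]);
  });

  return problems;
}

const noop = function () {};

const good = validateModule({
  db: { mongo: { type: 'mongodb' }, postgres: { type: 'postgresql' } },
  op: {
    extract: { postgres: noop },
    load: { mongo: noop },
    interval: { 100: { mongo: noop } }
  }
});

const bad = validateModule({
  db: { mongo: { type: 'mongodb' } },
  op: { extract: { postgres: noop } }
});

console.log(good); // []
console.log(bad);  // two problems: missing load, undefined "postgres"
```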

Contributing

Contributions, comments, and issue reports are always welcome. Send pull requests to the master branch with your proposed changes or create issues to report bugs.

Testing

Copy tests\db.sample.js to tests\db.js and modify the credentials to match your development environment. You will need one MongoDB and one PostgreSQL database set up and running to complete the tests. To start tests, run npm test in your Mongres directory.

mongres's People

Contributors

benighted

mongres's Issues

Ignore dot files / directories when discovering modules

Modify the module discovery code (dir search) to ignore files or sub dirs beginning with a dot (.) like .git or similar. This will allow for cloning a gist or other repo into the syncs folder and maintaining the connection to origin.
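
One possible shape for that filter, as a sketch only: isDiscoverable is a hypothetical name, and the existing discovery code is not shown here, so how it would be wired in is left open.

```javascript
const path = require('path');

// Hypothetical filter: skip any file or directory whose name starts with a dot.
function isDiscoverable(entryPath) {
  return !path.basename(entryPath).startsWith('.');
}

const entries = ['.git', 'users.js', '.DS_Store', 'orders'];
const visible = entries.filter(isDiscoverable);

console.log(visible); // [ 'users.js', 'orders' ]
```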

Replace Easy Mode

Restore the simple JSON-based config style as an optional alternative to the fully configurable function-based configs I have been using since 0.2.0.

Not all features will be directly transferable:

  • registry interaction will be greatly reduced, similar to the interpolation feature of the old <0.2 style
  • a method of tracking a delta field value will be needed (i.e., finding the most recent change date or uid)

Standardize Database Interface

Not all features are implemented (some update methods for postgres), and the query style is different between postgres and mongo. Fully implement the remaining methods, and come up with a standardized style for issuing queries and updates to either type of database. I believe the mongo style of updates will be the most flexible, so it will probably be best to translate that style into postgres-style commands.
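
As a rough sketch of what such a translation might look like, the hypothetical toPgUpdate helper below turns a query and update in the {collectionOrTableName: {field: value}} format into the {text, values} format used for PostgreSQL queries. It assumes plain equality matching and plain field assignment only; operators such as $set are not handled, and none of this reflects existing Mongres code.

```javascript
// Quote an SQL identifier, doubling any embedded double quotes.
function quoteIdent(name) {
  return '"' + String(name).replace(/"/g, '""') + '"';
}

// Hypothetical translator: {table: {field: value}} query and update objects
// become one parameterized UPDATE statement.
function toPgUpdate(query, update) {
  const table = Object.keys(query)[0];
  const where = query[table] || {};
  const set = update[table] || {};
  const values = [];

  // Record a value and return its positional placeholder ($1, $2, ...).
  function placeholder(value) {
    values.push(value);
    return '$' + values.length;
  }

  const setSql = Object.keys(set).map(function (field) {
    return quoteIdent(field) + ' = ' + placeholder(set[field]);
  }).join(', ');

  const whereSql = Object.keys(where).map(function (field) {
    return quoteIdent(field) + ' = ' + placeholder(where[field]);
  }).join(' AND ');

  return {
    text: 'UPDATE ' + quoteIdent(table) + ' SET ' + setSql +
      (whereSql ? ' WHERE ' + whereSql : ''),
    values: values
  };
}

const statement = toPgUpdate(
  { users: { id: 7 } },
  { users: { name: 'Ann', active: true } }
);

console.log(statement.text);
// UPDATE "users" SET "name" = $1, "active" = $2 WHERE "id" = $3
console.log(statement.values);
// [ 'Ann', true, 7 ]
```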
