
node-datapumps's Issues

Does the RestMixin handle responses?

I have the following sample code, which uses the postJson function. Since this uses Restler, I am wiring up the events from Restler. However, the events appear to be swallowed: there seems to be no way to get at the response data. I have also tried this with the post function. I need the response information for a later operation.

data_pump
    .mixin(datapumps.mixin.RestMixin)
    .process(function (data) {

        var test_record = <some json>
        var test_record_json = JSON.stringify(test_record);

        this.postJson('http://qa71-app.mia.ucloud.int/ucc/api/candidates', {
            multipart: false,
            headers: {
                'Content-Type': 'application/json',
                'X-ApiKey': 'xxxxxx'
            },
            data: test_record_json
        }).on('complete', function (data, response) {
            console.log(response.statusCode);
        }).on('error', function (err, response) {
            console.log(response.statusCode);
        }).on('fail', function (err, response) {
            console.log(response.statusCode);
        }).on('success', function (err, response) {
            console.log(response.statusCode);
        }).on('2XX', function (err, response) {
            console.log(response.statusCode);
        }).on('3XX', function (err, response) {
            console.log(response.statusCode);
        }).on('4XX', function (err, response) {
            console.log(response.statusCode);
        }).on('5XX', function (err, response) {
            console.log(response.statusCode);
        }).on('end', function (err, response) {
            console.log(response.statusCode);
        });
    })
    .start();
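A hedged reading of the behavior, based on the RestMixin source excerpted in the "RestMixin Promise Problem maybe" issue further down this page: the mixin consumes restler's 'complete' event itself and wraps the call in a promise, which would explain why the restler event handlers above never fire. A minimal sketch (URL and header values hypothetical) that reads the result through the returned promise instead:

data_pump
    .mixin(datapumps.mixin.RestMixin)
    .process(function (data) {
        // Returning the promise also makes the pump wait for the POST before the next item.
        return this.postJson('http://example.test/api/candidates', data, {
            headers: { 'X-ApiKey': 'xxxxxx' }
        }).then(function (result) {
            // result is whatever restler passed to its 'complete' event
            console.log(result);
        });
    })
    .start();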

Unable to get into last then after whenFinished

We are unable to get into the last then after whenFinished() with the following example:

var datapump = require('datapumps');

var comic_data_pump = new datapump.Pump();

console.log("reading data into pump");
// Extract
comic_data_pump
    .mixin(datapump.mixin.RestMixin)
    .from(comic_data_pump.createBuffer({ "size": "100" }))
    .get('http://www.comicvine.com/api/volumes/', {
        "multipart": false,
        "query": {
            "api_key": "xxxxxxxxx",
            "format": "json",
            "filter": "name:spider-man",
            "field_list": "id,name,start_year,description,publisher,image"
        }
    })
    .then(function (volumes) {
        // here you have the response from the rest service.
        // Put it into the input buffer with comic_data_pump.from().write(...)
        volumes.results.forEach(function (volume) {
            // push/write item into the pump
            comic_data_pump.from().write(volume);
        });
        // finally, seal the input buffer
        //comic_data_pump.from().seal();
    })
    .catch(function () {
        console.log("failed with errors");
    });

console.log('processing volumes');
// Transformation
comic_data_pump
    .process(function (volume) {
        // you should have a comic object here.
        // console.log(this.buffer());
        console.log(volume.name);
    })
    .start()
    .whenFinished(function () { console.log("This is finished"); })
    .then(function () {
        console.log('get here');
        console.log('this is the end state of the buffer', comic_data_pump.buffer());

        if (!comic_data_pump.errorBuffer().isEmpty()) {
            console.log(comic_data_pump.errorBuffer().getContent());
        }
    })
    .catch(function () {
        console.log("Pump group failed with errors");
        // errors list will be at group.errorBuffer().getContent()
    });

Default functionality / Run

Based on the documentation:
"A pump reads data from its input buffer or stream and copies it to the output buffer by default:"
If I use the following example:
var datapumps = require('datapumps');
var mssql = require('mssql');
Promise = require('bluebird');

var config = {
    user: 'user',
    password: 'pw',
    server: 'server', // You can use 'localhost\\instance' to connect to named instance
    database: 'db',
    stream: true, // You can enable streaming globally
    requestTimeout: 60000,
    options: {
        encrypt: false // Use this if you're on Windows Azure
    }
};

var connection = new mssql.Connection(config, function (err) {
    if (err) {
        console.log(err);
        return;
    }

    var request = connection.request();

    // Only executed if not streaming, i.e. streaming = false
    request.query('some query', function (err, recordset) {
        // ... error checks
        if (err) {
            console.log(err);
        }
    });

    // All event emitters are used when streaming, i.e. streaming = true
    request.on('recordset', function (columns) {
        // Emitted once for each recordset in a query
    });

    request.on('row', function (row) {
        // Emitted for each row
        extract_input_buffer.writeAsync(row);
        console.log(extract_input_buffer.content.length);
    });

    request.on('error', function (err) {
        // May be emitted multiple times
    });

    request.on('done', function (returnValue) {
        // Always emitted as the last one
        connection.close();
        extract_input_buffer.seal();
    });
});

var extract_pump = new datapumps.Pump();

var extract_input_buffer = new datapumps.Buffer({ size: 100 });

var extract_output_buffer = extract_pump.buffer();

extract_pump.errorBuffer().on('write', function (data) {
    console.log(data);
});

extract_pump.buffer('output').on('write', function (data) {
    console.log(extract_output_buffer.content.length);
});

extract_pump.buffer('output').on('end', function (data) {
    console.log("output buffer has ended");
});

var transform_pump = new datapumps.Pump();

extract_pump
    .from(extract_input_buffer)
    .to(transform_pump, 'output')
    .logErrorsToConsole()
    .run()
    .then(function () {
        console.log(extract_output_buffer);
    });

transform_pump
    .logErrorsToConsole()
    .run()
    .then(function () {
        var from1buffer = extract_pump.from();
        var output1buffer = extract_pump.buffer('output');
        var from2buffer = transform_pump.from();
        var output2buffer = transform_pump.buffer('output');
        console.log(from1buffer.content.length);
        console.log(output1buffer.content.length);
        console.log(from2buffer.content.length);
        console.log(output2buffer.content.length);
    });

I see extract_output_buffer being written to (the buffer('output').on('write') handler fires), but the data is never picked up by transform_pump. With the .to(transform_pump, 'output') call I intended to set transform_pump's input buffer to the output buffer of extract_pump.

What am I missing? Can you clarify?
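Not an authoritative answer, but one wiring that appears elsewhere on this page (the MongodbMixin ETL issue below uses it) is to point the second pump's from() directly at the first pump's output buffer rather than relying on .to():

// transform_pump consumes extract_pump's default output buffer
transform_pump
    .from(extract_pump.buffer('output'))
    .logErrorsToConsole()
    .run()
    .then(function () {
        console.log('transform done');
    });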

Copying data between two mongo servers

I need to copy and transform data from one mongo server to another, but I am not sure of the best approach. It seems possible if I use two pumps as follows:

sourcePump
  .mixin(MongodbMixin('mongodb://server1/books'))
  .useCollection('authors')
  .from(sourcePump.find())
  .process(function(data) {
    return destinationPump.insert(data)
  })
  .logErrorsToConsole()

destinationPump
  .mixin(MongodbMixin('mongodb://server2/books'))
  .useCollection('authors')
  .logErrorsToConsole()

sourcePump.run()
    .then(function() {
      console.log("Done");
    });

Is there a better way?

Read from 2 collections and write

Hello,

Can I read data from 2 collections and write into another?

var
    clientPump = new Pump(),
    previousDiagnosisPump = new Pump(),
    oncologyOutputPump = new Pump();

clientPump
    .mixin(MongodbMixin(mongoInputURL))
    .useCollection('client')
    .from(clientPump.find()); // ??? is this way?

previousDiagnosisPump
    .mixin(MongodbMixin(mongoInputURL))
    .useCollection('previousDiagnosis')
    .from(previousDiagnosisPump.find()); // ??? is this way?

oncologyOutputPump
    .mixin(MongodbMixin(mongoOutputURL))
    .useCollection('oncology')
    .from(...) // HOW TO USE TWO COLLECTIONS
    .process(function (client, previousDiagnosis) { // ???
        return oncologyOutputPump.insert(...);
    })
    .logErrorsToConsole();

Promise.all([
    clientPump.run(),
    previousDiagnosisPump.run(),
    oncologyOutputPump.run()
])
    .then(function () {
        console.log("Done");
    });

Is it possible to use a pump to do a findOne inside a process method?

oncologyOutputPump
    .mixin(MongodbMixin(mongoOutputURL))
    .useCollection('oncology')
    .from(oncologyPump.buffer())
    .process(function (data) {
        var client = clientPump.findOne({_id: data.client}); // is this possible?
        return oncologyOutputPump.insert(data);
    })
    .logErrorsToConsole();
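Not from the original thread: a sketch of one way the findOne-inside-process idea could look, assuming the mixin's findOne returns a promise (an assumption; if it does not, the underlying driver collection would have to be used instead):

oncologyOutputPump
    .process(function (data) {
        // look up the related client first, then insert the combined record
        return clientPump.findOne({ _id: data.client })
            .then(function (client) {
                data.clientName = client ? client.name : null; // hypothetical merge
                return oncologyOutputPump.insert(data);
            });
    });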

Getting "Cannot change source buffer after pumping has been started" all of a sudden

Cron jobs that I've been running regularly with datapumps have started failing all of a sudden. I don't believe I've changed any related code.

Any ideas or tips on troubleshooting?

"datapumps": "~0.4.4"
npm -v 3.8.6

Also, I'm using the MongoDB mixin

Here's the full error logged to console:

Error in pump (root): MongoError: pool destroyed
/app/node_modules/datapumps/lib/Pump.js:57
        throw new Error('Cannot change source buffer after pumping has been started');
                                               ^

Error: Cannot write sealed buffer
    at Buffer.write (/app/node_modules/datapumps/lib/Buffer.js:46:15)
    at null.<anonymous> (/app/node_modules/datapumps/lib/Pump.js:69:32)
    at emitOne (events.js:90:13)
    at emit (events.js:182:7)
    at Readable.read (_stream_readable.js:368:10)
    at flow (_stream_readable.js:751:26)
    at resume_ (_stream_readable.js:731:3)
    at _combinedTickCallback (internal/process/next_tick.js:74:11)
    at process._tickCallback (internal/process/next_tick.js:98:9)

CsvWriter Mixin "TypeError: May not write null values to stream"

I've been attempting to use the CsvWriter mixin to write a stream of Objects to a csv.

Upon writing the final record to the csv, the CsvWriter errors out (as described below).

TypeError: May not write null values to stream
    at validChunk (_stream_writable.js:184:10)
    at CsvTransformStream.Writable.write (_stream_writable.js:218:12)
    at Pump.<anonymous> (/Users/..../.../.../node_modules/datapumps/lib/mixin/CsvWriterMixin.js:27:35)
    at emitNone (events.js:91:20)
    at Pump.emit (events.js:185:7)
    at Pump.module.exports.Pump._outputBufferEnded (/Users/.../.../.../node_modules/datapumps/lib/Pump.js:233:19)
    at emitNone (events.js:86:13)
    at Buffer.emit (events.js:185:7)
    at Buffer.seal (/Users/.../.../.../node_modules/datapumps/lib/Buffer.js:149:14)
    at Pump.module.exports.Pump.sealOutputBuffers (/Users/.../.../.../node_modules/datapumps/lib/Pump.js:273:32)
    at Pump.module.exports.Pump._pump (/Users/.../.../.../node_modules

Here is the CsvWriter mixin code (as pulled from node_modules):

(function() {
  var CsvWriterMixin, Promise, csv, fs;

  csv = require('fast-csv');

  fs = require('fs');

  Promise = require('bluebird');

  CsvWriterMixin = function(options) {
    if (!(options != null ? options.path : void 0)) {
      throw new Error('path option is required.');
    }
    return function(target) {
      target.writeRow = function(row) {
        return target._csv.writer.writeAsync(row);
      };
      target._csv = options;
      target._csv.writer = Promise.promisifyAll(csv.createWriteStream());
      target._csv.writer.pipe(fs.createWriteStream(target._csv.path, {
        encoding: 'utf8'
      }));
      if (target._csv.headers != null) {
        target.writeRow(target._csv.headers);
      }
      return target.on('end', function() {
        return target._csv.writer.write(null);
      });
    };
  };

  module.exports = CsvWriterMixin;

}).call(this);

I've resolved the issue locally by updating this line of code:

      return target.on('end', function() {
        return target._csv.writer.write(null);
      });

to

      return target.on('end', function() {
        return target._csv.writer.end();
      });

Output buffer

So in the following example, I am trying to take pumpOne's output buffer and make it the input buffer (from()) for pumpTwo. I see the writes going into the output buffer; however, the data does not make it to the second pump. Also, I get a promise error even though I am returning a promise from the same library you are using. Perhaps you can clarify why:

var datapumps = require('datapumps');
var mssql = require('mssql');
Promise = require('bluebird');

var config = {
    user: 'abc123',
    password: 'abc123',
    server: 'server', // You can use 'localhost\\instance' to connect to named instance
    database: 'db',
    stream: true, // You can enable streaming globally
    requestTimeout: 60000,
    options: {
        encrypt: false // Use this if you're on Windows Azure
    }
}

var connection = new mssql.Connection(config, function (err) {

    if (err) {
        console.log(err);
        return;
    }

    var request = connection.request();

    request.query('select top 10 column from table where othercolumn = 0', function (err, recordset) {
        // ... error checks
        if (err) {
            console.log(err);
        }

        //Only executed if not streaming i.e streaming = false
        //console.log(recordset);

    });

    //All Event Emitters are used when streaming i.e streaming = true
    request.on('recordset', function (columns) {
        // Emitted once for each recordset in a query
    });

    request.on('row', function (row) {
        // Emitted for each row
        extract_input_buffer.writeAsync(row);

        console.log(extract_input_buffer.content.length);
    });

    request.on('error', function (err) {
        // May be emitted multiple times
    });

    request.on('done', function (returnValue) {
        // Always emitted as the last one
        connection.close();
        extract_input_buffer.seal();

    });

});

var extract_pump = new datapumps.Pump();

var extract_input_buffer = new datapumps.Buffer({size: 100});

var extract_output_buffer = extract_pump.buffer();

extract_pump.errorBuffer().on('write', function (data) {
    console.log(data);
});

extract_pump.buffer('output').on('write', function (data) {
    console.log(extract_output_buffer.content.length);
});

extract_pump.buffer('output').on('end', function (data) {
    console.log("output buffer has ended")
});

var transform_pump = new datapumps.Pump();

extract_pump
    .from(extract_input_buffer)
    .to(transform_pump, 'output')
    .logErrorsToConsole()
    .process(function (data) {
        console.log(extract_input_buffer.content.length);
        return new Promise(function (resolve, reject) {
            resolve(data);
        });
    })
    .start()
    .whenFinished(function () {

        console.log(extract_output_buffer);
        extract_output_buffer.seal();
    });
/*
 .run()
 .then(function(){

 console.log(extract_output_buffer);
 extract_output_buffer.seal();
 });
 */

transform_pump
    .logErrorsToConsole()
    .run()
    .then(function () {
        var from1buffer = extract_pump.from();
        var output1buffer = extract_pump.buffer('output');
        var from2buffer = transform_pump.from();
        var output2buffer = transform_pump.buffer('output');
        console.log(from1buffer.content.length);
        console.log(output1buffer.content.length);
        console.log(from2buffer.content.length);
        console.log(output2buffer.content.length);
    });
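A hedged observation: the process() above resolves its promise with data but never writes anything to the pump's output buffer, and the resolved value does not appear to be forwarded automatically. Other issues on this page use this.copy() for exactly that. A minimal sketch:

    .process(function (data) {
        console.log(extract_input_buffer.content.length);
        // copy() writes data to the output buffer and returns a promise,
        // so transform_pump actually receives something to pump.
        return this.copy(data);
    })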

PostgresqlMixin Process not called

I have the following in my main JavaScript file, but it seems that process is never called.

var dpumps = require('datapumps');
Pump = dpumps.Pump;
PostgresqlMixin = dpumps.mixin.PostgresqlMixin;
QueryStream = require('pg-query-stream');
Client = require('pg').Client;

var con = {
    host: 'localhost',
    port: 5432,
    database: 'mydatabase',
    user: 'myuser',
    password: 'mypass'
};

postgresqlClient = new Client(con);

exports.pump = function () {
    console.log("Ready To Pump"); // This logs, so I know the function is called.
    postgresqlCopy = new Pump()
        .from(postgresqlClient.query(new QueryStream("SELECT $1~, $2~, $3~ FROM $4~ LIMIT 10", ["RowId", "Company", "ArticleNo", "Article"])))
        .mixin(PostgresqlMixin(postgresqlClient))
        .process(function (data) {
            console.log("We are in the Process");
            return postgresqlCopy.log(data);
        })
        .logErrorsToConsole()
        .run()
        .then(function () {
            console.log("Done");
            console.log(postgresqlClient);
        });
};

Hi, I started learning Node.js three days ago and I have been trying to use datapumps. So far I am running Node.js with Express, and the problem is that my process function never runs. I see console.log("Ready To Pump"); in the logs, so I can tell the function is called; however, it seems like the process hangs (not sure). The process function is not called, because the console.log("We are in the Process"); part of the code never shows in the logs. Where have I gone wrong here? I also know my connection string works, because I have another function that uses the same query and logs the data to the console. I intend to transfer data from one Postgres database to another, and I certainly can't do that if my process function is never called. I do not get any errors in the console either. I believe it hangs, because console.log("Done"); does not show either.
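Not from the original thread: two hedged observations. First, the pg Client is never connected, so the query stream never produces rows. Second, plain pg parameters can only substitute values, not identifiers; the $1~ syntax belongs to pg-promise, not to pg-query-stream. A minimal sketch addressing both points (table and column names hypothetical):

postgresqlClient.connect(function (err) {
    if (err) {
        return console.error('connection error', err);
    }
    // Identifiers must be written into the SQL text; only values can be $n parameters.
    var stream = postgresqlClient.query(new QueryStream(
        'SELECT "RowId", "Company", "ArticleNo" FROM "Article" LIMIT 10'));

    postgresqlCopy = new Pump()
        .from(stream)
        .mixin(PostgresqlMixin(postgresqlClient))
        .process(function (data) {
            console.log("We are in the Process", data);
            return this.copy(data); // forward rows to the output buffer
        })
        .logErrorsToConsole()
        .run()
        .then(function () {
            console.log("Done");
        });
});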

Output buffer neither extends nor clears with more than 10 records

We are experiencing a problem with more than 10 records: for the input buffer, from(), we had to extend the size to a large number (2000 in our example). However, we noticed that when the output buffer reaches 10 items, the system stops. We assumed the output buffer would pop items from its container, but it is not doing this. Below is an example of the node module we have been using.
You can see we are reading from a database and attempting to write to the RESTful endpoints. We also notice this never gets to the whenFinished() promise.

module.exports = (function(rec14AuthToken) {
var fs = require('fs');
var path = require('path');
var datapumps = require('datapumps');
var mssql = require('mssql');
var config = require('../../config');
var candidateSourceSQL = fs.readFileSync(path.join(__dirname, 'candidateSourceQuery.sql'), 'utf8');
var rec14APIUrlCandidates = config.rec14.baseUrl + '/' + config.rec14.tenant + '/api/candidates/';

var sqlConfig = {
    user: config.rec10.dbServerUserName,
    password: config.rec10.dbServerPassword,
    server: config.rec10.dbServer,
    database: config.rec10.db,
    requestTimeout: config.rec10.dbServerCommandTimeout,
    options: {
        encrypt: true
    }
};

var rows = 0;

var connection = new mssql.Connection(sqlConfig, function (err) {
    if (err) {
        console.log(err);
        return;
    }

    var request = connection.request();
    request.stream = true;

    request.query(candidateSourceSQL, function (err, recordSet) {
        if (err) {
            console.log(err);
            return;
        }

        console.log(recordSet);
    });

    request.on('row', function(row) {
        console.log(row);
        data_pump.from().writeAsync(row);
        rows++;
    });

    request.on('recordset', function(columns) {
        // Emitted once for each recordset in a query
        console.log(columns);

    });

    request.on('done', function (err, returnValue) {
        connection.close();
        console.log(rows);
        data_pump.from().seal();
    });

});
// Setup the data pump
var data_pump = new datapumps.Pump().from(new datapumps.Buffer({size: 2000}));

//Process the Post operation
data_pump
    .mixin(datapumps.mixin.RestMixin)
    .process(function (data) {

        var record = '{ '+
            '"password": "some password", ' +
            '"is_internal": false, ' +
            '"is_active": false, ' +
            '"is_willing_to_relocate": true, ' +
            '"name": { ' +
            '"first": "' + data.FirstName + '" ' +
            (data.MiddleInitial ? ',"middle": "' + data.MiddleInitial + '" ' : '') +
            ',"last": "' + data.LastName + '" ' +
                //(data)
                //  ',"prefix": { ' +
                //      "id": "d5b19770-00a8-43b8-a252-af24b416d21c"
                //  }
            '}, '+
            '"contact_info": { ' +
            '"email": "' + data.Email + '", ' +
            '"phone": { ' +
            '"primary": "' + data.PrimaryPhone + '"' +
            (data.SecondaryPhone ? ',"secondary": "' + data.SecondaryPhone + '" ' : '') +

            '},' +
            '"address": { ' +
            '"line1": "' + data.Address1 + '", ' +
            (data.Address2 ? '"line2": "' + data.Address2 + '",' : '') +

            '"city": "' + data.City + '", '+
            '"state": { '+
            '"code": "' + data.State+ '" '+
            '}, ' +
            '"postal_code": "' + data.Zip+ '",' +
            '"country": { ' +
            '"code": "USA" ' +
            '}'+
            '}'+
            '}'+
            '}';

        var test_record_json = JSON.parse(record);

        return this.postJson(rec14APIUrlCandidates, test_record_json, {
                headers: {
                    'Authorization': 'Bearer ' + rec14AuthToken,
                    'Accept': 'application/vnd.xxxx+json; version=2',
                    'Content-Type': 'application/json'
                },
                rejectUnauthorized: false,
                followRedirects: true
        }).then(function(response) {
                console.log(response.result);
                console.log(response.statusCode);

                return this.copy(response);
            }.bind(this));

    }).logErrorsToConsole().on('end', function () {
        console.log('Pumped everything, and all my output buffers are empty. Bye.');
    }).start().whenFinished().then(function () {
        console.log('finished!!!!');
    });

});
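Not from the original thread: a hedged explanation that fits the symptoms. this.copy(response) writes every response into the pump's output buffer, which defaults to a small size and has no consumer here, so once it fills the pump stalls and whenFinished never resolves. A sketch of the process() tail without the copy:

        }).then(function (response) {
            console.log(response.result);
            console.log(response.statusCode);
            // No this.copy(response): nothing consumes this pump's output buffer,
            // so writing into it would eventually block the pump.
        });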

Best Practice

As I go through the documentation and examples, I wanted to ask a question.
Is there any benefit of one over the other?

For the ETL process, is it better to have the data use a single pump? (In other words, a record goes through the entire ETL process with a single pump and buffer.)

for example:
//Extract (rest mixin)
pump.get(...)
.then(write to input buffer)

//Transform
pump
.process

//Load (mongo mixin)
pump
.insert

Or is it better to use different pumps (one for extraction, one for transformation, one for loading) and have the data flow through the different pumps via buffers?

for example:
//Extract (rest mixin)
pump1.get(...)
.then(write to input buffer)

//Transform
pump2
.from(output buffer of pump1)
.process (write to output buffer for pump2)

//Load (mongo mixin)
pump3
.from(output buffer of pump2)
.insert

Or does it even matter?


MongoDB mixin: add `aggregate` to available methods

Hello there!

It would be super-great to be able to aggregate using the pump, sample:

pump
  .mixin(MongodbMixin('mongodb://localhost/myDB'))
  .useCollection('myCollection')
  .from(pump.aggregate([{ $group: { _id: '$geo.country', sum: { $sum: 1} }}, {$sort: { 'sum': -1}}]))

I don't know CoffeeScript; would it be hard to implement?
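Not from the original thread: until an aggregate helper exists, one hedged, untested workaround is to reach the driver collection the mixin holds (pump._mongo.collection is referenced in the "Apply sort on mongodb" issue below) and stream the cursor; whether the mixin has connected by the time from() runs, and the exact cursor API of your driver version, are assumptions here:

pump
  .mixin(MongodbMixin('mongodb://localhost/myDB'))
  .useCollection('myCollection');

// after the mixin has connected (timing is an assumption):
pump.from(pump._mongo.collection
  .aggregate([{ $group: { _id: '$geo.country', sum: { $sum: 1 } } },
              { $sort: { sum: -1 } }],
             { cursor: {} })   // ask the driver for a streamable cursor
  .stream());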

Can't access failures log when is not a pump problem

Hi, I have this scenario in which I use RestMixin to POST some data to an API.
From datapumps' point of view, the POST is OK whatever the HTTP response status code is. I'd like the ability to intercept the response and log an error if something other than 200 or 201 comes back.

I cannot work out how to achieve this with datapumps. It would also help if the README were updated near the section that explains this:

.logErrorsToConsole()
.run()
.then(function () {
    console.log("Done writing contacts to file");
});

That is good, but not enough for my purpose. Thanks.
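Not from the original thread: one hedged approach is to return the promise from process() and reject on unexpected status codes, so the failures land in the pump's errorBuffer (which logErrorsToConsole reads). The URL variable is hypothetical; the response.statusCode field matches how other issues on this page use the mixin:

.process(function (data) {
    return this.postJson(apiUrl, data)   // apiUrl is hypothetical
        .then(function (response) {
            if (response.statusCode !== 200 && response.statusCode !== 201) {
                // rejecting routes the failure into the pump's errorBuffer
                throw new Error('Unexpected HTTP status ' + response.statusCode);
            }
        });
})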

Programmatically increase Buffer Size

When loading a lot of data, we noticed that the pumps only handle a small amount of data at a time.
Here is small debug output of a recent run.

2015-08-25T16:40:48.774Z [(root)] input: 373706 items, 0 in, 4 out | output: 0 items, 0 in, 0 out |
2015-08-25T16:40:49.775Z [(root)] input: 373704 items, 0 in, 2 out | output: 0 items, 0 in, 0 out |
2015-08-25T16:40:50.778Z [(root)] input: 373701 items, 0 in, 3 out | output: 0 items, 0 in, 0 out |
2015-08-25T16:40:51.778Z [(root)] input: 373699 items, 0 in, 2 out | output: 0 items, 0 in, 0 out |
2015-08-25T16:40:52.778Z [(root)] input: 373696 items, 0 in, 3 out | output: 0 items, 0 in, 0 out |
2015-08-25T16:40:53.779Z [(root)] input: 373693 items, 0 in, 3 out | output: 0 items, 0 in, 0 out |
2015-08-25T16:40:54.780Z [(root)] input: 373690 items, 0 in, 3 out | output: 0 items, 0 in, 0 out |
2015-08-25T16:40:55.781Z [(root)] input: 373688 items, 0 in, 2 out | output: 0 items, 0 in, 0 out |
2015-08-25T16:40:56.783Z [(root)] input: 373685 items, 0 in, 3 out | output: 0 items, 0 in, 0 out |
2015-08-25T16:40:57.784Z [(root)] input: 373683 items, 0 in, 2 out | output: 0 items, 0 in, 0 out |
2015-08-25T16:40:58.784Z [(root)] input: 373680 items, 0 in, 3 out | output: 0 items, 0 in, 0 out |
2015-08-25T16:40:59.785Z [(root)] input: 373677 items, 0 in, 3 out | output: 0 items, 0 in, 0 out |

Is there any way to tell the system to work on more than 2-4 items at a time?

We have tried the following but it does not appear to help:

data_pump._buffers['output'].size = some higher number
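Not from the original thread: for the buffer-size part, the pattern used elsewhere on this page is to size buffers when they are constructed rather than mutating _buffers afterwards. The slow 2-4 items per second is more likely bounded by the per-item async work in process() than by buffer capacity (a hedged guess):

// size the input buffer up front instead of poking pump._buffers later
var data_pump = new datapumps.Pump()
    .from(new datapumps.Buffer({ size: 2000 }));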

How can I read from one mongo collection and update another mongo collection

I have a requirement to read a particular mongo collection and update another mongo collection in the same db, based on the _id values of the first collection. After that, I need to update the status of each record in the first collection to indicate that it has already been processed.

Please instruct me on how to perform this task.
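Not from the original thread: a hedged sketch following the two-pump pattern from the "Copying data between two mongo servers" issue above. The update call on the underlying driver collection (_mongo.collection is referenced in another issue on this page), the promise-returning driver, and the transform() helper are all assumptions:

sourcePump
  .mixin(MongodbMixin('mongodb://server/db'))   // hypothetical URL
  .useCollection('source')
  .from(sourcePump.find())
  .process(function (doc) {
    // insert into the target, then flag the source record as processed
    return targetPump.insert(transform(doc))    // transform() is hypothetical
      .then(function () {
        return sourcePump._mongo.collection.update(
          { _id: doc._id }, { $set: { processed: true } });
      });
  })
  .logErrorsToConsole();

targetPump
  .mixin(MongodbMixin('mongodb://server/db'))
  .useCollection('target')
  .logErrorsToConsole();

sourcePump.run().then(function () { console.log('Done'); });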

.to() function

DataPumps has a .to() function.

I have used it when using two pumps for buffer management.
However, I am not sure this is correct.

Can you provide some documentation on when to use it?

Run/start

Can you describe the difference between run() and start()?

My understanding: run() is used for default execution (with no overrides), while start() is used when you override process() or use custom buffers.

Is the above correct? Can you update your documentation?
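Not part of the original question, but for context: judging from usage across the other issues on this page, the two spellings below appear interchangeable; treat this as an observed pattern rather than documented behavior.

// Pattern A: run() returns a completion promise directly
pump.run().then(function () { console.log('done'); });

// Pattern B: start() begins pumping; whenFinished() returns the completion promise
pump.start().whenFinished().then(function () { console.log('done'); });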

write 2 different files (question)

Hello, is it possible with one pump to read from the source once and write to two output files? The first would be a .csv file and the second an Excel file.

That way I wouldn't have to read the source again, if that makes sense.

Thanks!
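Not from the original thread: a hedged sketch of one arrangement, chaining two pumps so the source is only read once. The CsvWriterMixin options follow the mixin source quoted in the CsvWriter issue above; the ExcelWriterMixin configuration callback and the source stream are assumptions:

var csvPump = new datapumps.Pump()
    .from(sourceStream)   // hypothetical source stream or buffer
    .mixin(datapumps.mixin.CsvWriterMixin({ path: 'out.csv' }))
    .process(function (record) {
        var self = this;
        return this.writeRow([record.name]).then(function () {
            return self.copy(record);   // forward the record to the second pump
        });
    });

var excelPump = new datapumps.Pump()
    .from(csvPump.buffer())             // consume csvPump's output buffer
    .mixin(datapumps.mixin.ExcelWriterMixin(function () {
        this.createWorkbook('out.xlsx'); // assumed configuration API
        this.createWorksheet('data');
    }))
    .process(function (record) {
        return this.writeRow([record.name]);
    });

Promise.all([csvPump.run(), excelPump.run()])
    .then(function () { console.log('both files written'); });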

Apply sort on mongodb

Hello,

I'm trying to use the sort method with the MongodbMixin, but without success.
I did these tests:

pump.find(query).sort({})
// and
pump._mongo.collection.find(query).sort({}).stream()

What is the right way to use sort?

Thanks!

RestMixin

In our attempt to use the RestMixin we receive the following error:

/Users/miker/Projects/TestProjects/data_migration/node_modules/datapumps/node_modules/restler/lib/multipartform.js:43
var parts = path.split(/\/|\\/);
                 ^
TypeError: Object #<Object> has no method 'split'
    at Object.File._basename (/Users/miker/Projects/TestProjects/data_migration/node_modules/datapumps/node_modules/restler/lib/multipartform.js:43:22)
    at Object.File (/Users/miker/Projects/TestProjects/data_migration/node_modules/datapumps/node_modules/restler/lib/multipartform.js:35:36)
    at Object.exportMethods.file (/Users/miker/Projects/TestProjects/data_migration/node_modules/datapumps/node_modules/restler/lib/multipartform.js:190:12)
    at target.file (/Users/miker/Projects/TestProjects/data_migration/node_modules/datapumps/lib/mixin/RestMixin.js:19:27)
    at Pump.mixin (/Users/miker/Projects/TestProjects/data_migration/node_modules/datapumps/lib/Pump.js:271:9)
    at Object.<anonymous> (/Users/miker/Projects/TestProjects/data_migration/server.js:9:6)
    at Module._compile (module.js:456:26)
    at Object.Module._extensions..js (module.js:474:10)
    at Module.load (module.js:356:32)
    at Function.Module._load (module.js:312:12)

Is this related to restler?

Here is the code snippet we are using:

var datapump = require('datapumps');
comic_data_pump = new datapump.Pump();
comic_data_pump
    .mixin(datapump.mixin.RestMixin({
        get: ('http://www.comicvine.com/api/volumes/', {
            "multipart": false,
            "query": {
                "api_key": "xxxxxxxxx",
                "format": "json",
                "filter": "name:spider-man",
                "field_list": "id,name,start_year,description,publisher,image"
            }
        })
    }));
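A hedged reading of the error: RestMixin is applied as a plain mixin, not a factory taking request options, so passing it a config object sends that object where restler expects a path string (hence the split failure). A sketch of the usage pattern that appears in other issues on this page:

comic_data_pump
    .mixin(datapump.mixin.RestMixin)   // no arguments: the mixin itself is passed
    .from(comic_data_pump.createBuffer())
    .get('http://www.comicvine.com/api/volumes/', {
        multipart: false,
        query: {
            api_key: "xxxxxxxxx",
            format: "json",
            filter: "name:spider-man",
            field_list: "id,name,start_year,description,publisher,image"
        }
    })
    .then(function (volumes) {
        // write volumes into the input buffer here
    });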

ExcelWriterMixin performance issues

From #9: Another problem is the low write performance: 10 columns × 5000 rows takes more than 20 seconds.
Is excel4node the fastest module? Do you have any experience with another one?

Source is not configured/Groups

In the example below, when attempting to run, the system fails with "Source is not configured".
This is failing on the extract_pump; we are not sure why it fails in the group.

var datapumps = require('datapumps');
var mssql = require('mssql');
Promise = require('bluebird');

var config = {
    user: 'user',
    password: 'pw',
    server: 'server', // You can use 'localhost\\instance' to connect to named instance
    database: 'db',
    stream: true, // You can enable streaming globally
    requestTimeout: 60000,
    options: {
        encrypt: false // Use this if you're on Windows Azure
    }
}

var connection = new mssql.Connection(config, function (err) {

    if (err) {
        console.log(err);
        return;
    }

    var request = connection.request();

    request.query('select top 10 column from table where anothercolumn = 0', function (err, recordset) {
        // ... error checks
        if (err) {
            console.log(err);
        }

        //Only executed if not streaming i.e streaming = false
        //console.log(recordset);

    });

    //All Event Emitters are used when streaming i.e streaming = true
    request.on('recordset', function (columns) {
        // Emitted once for each recordset in a query
    });

    request.on('row', function (row) {
        // Emitted for each row
        extract_input_buffer.writeAsync(row);

        console.log(extract_input_buffer.content.length);
    });

    request.on('error', function (err) {
        // May be emitted multiple times
    });

    request.on('done', function (returnValue) {
        // Always emitted as the last one
        connection.close();
        extract_input_buffer.seal();

    });

});

var pump_group = new datapumps.group();

var extract_pump = new datapumps.Pump();

var extract_input_buffer = new datapumps.Buffer({size: 100});

var extract_output_buffer = extract_pump.buffer();

extract_pump.errorBuffer().on('write', function (data) {
    console.log(data);
});

extract_pump.buffer('output').on('write', function (data) {
    console.log(extract_output_buffer.content.length);
});

extract_pump.buffer('output').on('end', function (data) {
    console.log("output buffer has ended")
});

var transform_pump = new datapumps.Pump();

extract_pump
    .from(extract_input_buffer)
    .to(transform_pump, 'output')
    .logErrorsToConsole()

transform_pump
    .logErrorsToConsole()
    .whenFinished().then(function () {
        console.log('Pumps processed.');
    });

pump_group.addPump('extract_pump');
pump_group.addPump('transform_pump');


pump_group.start().debug();
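Not from the original thread: a hedged reading of the failure. addPump(name) appears to create and return a fresh pump (that is how the MongodbMixin ETL issue below uses it), so the group here contains two unconfigured pumps unrelated to extract_pump and transform_pump, and the group fails with "Source is not configured". A sketch configuring the pumps the group returns:

var pump_group = datapumps.group();

// addPump returns the pump it creates; configure that object
var extract_pump = pump_group.addPump('extract');
var transform_pump = pump_group.addPump('transform');

extract_pump
    .from(extract_input_buffer)
    .to(transform_pump, 'output')
    .logErrorsToConsole();

transform_pump
    .logErrorsToConsole();

pump_group.start().whenFinished().then(function () {
    console.log('Pumps processed.');
});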

ExcelWriterMixin missing Number

The writeRow method uses only the String method to write cells, which is very confusing.
It would be better to check whether the value is a Number (or NaN),
or to expect each column to be an object with keys: value, type (String/Number/Formula), style, etc.

if (isNaN(value)){
    this._excel.worksheet.Cell(this._excel.currentRow, index + 1).String(value);
} else {
    this._excel.worksheet.Cell(this._excel.currentRow, index + 1).Number(value);
}

Another problem is the low write performance: 10 columns × 5000 rows takes more than 20 seconds.
Is excel4node the fastest module?
Do you have any experience with another one?

Anyway, thank you for datapumps.

ExcelWriterMixin null column not allowed

I dump data from a MySQL query stream, and one of the columns allows null.
Currently I have to check for null values and convert them to an empty string.
In my opinion this should not be necessary; the cell should simply be left empty instead.

Hanging up

So we are experiencing problems with the system hanging when it posts data. It is not clear to us where the problem may be. Here is the sample node module we are using:
module.exports = (function(rec14AuthToken) {
var fs = require('fs');
var path = require('path');
var datapumps = require('datapumps');
var mssql = require('mssql');
var config = require('../../config');
var candidateSourceSQL = fs.readFileSync(path.join(__dirname, 'candidateSourceQuery.sql'), 'utf8');
var rec14APIUrlCandidates = config.rec14.baseUrl + '/' + config.rec14.tenant + '/api/candidates/';
var Q = require('q');

var log_writer = require('../../helpers/log_writer');
var lw = new log_writer();
lw.init('file.txt');

var startDate = new Date();

var sqlConfig = {
    user: config.rec10.dbServerUserName,
    password: config.rec10.dbServerPassword,
    server: config.rec10.dbServer,
    database: config.rec10.db,
    requestTimeout: config.rec10.dbServerCommandTimeout,
    options: {
        encrypt: true
    }
};

var rows = 0;

var connection = new mssql.Connection(sqlConfig, function (err) {
    if (err) {
        console.log(err);
        return;
    }

    var request = connection.request();
    request.stream = true;

    request.query(candidateSourceSQL, function (err, recordSet) {
        if (err) {
            console.log(err);
            return;
        }
    });

    request.on('row', function(row) {
        //console.log(row);
        data_pump.from().writeAsync(row);
        rows++;
    });

    request.on('recordset', function(columns) {
        // Emitted once for each recordset in a query
        //console.log(columns);

    });

    request.on('done', function () {
        connection.close();
        data_pump.from().seal();
    });
});

// Setup the data pump
var data_pump = new datapumps.Pump().from(new datapumps.Buffer({size: 3000}));

//Process the Post operation
data_pump
    .mixin(datapumps.mixin.RestMixin)
    .process(function (data) {

        var record = {
            "password": "Password123!",
            "is_internal": false,
            "name": {
                "first": data.FirstName,
                "middle": null,
                "last": data.LastName
            },
            "contact_info": {
                "email": data.Email,
                "phone": {
                    "primary": data.PrimaryPhone,
                    "secondary": data.SecondaryPhone
                },
                "address": {
                    "line1": data.Address1,
                    "line2": null,
                    "city": data.City,
                    "state": {
                        "code": data.State
                    },
                    "postal_code": data.Zip,
                    "country": {
                        "code": "USA"
                    }
                }
            }
        };

        return this.postJson(rec14APIUrlCandidates, record, {
                headers: {
                    'Authorization': 'Bearer ' + rec14AuthToken,
                    'Accept': 'application/vnd.xxxxxx+json; version=2',
                    'Content-Type': 'application/json'
                },
                rejectUnauthorized: false,
                followRedirects: true
        }).then(function(response) {
                console.log(data.ID + ' - ' + response.statusCode);

                //if(response.statusCode == 201) {
                //    lw.log_write("INFO", data.ID, response.result.id, 'Candidate Successfully Posted');
                //} else {
                //    response.result.forEach(function(result){
                //        lw.log_write("ERROR", data.ID, '', 'Candidate Did Not Successfully Post - ' + result.message_code);
                //    });
                //}
                return response;
            }.bind(this));

    }).start();

    data_pump.logErrorsToConsole();

    data_pump.on('end', function() {
        console.log('Pumped everything, and all my output buffers are empty. Bye.');
    });

    data_pump.whenFinished().then(function(){
        console.log('finished!!!!');
        var endDate = new Date();
        console.log('Start Date: ' + startDate);
        console.log('End Date: ' + endDate);
        console.log(endDate - startDate);
    });

});

Any plans for supporting rest service endpoints as mixins

We like the fact that this project is a pure JavaScript solution that can be used within Node.js.
We have been looking for an Open Source solution that provides just this.

We would like to know if there will ever be support (a mixin) for REST endpoints.
For example: doing a GET from a service endpoint for the extract step (E), transforming it (T), then doing a POST/PUT/PATCH to load the data into another service endpoint (L).

This would be extremely helpful to us if there were something like this.

Using the group method on MongoDbMixin

When using the MongoDbMixin to read data, I need to be able to perform a group or aggregate first. So what I have is something like:

eventsPump.mixin(MongodbMixin(...))
      .useCollection('sites')
      .from(eventsPump.group(['status.description'], {type: 'Master'},
        { sites : 0, transactions: 0 },
        function (curr, result) {
          result.sites++;
          if (curr.metricsSummary) {
            result.transactions += curr.metricsSummary.transactions.count;
          }
        }
      ))
...

However, I am getting an error on the .from line:

Error: Argument must be datapumps.Buffer or stream
    at Pump.module.exports.Pump.from (/web/gstv-server-apps-reports/node_modules/datapumps/lib/Pump.js:94:15)

Am I doing something wrong? Or is this not really supported?
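Not from the original thread: the error suggests pump.group(...) is not returning a Buffer or stream, so from() rejects it; the mixin may simply not wrap group. One hedged workaround is to run the grouping yourself and hand-feed an input buffer (createBuffer/write/seal all appear in other issues on this page):

var input = eventsPump.createBuffer();

eventsPump
  .mixin(MongodbMixin('mongodb://host/db'))   // hypothetical URL
  .useCollection('sites')
  .from(input);

// runGroupQuery() is a hypothetical helper that resolves with the grouped rows,
// e.g. via the driver's collection.group() or an aggregation pipeline.
runGroupQuery().then(function (rows) {
  rows.forEach(function (row) { input.write(row); });
  input.seal();
});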

Run / Custom Output buffer

In the following example
var datapumps = require('datapumps');
var mssql = require('mssql');
Promise = require('bluebird');

var config = {
    user: 'use',
    password: 'pw',
    server: 'svrp', // You can use 'localhost\\instance' to connect to named instance
    database: 'db',
    stream: true, // You can enable streaming globally
    requestTimeout: 60000,
    options: {
        encrypt: false // Use this if you're on Windows Azure
    }
};

var connection = new mssql.Connection(config, function (err) {
    if (err) {
        console.log(err);
        return;
    }

    var request = connection.request();

    // Only executed if not streaming, i.e. streaming = false
    request.query('somequery', function (err, recordset) {
        // ... error checks
        if (err) {
            console.log(err);
        }
    });

    // All event emitters are used when streaming, i.e. streaming = true
    request.on('recordset', function (columns) {
        // Emitted once for each recordset in a query
    });

    request.on('row', function (row) {
        // Emitted for each row
        extract_input_buffer.writeAsync(row);
        console.log(extract_input_buffer.content.length);
    });

    request.on('error', function (err) {
        // May be emitted multiple times
    });

    request.on('done', function (returnValue) {
        // Always emitted as the last one
        connection.close();
        extract_input_buffer.seal();
    });
});

var extract_pump = new datapumps.Pump();

var extract_input_buffer = new datapumps.Buffer({ size: 100 });

extract_pump.buffers({
    test_output_buffer: extract_pump.createBuffer()
});

test_output_buffer = extract_pump.buffer('test_output_buffer');

extract_pump.errorBuffer().on('write', function (data) {
    console.log(data);
});

extract_pump.buffer('test_output_buffer').on('write', function (data) {
    console.log(test_output_buffer.content.length);
});

extract_pump.buffer('test_output_buffer').on('end', function (data) {
    console.log("output buffer has ended");
});

var transform_pump = new datapumps.Pump();

extract_pump
    .from(extract_input_buffer)
    .to(transform_pump, 'test_output_buffer')
    .logErrorsToConsole()
    .run()
    .then(function () {
        console.log(test_output_buffer);
    });

transform_pump
    .logErrorsToConsole()
    .run()
    .then(function () {
        var from1buffer = extract_pump.from();
        var output1buffer = extract_pump.buffer('test_output_buffer');
        var from2buffer = transform_pump.from();
        var output2buffer = transform_pump.buffer('output');
        console.log(from1buffer.content.length);
        console.log(output1buffer.content.length);
        console.log(from2buffer.content.length);
        console.log(output2buffer.content.length);
    });

I create a custom buffer in the extract pump; however, when using "run()", it appears it cannot find the 'output' buffer in the extract_pump and reports an error:

{ error: [Error: No such buffer: output], pump: null }
Error in pump (root): Error: No such buffer: output

Can you create custom buffers and use the default execution of "run()", or do you have to override the process() function and use start() when creating custom buffers?
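Not an authoritative answer, but the error text suggests run()'s default copying looks for a buffer named 'output'. One sketch that replaces the buffer while keeping the default name:

extract_pump.buffers({
    output: extract_pump.createBuffer({ size: 1000 })  // replace the default, keep the name
});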

Documentation

We have some questions; any information you provide would be helpful.

What is the right way to handle running out of buffer space?

It is not clear to us whether we should manage the buffers ourselves, especially when we exceed the number of items a buffer can hold. Can you provide examples or information that can help us?

We would like to know the difference between write and writeAsync, and how sealing the buffer plays into this.

We would also like to know when we should seal and when we should not.
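Not from the original thread: a hedged summary of the pattern visible across the issues on this page, since the questions are closely related:

// write() appends synchronously; writeAsync() returns a promise, which makes it
// the safer choice when the buffer may be full (a hedged reading of the API).
buffer.writeAsync(item).then(function () { /* item accepted */ });

// seal() declares "no more input". Pumps finish (and whenFinished resolves)
// only after their input buffer is sealed and fully drained.
buffer.seal();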

ExcelWriterMixin - Streaming / memory handling for very large files

Hi there,

Could you please advise whether exporting a very large number of rows (100,000 +) from MySQL will cause the ExcelWriterMixin to build the whole file in memory before creating the file, or if it will stream the output row by row, freeing memory as it goes?

Many thanks.

RestMixin Promise Problem maybe

In the following example there appears to be a problem with the response.
data_pump
    .mixin(datapumps.mixin.RestMixin)
    .process(function (data) {
        this.post(target_service_end_point, {
            multipart: false,
            headers: {
                'Content-Type': 'application/json',
                'X-ApiKey': target_service_end_point_shared_secret
            },
            data: test_record_json
        }).then(function (result, response) {
            console.log(result);
            console.log(response);
            return this.copy(result, response);
        }.bind(this));
    }).start()
    .whenFinished().then(function (result, response) {
        console.log(result);
        console.log(response);
    });

I attempted to debug it and I found that the RestMixin does have a valid response:
return restler[methodName].apply(restler, methodArgs).on('complete', function (result, response) {
    if (result instanceof Error) {
        return reject(result);
    } else {
        return resolve(result, response);
    }
});
It appears, however, that when it gets into resolve, something is changing the response to undefined. When I get back into the promise chain, i.e.

.then(function (result, response) {
    console.log(result);
    console.log(response);

'response' is undefined.

I am stumped on this one and not sure how to debug it. Any assistance would be appreciated.
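Not from the original thread, but this one has a plain JavaScript explanation: a promise can only be resolved with a single value, so the second argument in resolve(result, response) is silently dropped; that is why response is undefined downstream. A hedged sketch of a wrapper that bundles both values (bypassing the mixin for this one call):

var restler = require('restler');

function postWithResponse(url, options) {
    return new Promise(function (resolve, reject) {
        restler.post(url, options).on('complete', function (result, response) {
            if (result instanceof Error) { return reject(result); }
            resolve({ result: result, response: response });  // one value, both parts
        });
    });
}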

MySQL to Postgres - complex transformations

Hello....

I am reviewing packages for an upcoming ETL process to migrate from a MySQL db to Postgres.

As with most systems there are Accounts and Users as top level entities but primary key IDs will change etc. We would also have a requirement to be able to run the processes for a single (or multiple Accounts) as well as potentially do-overs for individual Users. The catch is that the schemas are going to vary significantly in some cases (which is why generic "convert this DB" scripts and libraries are not suitable).

While some information will just be discarded, Tables may need to be split or merged. Existing data will all have to port (as cleanly as possible) with existing relationships intact (with new primary keys) etc.

I did look through the issues (both Open and Closed) and didn't see anything specific that would lead me to using node-datapumps, but it "feels" like I should be able to accomplish this.

I am happy to write my own bespoke system (as this will be a "one off" once everyone is migrated) but, if I can leverage this library then I'd like to.

I am not looking for specific answers right now - just a: "Yes - this library can work for the above use-case" type of response and a hope that I can ask for support as required.

Regards and thanks for a response in advance.

Whenfinished Promise

In the following abbreviated example, everything appears to work as expected. However, after further investigation, it appears we never get a response from the whenFinished promise.
The line console.log("The processing is finished"); never executes, and nothing is reported by logErrorsToConsole().

This does not appear to be by design, but we wanted to make sure nothing is missed.

data_pump
    .mixin(datapumps.mixin.RestMixin)
    .process(function (data) {
        console.log(target_service_end_point);
        return this.post(target_service_end_point, {
            multipart: false,
            headers: {
                'Content-Type': 'application/json',
                'X-ApiKey': target_service_end_point_shared_secret
            },
            data: record
        }).then(function (response) {
            if (response.statusCode == 201) {
                candidate_map.push({ "source": data.id, "destination": response.result.id });
            }
            //console.log(response);
            return this.copy(response);
        }.bind(this));
    }).logErrorsToConsole().start()
    .whenFinished().then(function (data) {
        console.log("The processing is finished");
    });
console.log(candidate_map);
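Not from the original thread: a possible explanation, consistent with the "Output buffer does not extend nor clears" issue above, is that this.copy(response) fills the pump's output buffer, which nothing consumes, so pumping stalls before whenFinished can resolve. A sketch that avoids the stall:

        }).then(function (response) {
            if (response.statusCode == 201) {
                candidate_map.push({ source: data.id, destination: response.result.id });
            }
            // Nothing consumes this pump's output buffer, so don't copy into it.
        }.bind(this));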

Error while insert item in MySql

Hello,

I have a problem when I try to take data from a MongodbMixin and insert it via a MysqlMixin. I'm getting the error:

Error in pump (root): Error: ER_DUP_ENTRY: Duplicate entry '[email protected]' for key 'UQ_Contat_contact_email'
Unhandled rejection PumpingFailedError: Pumping failed. See .errorBuffer() contents for error messages

I don't understand the error message "Unhandled rejection PumpingFailedError: Pumping failed. See .errorBuffer() contents for error messages".

Can you help me?

Follow my code:

var
    mysql = require('mysql'),
    datapumps = require('datapumps'),
    Pump = datapumps.Pump,
    MongodbMixin = datapumps.mixin.MongodbMixin,
    MysqlMixin = datapumps.mixin.MysqlMixin,
    pump = new Pump();

var connection = mysql.createConnection({
    host: 'localhost',
    user: 'user',
    password: 'password',
    database: 'database'
})

pump
    .mixin(MongodbMixin('mongodb://localhost/database'))
    .useCollection('client')
    .from(pump.find({}))

    .mixin(MysqlMixin(connection))

    .process(function (client) {
        return this.query(`INSERT INTO contact \
            (contact_name,contact_email) VALUES (?)`,
            [client.name, client.contact.email]);
    })
    .logErrorsToConsole()
    .run()
    .then(function () {
        console.log("Done writing contacts to file");
    });
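Not from the original thread: the "Unhandled rejection" means run()'s promise rejected without a .catch handler; the duplicate-key insert failed, the error went to the errorBuffer, and the run promise rejected with PumpingFailedError. A hedged sketch of handling it, following the .catch pattern used in other issues on this page:

    .run()
    .then(function () {
        console.log("Done writing contacts to file");
    })
    .catch(function () {
        // inspect the collected errors instead of leaving the rejection unhandled
        console.log(pump.errorBuffer().getContent());
    });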

ETL process using the MongodbMixin works, but the node process continues to run...

Hello,

I have been working on a code snippet that uses pump groups to extract data from a rest service, transform the data, and then load it into MongoDB. The code below actually executes the ETL process successfully. Additionally, the "whenFinished" function of the pump group is invoked, but the actual Node process does not terminate; I have to invoke "process.exit()" to force termination. I am only seeing this behavior when using the MongodbMixin. Is this maybe related to an open mongo connection, or is the pump group in a bad state? Below I have attached the code snippet. Any help is greatly appreciated. Thanks.

var datapumps = require('datapumps');
var pumpGroup = datapumps.group();

// add two pumps
var pumpOne = pumpGroup.addPump('pumpOne');
var pumpTwo = pumpGroup.addPump('pumpTwo');

// configure the pumps
pumpOne
    .mixin(datapumps.mixin.RestMixin)
    .from(pumpOne.createBuffer())
    .get('http://www.comicvine.com/api/volumes/', {
        "multipart": false,
        "query": {
            "api_key": "XXXXXXXX",
            "format": "json",
            "filter": "name:spider-man",
            "field_list": "id,name,start_year,description,publisher,image"
        }
    })
    .then(function (volumes) {
        volumes.results.forEach(function (volume) {
            // write to the input buffer of pumpOne
            pumpOne.from().writeAsync(volume);
        });
    });

// pumpTwo's input buffer is the output buffer of pumpOne
pumpTwo
    .mixin(datapumps.mixin.MongodbMixin('mongodb://127.0.0.1:27017/pumps'))
    .from(pumpOne.buffer())
    .useCollection('volumes')
    .process(function (volume) {
        pumpTwo.insert({ name: volume.name });
    });

pumpOne
    .process(function (volume) {
        // transformation
        volume.name = volume.name + ' copying records to MongoDB';

        // write to the output buffer of pumpOne
        this.buffer().writeAsync(volume);

        if (pumpOne.from().isEmpty()) {
            pumpOne.from().seal();
        }
    });

pumpOne.whenFinished().then(function () {
    console.log('Pump 1 has finished');
});

pumpTwo.whenFinished().then(function () {
    console.log('Pump 2 has finished');
});

pumpGroup
    .start()
    .whenFinished().then(function () {
        if (!pumpOne.errorBuffer().isEmpty()) {
            console.log(pumpOne.errorBuffer().getContent());
        }
        console.log('_ENDED ETL PROCESS_');

        console.log('Output buffer for pump one', pumpOne.buffer('output'));
        console.log('Output buffer for pump two', pumpTwo.buffer('output'));

        // not sure why the process does not terminate on its own...
        process.exit(0);
    });
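Not from the original thread: an open MongoDB connection keeps Node's event loop alive, so closing it once the group finishes should let the process exit on its own. Whether the mixin exposes the connection as pump._mongo (and a db handle on it) is an assumption here; the _mongo property is referenced in another issue above:

pumpGroup.start().whenFinished().then(function () {
    console.log('_ENDED ETL PROCESS_');
    // hypothetical: close the underlying driver connection held by the mixin
    if (pumpTwo._mongo && pumpTwo._mongo.db) {
        pumpTwo._mongo.db.close();
    }
});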

How to do multiple processBatch commands?

I have a pump that I'd like to apply several batch processes to. I can't work out how to do this, as the last .processBatch command seems to overwrite all the preceding ones.

For example, I'd like to add the 'timestamp' attribute, perform some transformation then upsert to my SalesForce API. Here is an excerpt of my pipeline...

  etl.pump = new Pump();

  etl.pump
    .mixin(BatchMixin)
    .batchSize(50)
    .from(connections.mysql.query(etl.sourceSql).stream())
    .mixin(SalesForceMixin({connection : sfConnection}))
    .processBatch((rows) => etl.pump.addAttribute(rows, 'timestamp', batchTimestamp))
    .processBatch((rows) => etl.pump.transform(rows, transformFn))
    .processBatch((rows) => etl.pump.upsertRows(rows))
    .logErrorsToConsole()
    .run()
    .then(() => {
      console.log(etl.name + ': done');
      connections.mysql.end();
      connections.salesforce.disconnect();
    })

The batch functions in my SalesForceMixin each return a promise; for example, my transform function maps the items through a transformer...

    target.transform = (_items, transformer) => {
      return new Promise((resolve) => resolve(_items.map(transformer)));
    };

I can see that only the 3rd processBatch command is executed: etl.pump.upsertRows. Looking into the code, I think I can see why: each processBatch call overrides the _processBatch function.

Am I missing something? I don't really want to do all my transformation and loading in one function.
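Not from the original thread: since each processBatch call replaces _processBatch, one hedged workaround is a single processBatch that chains the promise-returning steps (all identifiers below come from the excerpt above):

etl.pump
  .mixin(BatchMixin)
  .batchSize(50)
  .from(connections.mysql.query(etl.sourceSql).stream())
  .mixin(SalesForceMixin({ connection: sfConnection }))
  .processBatch((rows) =>
    etl.pump.addAttribute(rows, 'timestamp', batchTimestamp)
      .then((rows) => etl.pump.transform(rows, transformFn))
      .then((rows) => etl.pump.upsertRows(rows)))
  .logErrorsToConsole()
  .run();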
