
node-pg-copy-streams's Introduction

pg-copy-streams


COPY FROM / COPY TO for node-postgres. Stream from one database to another, and stuff.

how? what? huh?

Did you know that PostgreSQL supports streaming data directly into and out of a table? This means you can take your favorite CSV or TSV file and pipe it directly into an existing PostgreSQL table.

PostgreSQL supports text, csv/tsv and binary data. If you have data in another format (say, for example, JSON), convert it to one of the supported formats and pipe it directly into an existing PostgreSQL table!

You can also take a table and pipe it directly to a file, another database, stdout, even to /dev/null if you're crazy!

What this module gives you is a Readable or Writable stream directly into/out of a table in your database. This mode of interfacing with your table is very fast and very brittle. You are responsible for properly encoding and ordering all your columns. If anything is out of place, PostgreSQL will send you back an error. The stream works within a transaction so you won't leave things in a 1/2 borked state, but it's still good to be aware of.

If you're not familiar with the feature (I wasn't either) you can read this for some good help: https://www.postgresql.org/docs/current/sql-copy.html

examples

pipe from a table to stdout (copyOut - copy-to)

var { Pool } = require('pg')
var { to: copyTo } = require('pg-copy-streams')

var pool = new Pool()

pool.connect(function (err, client, done) {
  var stream = client.query(copyTo('COPY my_table TO STDOUT'))
  stream.pipe(process.stdout)
  stream.on('end', done)
  stream.on('error', done)
})


// async/await
import { pipeline } from 'node:stream/promises'
import pg from 'pg'
import { to as copyTo } from 'pg-copy-streams'

const pool = new pg.Pool()
const client = await pool.connect()
try {
  const stream = client.query(copyTo('COPY my_table TO STDOUT'))
  await pipeline(stream, process.stdout)
} finally {
  client.release()
}
await pool.end()

Important: When copying data out of postgresql, postgresql will chunk the data on 64kB boundaries. You should expect rows to be cut across the boundaries of these chunks (the end of a chunk will not always match the end of a row). If you are piping the csv output of postgres into a file, this might not be a problem. But if you are trying to analyse the csv output on-the-fly, you need to make sure that you correctly discover the lines of the csv output across the chunk boundaries. We are not recommending any specific streaming csv parser but csv-parser and csv-parse seem to correctly handle this.
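For example, here is a minimal sketch of such on-the-fly analysis, assuming the third-party csv-parser module is installed and using a placeholder table named my_table; the parser buffers partial lines, so the rows it emits are whole even when postgres split them across chunks:

// sketch only: npm install pg pg-copy-streams csv-parser
import { pipeline } from 'node:stream/promises'
import pg from 'pg'
import csvParser from 'csv-parser'
import { to as copyTo } from 'pg-copy-streams'

const pool = new pg.Pool()
const client = await pool.connect()
try {
  const stream = client.query(copyTo('COPY my_table TO STDOUT WITH (FORMAT csv, HEADER)'))
  await pipeline(
    stream,
    csvParser(), // reassembles rows that were cut across 64kB chunk boundaries
    async function (rows) {
      for await (const row of rows) {
        // each row is one complete csv record
        console.log(row)
      }
    }
  )
} finally {
  client.release()
}
await pool.end()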

pipe from a file to table (copyIn - copy-from)

var fs = require('node:fs')
var { Pool } = require('pg')
var { from: copyFrom } = require('pg-copy-streams')

var pool = new Pool()

pool.connect(function (err, client, done) {
  var stream = client.query(copyFrom('COPY my_table FROM STDIN'))
  var fileStream = fs.createReadStream('some_file.tsv')
  fileStream.on('error', done)
  stream.on('error', done)
  stream.on('finish', done)
  fileStream.pipe(stream)
})


// async/await
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
import pg from 'pg'
import { from as copyFrom } from 'pg-copy-streams'

const pool = new pg.Pool()
const client = await pool.connect()
try {
  const ingestStream = client.query(copyFrom('COPY my_table FROM STDIN'))
  const sourceStream = fs.createReadStream('some_file.tsv')
  await pipeline(sourceStream, ingestStream)
} finally {
  client.release()
}
await pool.end()

Note: In versions prior to 4.0.0, when copying data into postgresql, it was necessary to wait for the 'end' event of pg-copy-streams.from to correctly detect the end of the COPY operation. This was necessary due to the internals of the module but non-standard. This is no longer true for versions 4.0.0 and later: the end of the COPY operation must now be detected via the standard 'finish' event. Users of 4.0.0+ should not wait for the 'end' event because it is no longer fired.

In version 6.0.0+, if you have not yet finished ingesting data into a copyFrom stream and you want to ask postgresql to abort the process, you can call destroy() on the stream (or let pipeline do it for you if it detects an error in the pipeline). This sends a CopyFail message to the backend, which rolls the operation back. Please take into account that this will not revert the operation if the CopyDone message has already been sent and is being processed by the backend.
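As an illustration, here is a minimal sketch of this abort path on 6.0.0+ (my_table and some_file.tsv are placeholders):

import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
import pg from 'pg'
import { from as copyFrom } from 'pg-copy-streams'

const pool = new pg.Pool()
const client = await pool.connect()
try {
  const ingestStream = client.query(copyFrom('COPY my_table FROM STDIN'))
  const sourceStream = fs.createReadStream('some_file.tsv')
  await pipeline(sourceStream, ingestStream)
} catch (err) {
  // if the source failed, pipeline has already called ingestStream.destroy(err),
  // which sends CopyFail to the backend and rolls the COPY back;
  // you could also call ingestStream.destroy() yourself to abort explicitly
  console.error('COPY aborted:', err)
} finally {
  client.release()
}
await pool.end()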

duplex stream for replication / logical decoding scenarios (copyBoth - copy-both)

This is a more advanced topic. Check the test/copy-both.js file for an example of how this can be used.

Note regarding logical decoding: Parsers for logical decoding scenarios are easier to write when copy-both.js pushes chunks that are aligned on the copyData protocol frames. This is not the default mode of operation of copy-both.js in order to increase the streaming performance. If you need the pushed chunks to be aligned on copyData frames, use the alignOnCopyDataFrame: true option.

install

$ npm install pg-copy-streams

notice

This module only works with the pure JavaScript bindings. If you're using require('pg').native please make sure to use normal require('pg') or require('pg.js') when you're using copy streams.

Before you set out on this magical piping journey, you really should read this: http://www.postgresql.org/docs/current/static/sql-copy.html, and you might want to take a look at the tests to get an idea of how things work.

Take note of the following warning in the PostgreSQL documentation:

COPY stops operation at the first error. This should not lead to problems in the event of a COPY TO, but the target table will already have received earlier rows in a COPY FROM. These rows will not be visible or accessible, but they still occupy disk space. This might amount to a considerable amount of wasted disk space if the failure happened well into a large copy operation. You might wish to invoke VACUUM to recover the wasted space.
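If a large COPY FROM does fail, you may therefore want to reclaim that space yourself. Here is a minimal sketch of such a clean-up (my_table and some_file.tsv are placeholders, and it assumes the client connection can be reused after the failed COPY):

import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
import pg from 'pg'
import { from as copyFrom } from 'pg-copy-streams'

const pool = new pg.Pool()
const client = await pool.connect()
try {
  await pipeline(
    fs.createReadStream('some_file.tsv'),
    client.query(copyFrom('COPY my_table FROM STDIN'))
  )
} catch (err) {
  // rows received before the error are invisible but still occupy disk space,
  // so reclaim it as the postgresql documentation suggests
  await client.query('VACUUM my_table')
  throw err
} finally {
  client.release()
}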

benchmarks

The COPY command is commonly used to move huge sets of data. This can put some pressure on the node.js loop, the amount of CPU or the amount of memory used. There is a bench/ directory in the repository where benchmark scripts are stored. If you have performance issues with pg-copy-streams, do not hesitate to write a new benchmark that highlights your issue. Please avoid committing huge files (such PRs won't be accepted) and find other ways to generate huge datasets.

If you have a local instance of postgres on your machine, you can start a benchmark for example with

$ cd bench
$ PGPORT=5432 PGDATABASE=postgres node copy-from.js

tests

In order to launch the test suite, you need to have a local instance of postgres running on your machine.

Since version 5.1.0 and the implementation of copy-both.js for logical decoding scenarios, your local postgres instance will need to be configured to accept replication scenarios:

postgresql.conf
  wal_level = logical
  max_wal_senders > 0
  max_replication_slots > 0

pg_hba.conf
  make sure your user can connect using the replication mode

$ PGPORT=5432 PGDATABASE=postgres make test

contributing

Instead of adding a bunch more code to the already bloated node-postgres I am trying to make the internals extensible and work on adding edge-case features as 3rd party modules. This is one of those.

Please, if you have any issues with this, open an issue.

Better yet, submit a pull request. I love pull requests.

Generally how I work is if you submit a few pull requests and you're interested I'll make you a contributor and give you full access to everything.

Since this isn't a module with tons of installs and dependent modules I hope we can work together on this to iterate faster here and make something really useful.

changelog

version 6.0.6 - published 2023-07-17

  • copy-to: fix rowCount in BINARY mode. The file trailer was incorrectly counted as a row

version 6.0.5 - published 2023-03-07

  • improve esm/cjs Named exports compatibility for easier async/await usage

version 6.0.4 - published 2022-09-05

version 6.0.3 - published 2022-09-05

  • copy-from: fix issue #136 when the _writev mechanism was triggered with a very large number of chunks

version 6.0.2 - published 2021-09-13

  • copy-from: fix interaction with pg optional timeout mechanism

version 6.0.1 - published 2021-08-23

  • Bugfix for node 14+. The order of the _destroy / _final calls is different before and after node 14, which caused an issue with the COPY FROM _destroy implementation that appeared in version 6.0.0.

version 6.0.0 - published 2021-08-20

  • Implement _destroy in COPY FROM operations. pipeline will automatically send a CopyFail message to the backend if a source triggers an error. cf #115

This version is a major change because some users of the library may have been using other techniques in order to ask the backend to rollback the current operation.

version 5.1.1 - published 2020-07-21

Bugfix release handling a corner case when an empty stream is piped into copy-from

  • fix copy-from.js handling of an empty source

version 5.1.0 - published 2020-06-07

This version adds a Duplex stream implementation of the PostgreSQL copyBoth mode described on https://www.postgresql.org/docs/9.6/protocol-flow.html. This mode opens the possibility of dealing with replication and logical decoding scenarios.

  • implement copy-both.js

version 5.0.0 - published 2020-05-14

This version's major change is a modification in the COPY TO implementation. The new implementation extends Readable whereas previous versions were extending Transform. This should not have an effect on how users use the module, but it was considered to justify a major version number because, even if the test suite coverage is wide, it could have an impact on the streaming dynamics in certain edge cases that are not yet captured by the tests.

  • Rewrite copy-to in order to have it extend Readable instead of Transform

version 4.0.0 - published 2020-05-11

This version's major change is a modification in the COPY FROM implementation. In previous versions, copy-from was internally designed as a Transform duplex stream. The user-facing API was writable, and the readable side of the Transform was piped into the postgres connection stream to copy the data inside the database. This led to an issue because Transform was emitting its 'finish' too early, right after the writable side was ended: postgres had not yet read all the data on the readable side and had not confirmed that the COPY operation was finished. The recommendation was to wait for the 'end' event on the readable side, which correctly detected the end of the COPY operation and the fact that the pg connection was ready for new queries. This recommendation worked ok, but this way of detecting the end of a writable is not standard and was leading to other issues (interaction with the finished and pipeline APIs, for example). The new copy-from implementation extends Writable and now emits 'finish' with the correct timing: after the COPY operation and after the postgres connection has reached the readyForQuery state. Another big change in this version is that copy-to now shortcuts the core pg parsing during the COPY operation. This avoids double-parsing and avoids the fact that pg buffers whole postgres protocol messages.

  • Rewrite copy-from in order to have it extend Writable instead of Transform
  • Modify copy-to to shortcut the pg protocol parser during the COPY operation
  • Add Stream compliance tests for copy-to and copy-from

version 3.0.0 - published 2020-05-02

This version's major change is a modification in the COPY TO implementation. In the previous versions, a row could be pushed downstream only after the full row was gathered in memory. In many cases, rows are small and this is not an issue. But there are some use cases where rows can grow bigger (think of a row containing a 1MB raw image in a BYTEA field. cf issue #91). In these cases, the library was constantly trying to allocate very big buffers and this could lead to severe performance issues. In the new implementation, all the data payload received from a postgres chunk is sent downstream without waiting for full row boundaries.

Some users may in the past have relied on the fact that the copy-to chunk boundaries exactly matched row boundaries. A major difference in the 3.x version is that the module does not offer any guarantee that its chunk boundaries match row boundaries. A row's data could (and you have to realize that this will happen) be split across 2 or more chunks depending on the size of the rows and on postgres's own chunking decisions.

As a consequence, when the copy-to stream is piped into a pipeline that does row/CSV parsing, you need to make sure that this pipeline correctly handles rows that span across chunk boundaries. For its tests, this module uses the csv-parser module.

  • Add prettier configuration following discussion on brianc/node-postgres#2172
  • Rewrite the copy-to implementation in order to avoid fetching whole rows in memory
  • Use mocha for tests
  • Add new tests for copy-to.js focusing on chunk boundaries
  • Add integration tests for two streaming csv parsers: csv-parser and csv-parse
  • Add eslint
  • Add test for quick&dirty bytea binary extraction
  • Add benchmark for copy-to in bench/copy-to.js

version 2.2.2 - published 2019-07-22

  • Bugfix: copy-to could pause the client connection, preventing re-use

version 2.2.1 - published 2019-07-22

  • Bugfix: copy-from was not correctly unpiped from the connection stream

version 2.2.0 - published 2019-03-21

  • Small refactor in copy-from passing from 3 push to 2 push in every chunk transform loop
  • Add bench/ directory for benchmarks
  • Add benchmark to compare performance of pg-copy-stream wrt psql during copy-from
  • Add benchmark to measure memory usage of copy-from

version 2.1.0 - published 2019-03-19

  • Change README to stop using the pg pool singleton (removed after pg 7.0)
  • Do not register copy-to.pushBufferIfNeeded on the instance itself (avoid dangling method on the object)
  • Fix copy-to test wrt intermittent unhandled promise bug
  • Add tests regarding client re-use

version 2.0.0 - published 2019-03-14

This version's major change is a modification in the COPY TO implementation. In the previous version, when a chunk was received from the database, it was analyzed and every row contained within that chunk was pushed individually down the stream pipeline. Small rows could lead to a "one chunk" / "thousands of row pushed" performance issue in node. Thanks to @rafatower & CartoDB for the patch. This is considered to be a major change since some people could be relying on the fact that each outgoing chunk is an individual row.

Other changes in this version

  • Use Strict
  • Travis deprecation of old node version (0.12, 0.4). Support LTS 6, 8, 10 and Current 11
  • Update dev dependencies (pg, lodash)
  • Stop using deprecated Buffer constructor
  • Add package-lock.json

license

The MIT License (MIT)

Copyright (c) 2013 Brian M. Carlson

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


node-pg-copy-streams's Issues

Memory Usage directly proportional to the data

I used this library to load records from an Oracle database into a Postgres database. However, the node process crashes 💣 after running for a while with a garbage collection exception.

I have narrowed down the script just to include node-pg-copy-streams and read from a file on disk.

Test with a small file small.file

> ls -lh small.file
387B 
> node -v
v8.11.3

Instrumented the script to measure Resident Set Size and reported memory usage 25MB

Starting Memory Usage 24 MB
Successfully truncated table timeline.score
The script used approximately 25 MB

Not bad. 🆒

Now tried the same program with a large file big.file

> ls -lh big.file
214M

This time the memory usage reported is directly proportional to the input given: 244 MB ‼️

> node scripts/test.js
Starting Memory Usage 24 MB
Successfully truncated table timeline.score
The script used approximately 244 MB
Done

Relevant portions of the script:

const inputFileStream = fs.createReadStream('./big.file');
...
const writeClient = await this.writePoolPg.connect();
const copyQueryStream = copyFrom(`COPY ${destTableName} FROM STDIN`);

const writeStream = writeClient.query(copyQueryStream);
inputFileStream
    .pipe(writeStream);

Full code

I suspect the library is not providing the advantage we would expect from streams, namely that memory usage should not grow in direct proportion to the input.

Error TypeError: Object [object Object] has no method 'pipe'

Code:

var pg = require('pg');
var fs = require('fs');

var writableStream = fs.createWriteStream("foo4.csv");
pg.connect(config.POSTGRESURL, function (err, db, close) {
    var stream = db.query(copyTo("COPY (select id from users) TO STDOUT WITH CSV HEADER"));
    debugger;
    stream.pipe(writableStream);
    stream.on('end', function(){
        close();
        writableStream.end();
        return callback(null);
    });
    stream.on('error', function(err) {
        close();
        writableStream.end();
        logger.error(logPrefix + "exportToCsv stream error exporting data", err, query);
        return callback(err);
    });
});

The bizarre thing is that I have this working in another project! Both are using node-postgres 3.3.0 and I am using pg-copy-streams 0.2.4 in both. Any ideas?

Can I write in the stream?

Here's my scenario (I will try to write it short):

Load a file as stream, upon the first row/chunk create a DB table then COPY the rest of the file to the created table.

I have problems with this asynchrony of creating the db table, because for pg-copy-streams to properly work (I guess) the table already needs to exist, so I can't just simply use .pipe(stream) because upon the first chunk I won't have the table created yet. So here's what I tried to do:

  1. create a Transformer
  2. Create control flow, upon first row create the table
  3. While the table is being created buffer the chunks
  4. When the table is created write the chunks to the stream like stream.write(chunks)

But nothing happens: I can't see anything in my table and there's no error either. Maybe I can't use stream.write()?

Here's an excerpt of my code

const Transform = require('stream').Transform;
class Transformer extends Transform {
    constructor(tableName, client) {
        super();
        this.tableName = tableName;
        this.client = client;
        this.stream = null;
        this.chunks = [];
        this.isFirstChunk = true;
        this.creatingDbTable = true;
    }
    _transform(chunk, encoding, cb) {
        const _this = this;
        if (this.isFirstChunk) {
            this.isFirstChunk = false;
            console.log('Start working on first chunk');
            const parsedRecord = syncParse(chunk);
            const queryText = tableCreator(this.tableName, parsedRecord[0]);
            this.client.query(queryText, function (err) {
                if (err) console.error(err);
                _this.stream = _this.client.query(copyFrom(`COPY "${_this.tableName}" FROM STDIN`));

                _this.stream.on('error', (_err) => {
                    console.error('CERR', _err);
                });

                _this.stream.on('end', () => {
                    console.log('Stream Ended');
                });

                console.log('Created DB table');
                _this.creatingDbTable = false;
                cb();
            });
        } else if (this.creatingDbTable) {
            this.chunks.push(chunk);
            cb();
        } else {
            this.chunks.push(chunk);
            this.chunks.forEach(_chunk => {
                this.stream.write(_chunk);
            });
            cb();
        }
    }
}

and also the pipe

const client = pgclient();
const transformer = new Transformer(tableName, client);
this.fileLoaderService
            .loadFileAsStream(fileKey)
            .pipe(es.split())
            .pipe(transformer)
            .on('error', (err) => {
                console.error(err);
            })
            .on('end', () => {
                console.log('Stream ended');
            });

Any ideas? As you can see I'm not that familiar with node streams yet :/

Error Callback Not Initiating on PG Constraint Error

Hi Brian,

Thank you so much for this tool. It easily streams 40,000+ rows into our Postgres database. I have one issue though: I'm unable to get it to throw errors when there is a problem. Certain tables within our database have unique constraints. When a user attempts to upload a duplicated file, the database throws an error, but I can't get this to bubble back to the user. I've tinkered with the error callback provided, but for whatever reason it doesn't trigger when a database error is thrown.

Do you have any recommendations? Please let me know if I'm doing something wrong here.

Thanks again!
Brandon

I'm using this code:

pg.connect(connectionString, function(err, client, done) {
  if (err) {
    return console.error('could not connect to postgres', err);
  }
  var stream = client.query(copyFrom(query));
  var s = new Readable;
  s.push(csv);
  s.push(null);
  s.pipe(stream).on('finish', function () {
    res.send(200);
    done();
  }).on('error', function () {
    res.send(500);
    done();
  });
});
};

Does not work with pg > 3.0.0

I am getting errors when using this library with versions of node-postgres (pg) greater than 3. It looks like the activeQuery is not getting cleaned up when the copy query finishes, causing a connection error when the pg connection is closed (via client.end() or pg.end()).

Connection Terminated - Node 4.2.2

Connecting to Postgres 9.3 / AWS RDS
Running under node 4.2.2

Recently switched from an old version of PG with built in copy streams to pg 4.4.3 and node-pg-copy-streams 0.3.0

After a while export is terminating with:

stream error exporting data [Error: Connection terminated]

Tried merging: #28 to see if it helped but no go.

Any thoughts?

can we use querystream (multiple insert statement)

Hi,
below is provided sample code for pg copy streams

var fs = require('fs');
var pg = require('pg');
var copyFrom = require('pg-copy-streams').from;

pg.connect(function(err, client, done) {
  var stream = client.query(copyFrom('COPY my_table FROM STDIN'));
  var fileStream = fs.createReadStream('some_file.tsv')
  fileStream.on('error', done);
  stream.on('error', done);
  stream.on('end', done);
  fileStream.pipe(stream);
});

My query is: can we stream multiple insert statements rather than a tsv file, since we can create a querystream?

something like below codebase

  pg.connect(connectionString,function(err, client, done) {
  if(err) 
    {
        console.log(err)
    }

  
var basescrp=format('INSERT INTO srt (starttime, endtime,content,showname,season,ep,createdat,updatedat) VALUES %L', data)

 
  var stream = client.query(copyFrom('COPY srt FROM STDIN'));
  
  //release the client when the stream is finished
  var rs = new QueryStream(basescrp)
  
rs.pipe(stream).on('finish', done).on('error', function (err) {
    console.log(err);
    done();
  });

})

since the above code does not actually work!
Any pointer on how to stream a bulk insert into postgres?

Strange output for on('data', ...) event

The code looks like this

const copyFrom = require('pg-copy-streams').from;


const columns = [
        'col0',
        'col1', 
        'col2',
        'col3',
        'col4',
        'col5',
        'col6',
        'col7',
        'col8',
        'col9'
];

let sql = `COPY my_table (${columns.join(',')}) FROM STDIN (DELIMITER '|', NULL '')`;
const client = new pg.Client(...);
await client.connect();
const fromStream = client.query(copyFrom(sql));
fromStream.on('data', function (data) {
                console.log(' ~~~ DATA ~~~', data.toString());
            });

fromStream.write('XXXXXX001|123456|alpha|1991||49|25|13145253.38|13145253.38|100');
fromStream.write('XXXXXX002|123456|alpha|1991||49|25|13145253.38|13145253.38|100');
fromStream.end();

The on('data') event, however, is printing additional lines.


~~~ DATA ~~~~ dB
~~~ DATA ~~~~ XXXXXX001|123456|alpha|1991||49|25|13145253.38|13145253.38|100
~~~ DATA ~~~~ dB
~~~ DATA ~~~~ XXXXXX002|123456|alpha|1991||49|25|13145253.38|13145253.38|100
~~~ DATA ~~~~ c

What am I missing?
It is not inserting data into the table, so while debugging I found this; I'm asking whether this is a bug or expected behaviour.

NPM?

It doesn't appear that this module is on NPM. Is that intentional? Am I just overlooking it?

It's not that huge of an issue, but it was a little unexpected.

Are query parameters supported?

Copy supports queries, not just tables:

COPY { table_name [ ( column [, ...] ) ] | ( query ) }

Is it possible to send parameters to the query included in the copyTo command? Our use case here is that we are creating a csv export service, and with this library, we can export csv data directly from postgres to s3.

Synchronous or Promises management

Hello,

I work with Google Cloud Functions and Google doesn't like it when there is a background task.
I tried to make a Promise resolve in the end event of the filestream but, whatever I do, it doesn't work.

Here is an example of what I made:

const promise = new Promise((resolve, reject) => {
            pool.connect((err, client, done) => {
                let fsStream = client.query(copyFrom('COPY table (some data) FROM STDIN WITH (FORMAT CSV, DELIMITER \';\')'));

                fsStream.on('end', () => {
                    pool.end();
                    res.set({
                        "Access-Control-Allow-Origin": "*",
                        "Access-Control-Allow-Credentials": "true",
                    });
                    res.status(200).send('OK');
                    resolve('OK');
                });
                fsStream.on('error', (e) => {
                    console.log('error', e);
                    process.stdout.write(' KO\n');
                    pool.end();
                    reject(e);
                });

               
                for (const data of datas) {
                    csv += 'some data';

                    if (csv.length > 30878915) {
                        fsStream.write(csv);
                        csv= '';
                    }
                }
               
                fsStream.write(csv);
                fsStream.end(null);
            });
        });
        return promise;

Is there a way to do copyFrom with a promise or something synchronous?

`finish` emitted before data is copied

I'm running my code in AWS lambda, wherein you call done when your process is ready to exit.

        /** @type {NodeJS.WritableStream} */
        var pgStream = client.query(copyFrom(copyQuery));

        csvStream
          .pipe(pgStream)
          .on('error', function(e){
            logerr('pg stream error');
            done(e);
          })
          .on('finish', function(){
            console.timeEnd('query time');
            done(); //////////////// <---- Offending line
          });

This always exits before the data is copied. If I remove done, the process won't exit but the data does copy. Why is finish getting called before data is streamed? Should I be listening for "finish" elsewhere?

Warning about listener count.

I'm using node-pg-copy-streams with some simple queries. (In particular, COPY (SELECT * FROM foo WHERE id >= 12345) TO STDOUT and COPY bar FROM STDIN.)

Everything appears to be working correctly, but I get the warning below. It appears to be internal to the stream created by to, as opposed to from, and not from the client code. (I.e., the from/to streams I'm using only have 1-2 listeners each.) Is this something I should be worried about?

I can pull out code if necessary. If it matters, this only happens when I've done multiple copies via copy-streams. (I can't pin down the number, but it appears to be 3-4.)

(node) warning: possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit.
Trace
    at Socket.EventEmitter.addListener (events.js:160:15)
    at Socket.Stream.pipe (stream.js:71:12)
    at CopyStreamQuery.submit (/usr/local/heap/node_modules/pg-copy-streams/copy-to.js:23:21)
    at Client._pulseQueryQueue (/usr/local/heap/node_modules/pg/lib/client.js:279:24)
    at Client.query (/usr/local/heap/node_modules/pg/lib/client.js:322:8)
    at copyDataFromQueries (/usr/local/heap/back/db/citus/cluster.coffee:54:29)
    at copyData (/usr/local/heap/back/db/citus/cluster.coffee:393:16)
    at iterate (/usr/local/heap/node_modules/async/lib/async.js:131:13)
    at /usr/local/heap/node_modules/async/lib/async.js:142:25
    at null.<anonymous> (/usr/local/heap/back/db/citus/cluster.coffee:72:14)
    at g (events.js:175:14)
    at EventEmitter.emit (events.js:117:20)
    at finishMaybe (_stream_writable.js:354:12)
    at endWritable (_stream_writable.js:361:3)
    at Writable.end (_stream_writable.js:339:5)
    at onend (_stream_readable.js:483:10)
    at g (events.js:175:14)
    at EventEmitter.emit (events.js:117:20)
    at _stream_readable.js:920:16
    at process._tickCallback (node.js:415:13)

Error: expected "d" (copydata message)

Hi @brianc! Running the following block of code:

{from, to} = require 'pg-copy-streams'
# ... pg.Client setup ..
fromStream = remoteClient.query(to("COPY (SELECT col1, col2, col3 FROM tbl) TO STDOUT"))
toStream = localClient.query(from("COPY tbl(col1, col2, col3) FROM STDIN"))
fromStream.on 'error', cleanup
toStream.on 'error', cleanup
toStream.on 'end', cleanup
fromStream.pipe toStream

Yields the following error:

Error: expected "d" (copydata message)
  at [object Object].CopyStreamQuery._transform (/usr/local/heap/node_modules/pg-copy-streams/copy-to.js:73:33)
  at [object Object].Transform._read (_stream_transform.js:179:10)
  at [object Object].Transform._write (_stream_transform.js:167:12)
  at doWrite (_stream_writable.js:223:10)
  at writeOrBuffer (_stream_writable.js:213:5)
  at [object Object].Writable.write (_stream_writable.js:180:11)
  at Socket.ondata (stream.js:51:26)
  at Socket.EventEmitter.emit (events.js:95:17)
  at Socket.<anonymous> (_stream_readable.js:746:14)
  at Socket.EventEmitter.emit (events.js:92:17)
  at emitReadable_ (_stream_readable.js:408:10)
  at emitReadable (_stream_readable.js:404:5)
  at readableAddChunk (_stream_readable.js:165:9)
  at Socket.Readable.push (_stream_readable.js:127:10)
  at TCP.onread (net.js:526:21)

Some potentially pertinent bits of info:

  • Running latest versions of pg-copy-streams and pg.
  • Remote table is ~100GB.
  • Both local and remote instances are running psql 9.2.6.
  • Works successfully when copying just a few rows of the source table.

Let me know if there's other debug information I should provide. Thanks!

rowCount not working as intended?

The CopyStreamQuery class exposes a property rowCount which does not appear to work correctly. Here's a slightly modified example from the readme:

var fs = require('fs');
var pg = require('pg');
var copyFrom = require('pg-copy-streams').from;

pg.connect("postgres://...", function(err, client, done) {
  client.query("CREATE TABLE IF NOT EXISTS my_table (col1 INT, col2 INT, col3 INT)");
  copyQuery = copyFrom('COPY my_table FROM STDIN');
  var stream = client.query(copyQuery);
  var fileStream = fs.createReadStream('some_file.tsv')
  fileStream.pipe(stream).on('finish', function() { console.log('rowCount: %d', copyQuery.rowCount); process.exit(); });
});

I prepared some_file.tsv with 4 rows. The output is rowCount: 1. The data passing through CopyStreamQuery._transform is

<Buffer 31 09 32 09 33 0a 34 09 35 09 36 0a 37 09 38 09 39 0a 31 30 09 31 31 09 31 32 0a>
i.e. the contents of the entire some_file.tsv:
1   2   3
4   5   6
7   8   9
10  11  12

I can split my inbound stream on line breaks, but, unless I do, the rowCount feature seems to not be working as intended.

Parsing & Deparsing binary streams

Hello,

@brianc I just published the module I was working on for handling the binary streams.

https://www.npmjs.com/package/pg-copy-streams-binary

I tried to find a useful API but tell me what you think.

Note that inside the module, I started a new pg_types.js that has nothing to do with pg-types because I needed both parsing & deparsing. We might want to see if it could make sense to merge these in the longer term.

I'd be glad to have your feedback if you find time to test it !

Thanks again for your work on the pg galaxy.

Dependencies in node_modules

I noticed that the dependencies are installed into the root of node_modules... that's bad practise. They should be in the lib folder...

Cannot execute on table names with an uppercase letter

Hi,

When I try to copy table content and the table name has an upper case letter, it fails, saying that the table does not exist (with the lower-cased name of the table).

e.g. when trying to execute the following: COPY my_schema.Entity TO STDOUT
I get the error: error: relation "my_schema.entity" does not exist

After renaming the table name to lower case it works

Invalid byte sequence error

I am getting this error while trying to import a CSV file with about 400k lines into Postgres:

events.js:85
      throw er; // Unhandled 'error' event
            ^
error: invalid byte sequence for encoding "UTF8": 0x00
    at Connection.parseE (/Users/ilyabo/Workspace/edge/node_modules/pg/lib/connection.js:534:11)
    at Connection.parseMessage (/Users/ilyabo/Workspace/edge/node_modules/pg/lib/connection.js:361:17)
    at Socket.<anonymous> (/Users/ilyabo/Workspace/edge/node_modules/pg/lib/connection.js:105:22)
    at Socket.emit (events.js:107:17)
    at readableAddChunk (_stream_readable.js:163:16)
    at Socket.Readable.push (_stream_readable.js:126:10)
    at TCP.onread (net.js:529:20)

I use the following code for import:

  var stream = client.query(pgCopy.from(
    "COPY "+table.name+" FROM STDIN WITH CSV HEADER DELIMITER ','"
  ));

  var fileStream = fs.createReadStream(INPUT_DIR + '/' + table.name + '.csv')
  fileStream.on('error', done);
  fileStream.pipe(stream)
    .on('finish', done)
    .on('error', done);

The CSV file itself seems fine. It doesn't appear to have non-ascii characters in it. Importing it directly by specifying the full path in the Postgres query works just fine:

COPY edge_counts FROM '/Users/ilyabo/Workspace/edge.csv' WITH CSV HEADER DELIMITER ',';

Moreover, when I reduce the number of rows in the file to something like 50k (doesn't seem to matter which 50k lines!), the import via pg-copy works too.

What could be the cause of this error?

the efficiency of pg-copy-streams is very low

This library's efficiency is relatively low; can you help find a reason for it? I began by using the following form: node xxx.js | psql -c "COPY xxx FROM STDIN".
That form is relatively fast; however, when I use the pg-copy-streams library to write data directly, using the atop command to view system performance I found that the IRQ CPU load is high and the write speed is very slow.

i use ubuntu 16.04 os.

Streaming between DBs, Uncaught Error (Solved)

I'm streaming data from one database into another. This works great, however if the source table does not exist this results in an uncaught exception. This only happens when streaming to another db. When streaming to stdout the exception is caught as expected.

What am I doing wrong? Can you please advise?

Here is a minimal test case

const Pool = require('pg-pool');
const readPool = new Pool({
    host: 'localhost',
    database: 'readDb',
    user: 'postgres',
    password: 'password',
    port: 5432
});
const writePool = new Pool({
    host: 'localhost',
    database: 'writeDb',
    user: 'postgres',
    password: 'password',
    port: 5432
});
const pgCopy = require('pg-copy-streams');
const copyTo = pgCopy.to;
const copyFrom = pgCopy.from;

readPool.connect(function(readErr, readClient, readDone) {
    const readStream = readClient.query(copyTo('COPY non_existing_table TO STDOUT'));

    writePool.connect(function(writeErr, writeClient, writeDone) {
        const writeStream = writeClient.query(copyFrom('COPY existing_table FROM STDIN'));
        writeStream.on('error', function (err) {
            console.log(err);
            writeDone(err);
            readDone(err);
        });
        readStream.pipe(writeStream);
        readStream.on('end', function () {
            writeDone();
            readDone();
        });
        readStream.on('error', function (err) {
            console.log(err);
            writeDone(err);
            readDone(err);
        });
    });
});

Actually just solved this by moving

const readStream = readClient.query(copyTo('COPY non_existing_table TO STDOUT'));

into the inner connect.

getting TypeError: dest.end is not a function while trying to use a Buffer instead of a file's read stream for copy-from

Hi,

I'm getting error from attempting to use copy-from with a Buffer instead of file. Is this possible? Will I need to write my own Read/Write-er stream object?

TypeError: dest.end is not a function
at PassThrough.onend (_stream_readable.js:498:10)
at PassThrough.g (events.js:273:16)
at emitNone (events.js:85:20)
at PassThrough.emit (events.js:179:7)
at endReadableNT (_stream_readable.js:913:12)
at _combinedTickCallback (node.js:383:13)
at process._tickCallback (node.js:407:11)

And here's my code (sorry can't get markdown to work):

    return pg_pool.connect(function (error, pgClient, pgDone) {

        var streamCopy = pgClient.query(copyFrom('COPY mytable FROM STDIN'));
        // Initiate the source
        console.log('creating bufferStream');
        var bufferStream = new stream.PassThrough();

        console.log('attaching error handler to bufferStream');
        bufferStream.on('error', pgDone);

        console.log('attaching error handler to streamCopy');
        streamCopy.on('error', pgDone);

        console.log('attaching end handler to bufferStream');
        streamCopy.on('end', pgDone);
        //streamCopy.on('finish', function () {
        //    console.log('finish', arguments);
        //});

        fileLogger.log('Start DB insert');
        TimeElapsed.start('db_insert');

        // Write your buffer
        console.log('writing bufferArray[index] to bufferStream');
        bufferStream.end(bufferArray[index]);

        // Pipe it to COPY
        console.log('pipe bufferStream to streamCopy');
        bufferStream.pipe(streamCopy);

    });

I'm getting some data from a feed using sockets, then formatting that data into multi-value INSERTs, accumulating, then putting them inside Buffers so that I have an array of Buffer objects. (Now that I think of it, they should have been individual row inserts, but I don't think that's the cause of the error.)

Hmm, piping not working

Seems like this has been an issue before, but it may be again. I'm trying the example verbatim:

var pg = require('pg.js');
var copyTo = require('pg-copy-streams').to;

pg.connect(process.env.DATABASE_URL, function(err, client, done) {
    var stream = client.query(process.env.DATABASE_URL, copyTo('COPY  MyTable TO STDOUT'));
    stream.pipe(process.stdout);
    stream.on('end', done);
    stream.on('error', done);
});

where 'MyTable' is a table in my database and the database connection string is in the environment variable 'DATABASE_URL'.

I'm getting the error:

pgtest devtest$ node PGStream.js

/Users/devtest/pgtest/PGStream.js:6
    stream.pipe(process.stdout);
           ^
TypeError: Object [object Object] has no method 'pipe'
    at /Users/devtest/pgtest/PGStream.js:6:9
    at /Users/devtest/node_modules/pg.js/lib/pool.js:71:9
    at /Users/devtest/node_modules/pg.js/node_modules/generic-pool/lib/generic-pool.js:278:11
    at /Users/devtest/node_modules/pg.js/lib/pool.js:45:18
    at null.<anonymous> (/Users/devtest/node_modules/pg.js/lib/client.js:145:7)
    at g (events.js:175:14)
    at EventEmitter.emit (events.js:117:20)
    at Socket.<anonymous> (/Users/devtest/node_modules/pg.js/lib/connection.js:109:12)
    at Socket.EventEmitter.emit (events.js:95:17)
    at Socket.<anonymous> (_stream_readable.js:746:14)

I'm using Postgres.app and the following versions:
Postgres: 9.3.0.0
Node.js: 0.10.18
pg: 3.4.0
pg.js: 3.4.1
pg-copy-streams: 0.2.4

Any idea what might be going wrong? Thanks.

query end doesn't appear to get emitted

Hi, I'm attempting to use the code as prescribed, with a query as so:

return new Promise((resolve, reject) => {
   try {
      let query = "COPY ... FROM STDIN (HEADER true, FORMAT csv, QUOTE '''')";

      let queryStream = client.query(copyFrom(query));
      let fileStream = s3.getObject(s3FileDetails).createReadStream();

      fileStream.on('error', function(error) {
          console.log("Filesteam errored: " + error)
          return reject(error);
      })
      queryStream.on('end', function (response) {
          console.log("queryStream ended: ", response)
          return resolve("success: ", response);
      })

      queryStream.on('error', function (error) {
          console.log("queryStream errored: " + error)
          return reject(error);
      });

      fileStream.pipe(queryStream);
   } catch (err) {
      console.log(err)
      return reject(err)
   }
})

But I am finding that query stream is not emitting "end". I can put an end listener on the file stream, but that would obviously end the connection before the data has been loaded. Is there anything I am obviously doing wrong?

Database instead of table

Hi @brianc,

I have a project where I'm downloading a database backup as a stream, piping it through decryption, and then piping it to a file. I would love to skip the file step and stream the decrypted SQL directly into the database.

The backup is created using pg_dump so I have the whole DB structure. In your example you have the following line:

var stream = client.query(copyFrom('COPY my_table FROM STDIN'));

If you have a moment could you explain what this COPY my_table FROM STDIN means? Does it mean that I have to have the whole DB structure and can only import the data?

Or could I somehow have an empty DB, and not only import the data but also the whole structure?

"Read ETIMEDOUT" on long copies.

I'm using both the from and to streams to pipe data around, and I occasionally get a Read ETIMEDOUT error on large copies. Is there anything I can do to make this more robust, beyond adding retry logic?

These show up for copies that take more than an hour. (The underlying query takes ~5 seconds. I'm moving a lot of data around, and it's all going through node.)

Using FORMAT binary might help speed this up, but I'm a little paranoid about using it. How robustly have you tested this module with binary copies, if at all?

CSV not being split into columns

I have a CSV generated by Google Sheets - just regular Comma Separated Values.

Using the basic "pipe from a file to table" example, I'm getting the entire row inserted into the first database column, and then every other column empty.

Can anyone else replicate this?

second example is misleading

In the second example (pipe from a file to table) you attach an event listener to fileStream:

fileStream.on('error', done);

But that doesn't capture errors and an exception is thrown. When I instead attach the listener to the stream object, everything is cool.

I found it's also preferable to listen for an 'end' event on stream because that event isn't emitted in the case of error (fileStream still does emit an 'end' event).

Unable to pipe into gzip stream

Does not work

var stream = client.query(copyTo(XXX));
stream.pipe(zlib.createGzip()).pipe(fs.createWriteStream('out.gz'));

Works great

var stream = client.query(copyTo(XXX));
stream.pipe(fs.createWriteStream('out.gz'));

Any ideas as to why the pg stream won't work with the gzip transform stream?

Handling errors

Hi,
I'm using this library to pipe large csvs to db. It's been good so far, but It is not clear to me how to handle errors.

Basically I have a fileStream and a dbStream, first one reads from a csv, second one pipes to db

const dbStream = await client.query(from(someQuery))
fileStream.pipe(dbStream)

If fileStream is successful everything works out fine. However, if fileStream has an error (for example a timeout if the file is over ftp), then dbStream remains dangling. If I connect to postgres the query is marked as active. I've tried dbStream.emit('error', e) and dbStream.destroy(), to no avail.

Any idea?

One option would be to open a different connection to the db to do pg_cancel_backend but it really doesn't seem like the way.

Thanks

documentation note?

It would seem from the postgres doc that wrapping in transaction might not be sufficient (or necessary?) to undo the effect of an error in copyFrom:

COPY stops operation at the first error. This should not lead to problems in the event of a COPY TO, but the target table will already have received earlier rows in a COPY FROM. These rows will not be visible or accessible, but they still occupy disk space. This might amount to a considerable amount of wasted disk space if the failure happened well into a large copy operation. You might wish to invoke VACUUM to recover the wasted space.

Perhaps this would be good to note in the documentation (right around the "half-borked" part :))?

Having trouble getting "from" to work.

Really cool module! Excited to get it working.

I'm using the following code to play with from:

pg = require 'pg'
{from, to} = require 'pg-copy-streams'

pg.connect 'tcp://user:pw@host/dbname', (err, client, done) ->
  return console.log "Error!" if err
  client.query(from("COPY awesome_table_name FROM STDIN"))

I get the following stacktrace:

/usr/local/heap/node_modules/pg/lib/client.js:110
      self.activeQuery.handleCopyInResponse(self.connection);
                       ^
TypeError: Object [object Object] has no method 'handleCopyInResponse'
    at null.<anonymous> (/usr/local/heap/node_modules/pg/lib/client.js:110:24)
    at EventEmitter.emit (events.js:95:17)
    at null.<anonymous> (/usr/local/heap/node_modules/pg/lib/connection.js:97:12)
    at Socket.EventEmitter.emit (events.js:95:17)
    at Socket.<anonymous> (_stream_readable.js:746:14)
    at Socket.EventEmitter.emit (events.js:92:17)
    at emitReadable_ (_stream_readable.js:408:10)
    at emitReadable (_stream_readable.js:404:5)
    at readableAddChunk (_stream_readable.js:165:9)
    at Socket.Readable.push (_stream_readable.js:127:10)
    at TCP.onread (net.js:526:21)

I'm using pg version 2.8.2, though I get the same error with 2.8.3, with pg-query-stream 0.2.0. The connection string used above works correctly, and I can run queries with it.

Am I doing something silly? If not, what other info would be helpful in debugging this?

Unexpected PostgreSQL message

Hi every one!

I'm trying to do a COPY from SQL. A file is generated, but I get the error below:

Error: Unexpected PostgreSQL message Z
index.js:45
at CopyStreamQuery._transform (/home/fredericosilva/Projetos/pg-copy/node_modules/pg-copy-streams/copy-to.js:78:28)
at CopyStreamQuery.Transform._read (_stream_transform.js:186:10)
at CopyStreamQuery.Transform._write (_stream_transform.js:174:12)
at doWrite (_stream_writable.js:396:12)
at writeOrBuffer (_stream_writable.js:382:5)
at CopyStreamQuery.Writable.write (_stream_writable.js:290:11)
at Socket.ondata (_stream_readable.js:639:20)
at emitOne (events.js:121:20)
at Socket.emit (events.js:211:7)
at addChunk (_stream_readable.js:263:12)

My code is this:
client.connect(function (err, client, done) {
  var stream = client.query(copyTo(`COPY (${config.select}) TO '${config.out_dir}' DELIMITER ';' CSV`));
  stream.pipe(process.stdout);

  stream.on('end', () => {
    console.log(`Completed loading data into ${config.out_dir}`);
    client.end();
  });
  stream.on('error', (e) => {
    console.log(e.stack);
    client.end();
  });
});

I saw that this is a generic error of pg-copy-streams.
Can someone help me?

flush() support?

Hi,

We have a data feed that sends 6-10 data packages per sec. I wanted to leverage the power of COPY to insert the (preprocessed) data into the database. Unfortunately failed so far.

What I'd like to achieve is to set up some sort of limit (let's say 1000 row event or data package) and when the limit is reached the stream would fire the query to the DB and clean up itself.

I've tried to use stream.end() and then recreate stream, but it does not really work and it is far from being an elegant solution.

Is there any way to flush the stream? I've found the _flush() private method, but that is not for this purpose I guess.

Cheers

Question: how to handle constraints?

My db has foreign key constraints. I am using the csv export format. The problem is that table A has a reference to table B and it is alphabetically before B, thus the import is failing. Are there elegant workarounds? Other options to use? I am using export and import a lot, so manually adding numbers to overcome alphabetical order is not an option.

Usage with knex.

Hi,
I'm trying to return a stream of a csv from a table.

I have knex on my project, and can't find a way to use this module with it.

getStream() {
    const client = this.knex./* Problem here: how to access to the pg client showed in the readme */

    return client.query(to(`COPY "${this.table}" TO STDOUT WITH CSV HEADER`))
  }

I've tried and searched everything I've found, without success.
I've tried knex.query and knex.client.query, seen some things with knex.client.Runner and knex.client.pool.acquire, but didn't manage to get it working.

I'm also looking for general documentation about this knex "client" object, because I can't find anything on documentation, and don't know where to find it.

Thank you very much,
Matt'.

CopyTo: `end` event does not guarantee stream is flushed

I have the following code, testing the file size when copying from table to file:

var pg = require('pg');
var pgTo = require('pg-copy-streams').to;
var fs = require('fs');

   var client = new pg.Client(connString);
    client.connect(function (err) {

        var _cb = function(what) {
            console.log(what, 'file size:', fs.statSync(outFile).size);
        };

        var tmpStream = fs.createWriteStream(outFile);
        tmpStream.on('end', function () {
            _cb("tmpStream.on('end')");
        });

        tmpStream.on('finish', function () {
            _cb("tmpStream.on('finish')");
        });
        
        var srcStream = client.query(pgTo(sql));
        srcStream.on('error', function (err) {
            _cb("srcStream.on('error')");
        });
        srcStream.on('end', function () {
            _cb("srcStream.on('end')");

            //this is only to check the file size a bit later, too
            process.nextTick(function () {
                _cb("srcStream.on('end') second call");
            });
        });

        srcStream.pipe(tmpStream);
    });

My problem is that at the moment when srcStream.on('end') is called (recommended for use, rather than tmpStream.on('finish')), the file is not always ready yet. Actually it is only ready at tmpStream.on('finish'). Sample output:

srcStream.on('end') file size: 755020
srcStream.on('end') second call file size: 755020
#the correct file size:
tmpStream.on('finish') file size: 762760 

Another example (exporting different table) is even worse, because the file size is 0:

srcStream.on('end') file size: 0
srcStream.on('end') second call file size: 0
tmpStream.on('finish') file size: 10889

Hangs during repeated use

I am recursively calling a method that issues a COPY TO query. The first time always works, but the second time it just hangs there, with nothing happening. I can provide a code sample if needed.

Delimiters specs

Hi,
using node -v [8.1.4]
below is my codebase


var stream = client.query(copyFrom('COPY srt(starttime) FROM STDIN '));

console.log(data)


var internmap = through2.obj(function(arr, enc, cb) {
   var rowText = arr.join('\t') + '\n'
console.log(rowText);
   cb(null, rowText)
 })
internmap.write(data);

internmap.pipe(stream)


The issue which I am facing is shown in the screenshot below.
[screenshot: copyto]

I even tried putting tabs in explicitly, with the codebase below.
[screenshot: both]

The issue seems to be with the delimiter specs; any pointer on where I should define them?
Ideally \t tabs are applied by default, but in my case it is not working.
Any pointers or suggestions on how to resolve this!

.to returns a writable stream

const copyTo = require('pg-copy-streams').to

const csvStream = pg.query(exports.copyTo(dedent`
  COPY (
    SELECT *
    FROM some_table
  ) TO STDOUT
  WITH CSV DELIMITER ','
  HEADER;
`))

console.log(csvStream.writable, csvStream.readable) // true, true

Most of the libraries out there rely on those flags; I have to manually set csvStream.writable = false to make it work!

piping won't work

I am trying the example almost verbatim:

   pg = require('pg').native
   copyFrom = require('pg-copy-streams').from

   pg.connect pgURL, (err, pgClient, done) ->

      to = pgClient.query copyFrom 'Copy sf_xxx From Stdin'
      fileStream = fs.createReadStream('/tmp/dump.csv')
      fileStream.pipe(to)

The only difference is that I require('pg').native.
I am getting

_stream_readable.js:583
    var written = dest.write(chunk);
                       ^
TypeError: Object [object Object] has no method 'write'
  at write (_stream_readable.js:583:24)
  at flow (_stream_readable.js:592:7)
  at ReadStream.pipeOnReadable (_stream_readable.js:624:5)
  at ReadStream.EventEmitter.emit (events.js:92:17)
  at emitReadable_ (_stream_readable.js:408:10)
  at emitReadable (_stream_readable.js:404:5)
  at readableAddChunk (_stream_readable.js:165:9)
  at ReadStream.Readable.push (_stream_readable.js:127:10)
  at onread (fs.js:1563:12)
  at Object.wrapper [as oncomplete] (fs.js:454:17)

cb_flush called before initialization

There appears to be a condition that can arise where the handleCommandComplete callback is called before the first time _flush is called. This causes the below exception to occur:

TypeError: this.cb_flush is not a function
	at CopyStreamQuery.handleCommandComplete (./node_modules/pg-copy-streams/index.js:67:8)
	at Connection.<anonymous> (./node_modules/pg/lib/client.js:132:24)
	at emitOne (events.js:96:13)
	at Connection.emit (events.js:188:7)
	at Socket.<anonymous> (./node_modules/pg/lib/connection.js:121:12)
	at emitOne (events.js:96:13)
	at Socket.emit (events.js:188:7)
	at readableAddChunk (_stream_readable.js:176:18)
	at Socket.Readable.push (_stream_readable.js:134:10)
	at TCP.onread (net.js:548:20)

I added a little logging to confirm:

[screenshot: logging output]

I'm having a hard time figuring out exactly what's causing it, though it seems like larger input files tend to trigger it. Here's the SQL statement I'm using:

BEGIN;
TRUNCATE locations RESTART IDENTITY;
COPY my_table(my_fields) FROM STDIN CSV;
COMMIT;

Is it possible the transaction block is causing this? I'm on the latest versions of both pg and this module as of writing.

Edit: node version 6.9.1

cant get it to work.

var copyQuery = ['COPY ', params.tableName, ' TO STDOUT CSV HEADER'].join('');
pgStream = client.query(copyTo(copyQuery));
pgStream.pipe(process.stdout);

{ text: 'COPY tablename TO STDOUT CSV HEADER',
values: undefined,
rows: undefined,
types: undefined,
name: undefined,
binary: undefined,
stream: undefined,
portal: '',
callback: undefined,
_fieldNames: [],
_fieldConverters: [],
_result: { command: null, rowCount: null, oid: null, rows: [] },
isPreparedStatement: false,
_canceledDueToError: false,
domain: null,
_events: {},
_maxListeners: 10 }

TypeError: Object [object Object] has no method 'pipe'

Non-descriptive error messages

If the output of a COPY...TO STDOUT stream contains an error message (e.g. "Ran out of disk space"), then an Error: expected "d" (copydata message) will get emitted, regardless of the type of error in the source stream.

Preferably, this would be detected, and the actual error would get emitted instead of a parsing error.

Permissions to run this on the DB

From the look of things this module is just wrapping the native COPY into a stream right? and has nothing to do with the psql (cli) \copy

I ask because native COPY requires superuser privileges to run, whereas the cli version does not.

If I need the \copy version, am I better off finding some cli interface for node?
