pgcopy's Introduction

pgcopy

Use pgcopy for fast data loading into PostgreSQL with binary copy.

Features

  • Support for many data types
  • Support for multi-dimensional array types
  • Support for schema and schema search path
  • Support for mixed-case table and column names
  • Transparent string encoding
  • Utility for replacing entire table

Quickstart

from datetime import datetime
from pgcopy import CopyManager
import psycopg2
cols = ('id', 'timestamp', 'location', 'temperature')
now = datetime.now()
records = [
    (0, now, 'Jerusalem', 72.2),
    (1, now, 'New York', 75.6),
    (2, now, 'Moscow', 54.3),
]
conn = psycopg2.connect(database='weather_db')
mgr = CopyManager(conn, 'measurements_table', cols)
mgr.copy(records)

# don't forget to commit!
conn.commit()

Supported datatypes

pgcopy supports the following PostgreSQL scalar types:

  • bool
  • smallint
  • integer
  • bigint
  • real
  • double precision
  • char
  • varchar
  • text
  • bytea
  • enum types
  • date
  • time
  • timestamp
  • timestamp with time zone
  • numeric
  • json
  • jsonb
  • uuid
  • arrays
  • vector

Documentation

Read the docs.

See Also

cpgcopy, a Cython implementation, about twice as fast.

pgcopy's People

Contributors

akshayeshenoi, altaurog, dependabot[bot], johnbachman, mastak, nathanglover, samuelmarks, wburklund

pgcopy's Issues

Support for enums?

Hi altaurog,

First of all, thanks for this great project. I'm wondering whether pgcopy supports enums (or, even better, custom types that are enums). If not, any tips on where I could modify the code to make it work?

Thanks
nerrons

Help debugging, meaningless error message

Thank you for all the help, I'm using your solution all the time!

My code fails in the current project on the following line (worked in many other projects):
worker.copy(row, BytesIO)

And the error that I get is:
ERROR: Something went wrong with XXXXXX table.

I tried reviewing all the code and PostgreSQL logs and I can't find anything.

Any recommendation, where or what to look for?

Thanks!

pd.timedelta support

Hi, I'd like to be able to insert pd.timedelta data into PostgreSQL tables using pgcopy. As I understand it, pd.timedelta maps to the PostgreSQL interval type. Are there any known obstacles to implementing this?
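
A possible starting point, sketched under assumptions: PostgreSQL's binary interval value is a 64-bit microsecond count followed by 32-bit day and month counts, and pgcopy formatters appear to return a (format, data) pair that starts with the field length, as in the writestream code quoted elsewhere on this page. The name interval_formatter is hypothetical and is not part of pgcopy. pd.Timedelta subclasses datetime.timedelta, so the same attributes should apply, although nanoseconds would be dropped.

```python
import struct
from datetime import timedelta


def interval_formatter(val):
    """Hypothetical formatter: timedelta -> (struct format, data) for an interval.

    Layout assumed here: int32 field length (16), int64 microseconds,
    int32 days, int32 months. timedelta has no month component, so months is 0.
    """
    microseconds = val.seconds * 1_000_000 + val.microseconds
    return "iqii", (16, microseconds, val.days, 0)


fmt, data = interval_formatter(timedelta(days=1, seconds=2, microseconds=3))
packed = struct.pack(">" + fmt, *data)  # big-endian, no padding
```

Note that timedelta normalizes negative durations into negative days plus a positive time part, so a value of minus one second would be sent as -1 day plus 23:59:59.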

Question about array types

Hi, I have a very quick question for you that I wasn't able to answer myself after briefly going through the (tiny) codebase. I assume the answer is 'yes' but is it correct that pgcopy supports arrays of all of the types it supports?

I read the documentation, did a repository wide search for 'array' and did a really fast browse through the code (I've written a similar lib that supports arrays via COPY FROM CSV (it is a total hack, the array part at least, since there shouldn't be ARRAYS in CSV columns, but eh, I needed it, and it works he...

Anyway, before I go down the road of throwing away my code, can you confirm that array support is built-in for each type?

Thanks, sorry for the most ignorant issue I have ever entered on github ;)

numpy support? - `np.int64`, `np.array`

Feature would be most appreciated :)

EDIT: https://github.com/altaurog/pgcopy/blob/583ee45/pgcopy/copy.py#L337

    def writestream(self, data, datastream):
        datastream.write(BINCOPY_HEADER)
        count = len(self.cols)
        for record in data:
            fmt = [">h"]
            rdat = [count]
            for formatter, val in zip(self.formatters, record):
                if type(val).__module__ == 'numpy':
                    datastream.write(val.tobytes())
                else:
                    f, d = formatter(val)
                    fmt.append(f)
                    rdat.extend(d)
            datastream.write(struct.pack("".join(fmt), *rdat))
        datastream.write(BINCOPY_TRAILER)

Something like that, anyway. The issue is that struct.pack wants the full struct and won't let me add some bytes in the middle of what it wants to parse/emit. @altaurog any ideas?
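
An alternative that avoids touching writestream, sketched under assumptions: convert numpy values to plain Python objects before they reach the formatters. numpy scalars and arrays both provide tolist(), which returns native Python values. The helper names to_native and native_records are hypothetical, and the module check mirrors the one in the snippet above.

```python
def to_native(val, modules=("numpy",)):
    """Return a plain Python object for values whose type lives in `modules`.

    For numpy scalars, tolist() yields an int/float/bool; for numpy arrays,
    it yields (nested) lists. Other values pass through unchanged.
    """
    if type(val).__module__ in modules:
        return val.tolist()
    return val


def native_records(records, modules=("numpy",)):
    """Lazily convert every value of every record with to_native."""
    for record in records:
        yield tuple(to_native(v, modules) for v in record)
```

The generator could then wrap the data passed to the copy call, for example mgr.copy(native_records(records)).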

`Replace` with table in schema other than public

Hi,

We're using pgcopy and need to use the Replace class with a table in a schema other than public.

It fails in multiple places, including https://github.com/altaurog/pgcopy/blob/master/pgcopy/util.py#L125 and similar lines, because of the double quotes.

Indeed, the same command without the double quotes works because PostgreSQL can then resolve the schema. With the quotes, it tries to find a table in the public schema whose name includes the schema prefix.

Another option is to quote each part separately: "schema"."table_name"

Could you find a way to fix that in general in pgcopy?

Thanks a lot in advance!

Cheers,

Raphaël
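
The "schema"."table_name" form mentioned above can be sketched as follows. The helper name quote_qualified is hypothetical and not part of pgcopy; it assumes the schema and table are separated by a single dot and that neither part contains a dot itself.

```python
def quote_qualified(name):
    """Quote each dot-separated part of an identifier on its own.

    Embedded double quotes are doubled, which is how SQL escapes them
    inside a quoted identifier.
    """
    parts = name.split(".")
    return ".".join('"%s"' % part.replace('"', '""') for part in parts)


qualified = quote_qualified("myschema.MyTable")
unqualified = quote_qualified("measurements")
```

When psycopg2 2.8 or later is available, psycopg2.sql.Identifier can take the schema and table as separate arguments, which avoids hand-written quoting.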

Loss of digits for numeric data?

Hi,

I think that I found a problem when handling numeric datatypes.

In my application I need to insert large numbers. If I insert decimal.Decimal('1456312800000000') then this gets corrupted to 14563128 in the database. However inserting decimal.Decimal('1456312800000001') works.

I narrowed this down to line 75 in copy.py (https://github.com/altaurog/pgcopy/blob/master/pgcopy/copy.py#L75) where the logic is deleting the 0's at the end of the first number without taking this into account anywhere. In fact when I remove lines 75-78 pgcopy is sending the correct number across.

However, I'm far from certain that this is correct, as I don't fully understand the transformation logic.

Example with test file:

$ cat test.py

import psycopg2
import psycopg2.extras
import decimal
import pgcopy
import os

def records():
    yield (1,
           decimal.Decimal('1456312800000000'))
    yield (2,
           decimal.Decimal('1456312800000001'))


db_host = os.environ['DB_HOST']
db_username = os.environ['DB_USER']
db_password = os.environ['DB_PASSWORD']
db_db = os.environ['DB_DB']

with psycopg2.connect(f"sslmode=disable dbname={db_db} user={db_username} host={db_host} port=5432 password={db_password}") as conn:
        with conn.cursor(
                cursor_factory=psycopg2.extras.NamedTupleCursor) as cur:
                cur.execute("DROP TABLE log_test")
                print(f"deleted table notices: '{conn.notices}' status '{conn.status}'")
                cur.execute(
                    "CREATE TABLE log_test (id INTEGER, time NUMERIC);")
                print(f"created table notices: '{conn.notices}' status '{conn.status}'")
                cols = ['id',
                        'time']
                cm = pgcopy.CopyManager(conn, 'log_test', cols)
                cm.copy(records())
                print(
                    f"added data: '{conn.notices}' status '{conn.status}'")

$ python ./test.py
deleted table notices: '[]' status '2'
created table notices: '[]' status '2'
added data: '[]' status '2'
$ psql
psql (9.6.4)
Type "help" for help.

FSM=> select * from log_test;
id | time
----+------------------
1 | 14563128
2 | 1456312800000001
(2 rows)

Best regards, Peter
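
The reported values are consistent with how PostgreSQL's binary numeric format works: the value is sent as base-10000 digits plus a weight, which is the power of 10000 of the first digit. The sketch below is not pgcopy's code. It only illustrates, for non-negative integers, that trailing zero digits can be dropped safely as long as the weight is computed before they are dropped. The function names are hypothetical.

```python
from decimal import Decimal

NBASE = 10000  # PostgreSQL numeric uses base-10000 digits


def to_base10000(value):
    """Split a non-negative integral Decimal into (weight, digits)."""
    n = int(value)
    digits = []
    while n:
        n, rem = divmod(n, NBASE)
        digits.append(rem)
    digits.reverse()
    weight = len(digits) - 1  # computed BEFORE stripping trailing zeros
    while digits and digits[-1] == 0:
        digits.pop()
    return weight, digits


def from_base10000(weight, digits):
    """Rebuild the integer from its weight and base-10000 digits."""
    return sum(d * NBASE ** (weight - i) for i, d in enumerate(digits))


weight, digits = to_base10000(Decimal("1456312800000000"))
correct = from_base10000(weight, digits)
# Recomputing the weight from the stripped digit list loses the zeros:
corrupted = from_base10000(len(digits) - 1, digits)
```

Here corrupted comes out as 14563128, the same value that appeared in the table, which suggests (but does not prove) that the weight was derived after the zeros were removed.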

Does the `Replace` class re-create the hypertable?

Hi, I am using pgcopy v1.5.0. Thank you for such a good tool.

I am using TimescaleDB and am wondering whether the Replace class re-creates the hypertable. As far as I can see, the record in _timescaledb_catalog.hypertable is deleted once the table has been replaced.

If I then run select create_hypertable(tablename, by_range('time'), migrate_data=>true);, it seems to work fine, but the table name still does not appear in _timescaledb_catalog.hypertable. Has the table been converted into a hypertable or not?

It seems to me that the Replace class should build back an identical table. Or does it not yet support TimescaleDB hypertables (only normal PostgreSQL tables)?

Thank you.

error: argument for 's' must be a bytes object

@altaurog Can you please help me check whether I am missing something obvious? Any help is appreciated.

The table I am copying to has 16 columns, all of type text except two. I get the error below when at least 9 of the values are strings. With 8 string values, writestream runs without any problem; I will hit issues further down the line because I supplied only 8 values, but at least I get past this function. If I supply 16 integers or None values, writestream also runs fine.

error                                     Traceback (most recent call last)
<ipython-input-47-f54eec967a67> in <cell line: 173>()
    173 while True:
    174   get_data_from_skuspotpricehistory()
--> 175   insert_into_table()
    176   if skuspotpricehistory[0]['$skipToken'] == 'null':
    177     break

2 frames
<ipython-input-47-f54eec967a67> in insert_into_table()
    128   records = [('None','None','None','None','None','None','None','None','None'),]
    129   #records = [(1,2,3,4,5,6,7,8,9,10,11),]
--> 130   mgr.copy(records)
    131 
    132     # query = """

/usr/local/lib/python3.10/dist-packages/pgcopy/copy.py in copy(self, data, fobject_factory)
    313         """
    314         datastream = fobject_factory()
--> 315         self.writestream(data, datastream)
    316         datastream.seek(0)
    317         self.copystream(datastream)

/usr/local/lib/python3.10/dist-packages/pgcopy/copy.py in writestream(self, data, datastream)
    344                 f, d = formatter(val)
    345                 fmt.append(f)
--> 346                 rdat.extend(d)
    347             datastream.write(struct.pack("".join(fmt), *rdat))
    348         datastream.write(BINCOPY_TRAILER)

error: argument for 's' must be a bytes object

Support for VARCHAR with no size restriction

Hi, thanks for this great resource!

I have tables with several VARCHAR fields with no size restriction. As I understand this is a Postgres-specific extension that allows the fields to act like TEXT under the hood.

https://stackoverflow.com/questions/4848964/postgresql-difference-between-text-and-varchar-character-varying

When using pgcopy, I found that the values inserted into these fields were getting truncated (which showed up as a violated uniqueness constraint). Looking into the code, I saw that what was happening was that the inspect.get_types() function was returning -1 for the attypmod, which was getting passed to the maxsize_formatter as the length, and subsequently stripping off the last character in every value. Obviously, I could change the definition of these fields to TEXT (and I still may) but I thought that pgcopy should probably support these types of fields. Will file a simple PR shortly.
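
The truncation described above comes down to Python slicing with a negative bound. The sketch below uses hypothetical function names rather than pgcopy's own, and assumes that a negative size means "no declared limit".

```python
def naive_truncate(data, maxsize):
    # With maxsize == -1 this is data[:-1], which drops the last byte
    return data[:maxsize]


def safe_truncate(data, maxsize):
    # Treat a negative maxsize as "no limit" and leave the value intact
    return data if maxsize < 0 else data[:maxsize]


truncated = naive_truncate(b"hello", -1)
intact = safe_truncate(b"hello", -1)
```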

struct.error on Python 3.8.6

When I'm inserting an iterable (list of lists) to my table, I get this error. Below this error are the lines missing from the traceback (because of Cython), and below those are the datatypes being used in the copy command. I get this error with mgr.copy, and mgr.threading_copy. I know that the iterable is correct, as using execute_batch with psycopg2 on the data works fine.

  File "cboe_postgres.pyx", line 18, in cboe_postgres.run
  File "cboe_postgres.pyx", line 89, in cboe_postgres.run
  File "cboe_postgres.pyx", line 114, in cboe_postgres.run
  File "/root/.pyenv/versions/3.8.6/lib/python3.8/site-packages/pgcopy/copy.py", line 290, in copy
    self.writestream(data, datastream)
  File "/root/.pyenv/versions/3.8.6/lib/python3.8/site-packages/pgcopy/copy.py", line 321, in writestream
    datastream.write(struct.pack(''.join(fmt), *rdat))
struct.error: required argument is not a float
L18    cpdef run(bint start_from_top=True):
L89        with open(path, "r") as f: # idk why, but opening any file in cython causes it to be added to the traceback?
L114                    mgr.threading_copy(rows) # mgr.copy(rows) also fails
        CREATE TABLE Day(
        row_id integer,
        undersymbol text,
        date_id date,
        ticker text,
        expiration text,
        strike real,
        type text,
        open real,
        high real,
        low real,
        close real,
        volume integer,
        iv real,
        delta real,
        day_avgs integer[] DEFAULT '{}',
        iv_chg real DEFAULT 0,
        avg_iv real DEFAULT 0,
        prev_avg_iv real DEFAULT 0,
        spread real DEFAULT 0,
        PRIMARY KEY(row_id)
        )

These are the values for fmt, ''.join(fmt), and *rdat, in case that helps at all:

fmt: ['>h', 'ii', 'i1s', 'ii', 'i1s', 'i10s', 'if', 'i1s', 'if', 'if', 'if', 'if', 'ii', 'if', 'if']

fmt str: >hiii1siii1si10sifi1sififififiiifif

rdat: 14 4 0 1 b'A' 4 7492 1 b'A' 10 b'2020-07-17' 4 40.000 1 b'C' 4 0.0000 4 0.0000 4 0.0000 4 0.0000 4 0 4 2.0359 4 0.9928
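
One thing to check, offered as a guess: values printed as 40.000 and 0.0000 keep their trailing zeros, which a Python float would not do when printed, so the rows may contain strings read from a file. struct.pack rejects a string for an 'f' field even though execute_batch would let PostgreSQL cast it. The sketch below shows the failure and a hypothetical coerce_row helper that converts selected positions to float before copying.

```python
import struct


def coerce_row(row, float_positions):
    """Return a copy of row with the given positions converted to float.

    None is left alone so that it can still be sent as NULL.
    """
    return tuple(
        float(v) if i in float_positions and v is not None else v
        for i, v in enumerate(row)
    )


# A string where a float is expected reproduces the struct.error
try:
    struct.pack(">if", 4, "40.000")
    string_rejected = False
except struct.error:
    string_rejected = True

row = coerce_row(("A", "40.000", None), float_positions={1, 2})
packed = struct.pack(">if", 4, row[1])  # works once the value is a float
```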

Add Time as a Supported DataType

How hard would it be to add time as a data type? I imagine not too hard, because timestamp and date are already supported.

If it is simple, I could open a PR; I would just need a bit of instruction on where to start. I imagine it would mean adding a formatter function to the type_formatters dictionary in copy.py?
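
A rough sketch of what such a formatter might look like, assuming that formatters return a (format, data) pair beginning with the field length, as the writestream code quoted elsewhere on this page suggests. PostgreSQL sends a time value in binary form as a 64-bit count of microseconds since midnight. The name time_formatter is hypothetical, and time zone information is ignored.

```python
import struct
from datetime import time


def time_formatter(val):
    """Hypothetical formatter: datetime.time -> (struct format, data).

    int32 field length (8) followed by int64 microseconds since midnight.
    """
    seconds = (val.hour * 60 + val.minute) * 60 + val.second
    return "iq", (8, seconds * 1_000_000 + val.microsecond)


fmt, data = time_formatter(time(1, 2, 3, 4))
packed = struct.pack(">" + fmt, *data)
```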

`ValueError: error formatting value` when inserting JSON array (`List[dict]`) into `json[]` column

Traceback (most recent call last):
  […]
    mgr.copy(data_iter)
  File "pgcopy/pgcopy/copy.py", line 295, in copy
    self.writestream(data, datastream)
  File "pgcopy/pgcopy/copy.py", line 323, in writestream
    f, d = formatter(val)
  File "pgcopy/pgcopy/copy.py", line 136, in <lambda>
    return lambda v: ('i', (-1,)) if v is None else formatter(v)
  File "pgcopy/pgcopy/copy.py", line 214, in f
    errors.raise_from(ValueError, message, exc)
  File "pgcopy/pgcopy/errors/py3.py", line 9, in raise_from
    raise exccls(message) from exc
ValueError: error formatting value [{'thekey': None, 'text': 'stuff'}] for column json_arr_col
CREATE TABLE my_table (
    json_arr_col json[],
    json_col json,
    vchar20_col VARCHAR(20),
    id integer generated by default as identity primary key
);

Make psycopg2 an optional requirement

Would it be possible to make psycopg2 an optional requirement?

I have a project using psycopg2-binary and (afaik) pgcopy works just fine w/ this installation except that it triggers a psycopg2 source build which is not necessary and also can't be built on the deployment infrastructure (no dev libraries available).

Having it as an optional requirement would allow easy usage for both psycopg2-binary and psycopg2 in deployments.

(note: can't use the recommended approach to pin an older version)

Formatting error for `uuid` columns

I am getting a ValueError when formatting uuid columns:

ValueError: error formatting value 3bb8c98d-6cb0-4c2c-9cef-7a702cb63268 for column uuid
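
A possible cause, stated as an assumption because the report does not show the input type: the value may be a str, while the formatter may expect a uuid.UUID instance. The hypothetical as_uuid helper below normalizes either form before the rows are copied.

```python
import uuid


def as_uuid(value):
    """Return value as a uuid.UUID, parsing it from its string form if needed."""
    return value if isinstance(value, uuid.UUID) else uuid.UUID(str(value))


guid = as_uuid("3bb8c98d-6cb0-4c2c-9cef-7a702cb63268")
same = as_uuid(guid)  # already a UUID, returned unchanged
```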

Lost files

In the installation package of version 1.4.2, the 'errors' folder and its files are missing, which causes an error in line 17 of copy.py:

from . import errors, inspect, util

Serialize None to null for numeric fields ?

This is a question rather than an issue.
I saw in the docs that:

PostgreSQL numeric does not support Decimal('Inf') or Decimal('-Inf'). pgcopy serializes these as NaN.

However, I can usually set null values in PostgreSQL from JavaScript by passing null or empty values.
Is there any way to replace NaNs with null in pgcopy?

I have tried:
Decimal(row["my_field"]) if 'my_field' in row else None,
and:
Decimal(row["my_field"]) if 'my_field' in row else 'NULL',

Both end up being interpreted as NaNs.
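
One thing that may be worth checking, as an assumption about the data: if 'my_field' is present in the row but holds NaN or an empty value, the else branch never runs and Decimal() receives the NaN. A traceback quoted elsewhere on this page suggests that pgcopy sends None as a NULL field (length -1), so mapping such values to None first might help. The helper name below is hypothetical.

```python
from decimal import Decimal


def to_numeric_or_none(row, key):
    """Return a finite Decimal for row[key], or None when it should be NULL.

    Missing keys, None, empty strings, NaN and infinities all map to None.
    Non-numeric strings still raise decimal.InvalidOperation.
    """
    raw = row.get(key)
    if raw is None or raw == "":
        return None
    value = Decimal(str(raw))
    return value if value.is_finite() else None


present = to_numeric_or_none({"my_field": "1.5"}, "my_field")
missing = to_numeric_or_none({}, "my_field")
not_a_number = to_numeric_or_none({"my_field": float("nan")}, "my_field")
```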

struct.error: required argument is not an integer

Using your library to batch insert data from Parquet files into PostgreSQL, prepared like so:

schema = pa.schema(
    [
        pa.field("timestamp_col", pa.timestamp("ms", tz="UTC")),
        pa.field("json_col", pa.struct([("can", pa.string())])),
        pa.field("array_str_col", pa.list_(pa.string())),
        pa.field("array_bigint_col", pa.list_(pa.int64())),
        pa.field(
            "array_json_col",
            pa.list_(pa.struct([("foo", pa.string()), ("can", pa.string())])),
        ),
    ]
)
row = pa.Table.from_pydict(
    {
        "timestamp_col": [datetime.utcnow().timestamp()],
        "json_col": [{"can": "haz"}],
        "array_str_col": [["Flamingo", "Centipede"]],
        "array_bigint_col": [np.arange(6, dtype=np.int64)],
        "array_json_col": [[{"foo": "bar"}, {"can": "haz"}]],
    },
    schema=schema,
)

Getting this weird error:

Error
Traceback (most recent call last):
  File "lib/python3.11/site-packages/pgcopy/copy.py", line 210, in f
    return formatter(v)
           ^^^^^^^^^^^^
  File "lib/python3.11/site-packages/pgcopy/copy.py", line 179, in <lambda>
    return lambda v: array_formatter(att.typelem, formatter, v)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "lib/python3.11/site-packages/pgcopy/copy.py", line 162, in array_formatter
    return str_formatter(struct.pack(''.join(fmt), *data))
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
struct.error: required argument is not an integer

The above exception was the direct cause of the following exception:

For completeness here are the related functions:

pq.write_table(row, parquet_filepath)
parquet_to_table(parquet_filepath, table_name=inspect.stack()[0].function)

def parquet_to_table(filename, table_name=None, database_uri=None, dry_run=False):
    """
    Parquet file to an executable insertion `COPY FROM` into a table

    :param filename: Path to a Parquet file
    :type filename: ```str```

    :param table_name: Table name to use, else use penultimate underscore surrounding word from filename basename
    :type table_name: ```Optional[str]```

    :param database_uri: Database connection string. Defaults to `RDBMS_URI` in your env vars.
    :type database_uri: ```Optional[str]```
    """
    parquet_file = ParquetFile(filename)
    engine = create_engine(
        environ["RDBMS_URI"] if database_uri is None else database_uri
    )

    deque(
        map(
            lambda df: df.to_sql(
                table_name,
                con=engine,
                if_exists="append",
                method=psql_insert_copy,
                index=False,
            ),
            map(methodcaller("to_pandas"), parquet_file.iter_batches()),
        ),
        maxlen=0,
    )

def psql_insert_copy(table, conn, keys, data_iter):
    """
    Execute SQL statement inserting data

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """

    columns = keys[1:] if keys and keys[0] == "index" else keys

    mgr = CopyManager(conn.connection, table.name, columns)
    mgr.copy(map(lambda line: map(parse_col, line), data_iter))
    conn.connection.commit()

doesn’t handle python unicode strings

mgr = CopyManager(self.cnx, table, champs)
list_rows = [(0, 'Jerusalem'), (1, 'New York'), (2, 'Moscow'),]
mgr.copy(list_rows)

===> error : struct.error: argument for 's' must be a bytes object
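
The error message itself comes from the struct module: the 's' format only accepts bytes, so text has to be encoded first. The sketch below is not pgcopy's code. It shows the failure and a hypothetical pack_text_field helper that encodes a string and prefixes it with its byte length, under the assumption that UTF-8 matches the database encoding.

```python
import struct


def pack_text_field(value, encoding="utf-8"):
    """Encode text if needed and pack it as int32 length + raw bytes."""
    data = value.encode(encoding) if isinstance(value, str) else value
    return struct.pack(">i%ds" % len(data), len(data), data)


# Passing a str to an 's' field reproduces the reported error
try:
    struct.pack(">i6s", 6, "Moscow")
    message = ""
except struct.error as exc:
    message = str(exc)

field = pack_text_field("Moscow")
```

The length is taken after encoding, so multi-byte characters are counted in bytes rather than in characters.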

ImportError: cannot import name CopyManager

Hi,

I am using the pgcopy module with Python 2.7 and installed it with the following command:
pip install pgcopy

System setup:
OS: Ubuntu 14.04
Python version: 2.7.6
virtualenv: virtualenv
RAM: 16GB

When I try to run the provided example, I get the above error every time.

Traceback (most recent call last):
  File "bulk.py", line 2, in <module>
    from pgcopy import CopyManager
  File "/var/www/ansible-platform/python-bulk-insert/pgcopy.py", line 7, in <module>
    from pgcopy import CopyManager
ImportError: cannot import name CopyManager

I am also attaching the sample Python script that I run in the isolated environment.
Python script

Please let me know if I am making a mistake.

Looking forward to your response.
Thanks
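
The traceback shows the import being resolved from /var/www/ansible-platform/python-bulk-insert/pgcopy.py, so a local file named pgcopy.py appears to shadow the installed package. A small check for this situation is sketched below; the function name is hypothetical, and the temporary directory only stands in for the project directory.

```python
import tempfile
from pathlib import Path


def find_shadowing_files(directory, package="pgcopy"):
    """List files in directory that would shadow an installed package."""
    candidates = {package + ".py", package + ".pyc"}
    return sorted(p.name for p in Path(directory).iterdir() if p.name in candidates)


# Demonstration with a throwaway directory laid out like the one in the report
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "pgcopy.py").write_text("from pgcopy import CopyManager\n")
    (Path(tmp) / "bulk.py").write_text("from pgcopy import CopyManager\n")
    found = find_shadowing_files(tmp)
```

Renaming the local file, and removing any stale pgcopy.pyc next to it, should let the import reach the installed package.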

add numeric datatype

def split_by(number, length = 4):
	data = [ int(number[-length-once:len(number) - once]) for once in range(0, len(number), length)]
	data.reverse()
	return data

def numeric_formater(val):
	val = str(val)
	try:
		dots = val.index('.')
		data_base = [(2, 'h', once) for once in split_by(val[:dots], 4)]
		dots_based = "{0}{1}".format(val[dots+1:], '0'*(4 - len(val[dots+1:])%4))
		data_dots = [(2, 'h', once) for once in split_by(dots_based, 4)]
		data = data_base + data_dots
		dots_positions = len(val) - dots - 1
	except ValueError:
		data = [(2, 'h', once) for once in split_by(val, 4)]
		data_base = data
		# dots = None
		dots_positions = None

	header = [
		 (2, 'h', len(data))
		,(2, 'h', len(data_base) - 1)
		,(4, 'i', 0 if not dots_positions else dots_positions)
	]
	data = header + data
	count = sum([once[0] for once in data])
	data.insert(0, (4, 'i', count))
	format = ''.join([once[1] for once in data])
	return (format, [once[2] for once in data] )

Taken from https://doxygen.postgresql.org/backend_2utils_2adt_2numeric_8c.html#a57a8f8ab552bae24926d252180956958

Replace temp file with StringIO

	def copy(self, data):
		datastream = StringIO.StringIO()
		self.writestream(data, datastream)
		datastream.seek(0)
		self.copystream(datastream)
		datastream.close()
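
On Python 3 the in-memory equivalent would be io.BytesIO, since the binary copy stream is bytes rather than text. The sketch below is independent of pgcopy. It writes a minimal PostgreSQL binary copy stream (signature, flags, header extension length, rows, trailer) for rows of 32-bit integers into a BytesIO object. The function name write_int_rows is hypothetical.

```python
import struct
from io import BytesIO

# 11-byte signature, int32 flags, int32 header extension length
HEADER = b"PGCOPY\n\xff\r\n\x00" + struct.pack(">ii", 0, 0)
TRAILER = struct.pack(">h", -1)  # a field count of -1 marks the end of data


def write_int_rows(rows, datastream):
    """Write rows of 32-bit integers in binary copy format to datastream."""
    datastream.write(HEADER)
    for row in rows:
        datastream.write(struct.pack(">h", len(row)))  # number of fields
        for value in row:
            datastream.write(struct.pack(">ii", 4, value))  # length, value
    datastream.write(TRAILER)


stream = BytesIO()
write_int_rows([(1, 2)], stream)
stream.seek(0)  # rewind before handing the stream to a reader
payload = stream.read()
```

Keeping the whole stream in memory avoids temporary files, at the cost of holding the entire batch in RAM.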
