pipelinewise-tap-postgres

Singer tap that extracts data from a PostgreSQL database and produces JSON-formatted data following the Singer spec.

This is a PipelineWise compatible tap connector.

How to use it

The recommended method of running this tap is to use it from PipelineWise. When running it from PipelineWise you don't need to configure this tap with JSON files and most things are automated. Please check the related documentation at Tap Postgres.

If you want to run this Singer Tap independently please read further.

Install and Run

First, make sure Python 3 is installed on your system or follow these installation instructions for Mac or Ubuntu.

It's recommended to use a virtualenv:

  python3 -m venv venv
  . venv/bin/activate
  pip install pipelinewise-tap-postgres

or

  make venv

Create a config.json

{
  "host": "localhost",
  "port": 5432,
  "user": "postgres",
  "password": "secret",
  "dbname": "db"
}

These are the same basic configuration properties used by the PostgreSQL command-line client (psql).

Full list of options in config.json:

| Property | Type | Required? | Default | Description |
|----------|------|-----------|---------|-------------|
| host | String | Yes | - | PostgreSQL host |
| port | Integer | Yes | - | PostgreSQL port |
| user | String | Yes | - | PostgreSQL user |
| password | String | Yes | - | PostgreSQL password |
| dbname | String | Yes | - | PostgreSQL database name |
| filter_schemas | String | No | None | Comma-separated schema names; scanning only the required schemas improves the performance of data extraction. |
| ssl | String | No | None | If set to "true", use SSL via the postgres sslmode require option. If the server does not accept SSL connections or the client certificate is not recognized, the connection will fail. |
| logical_poll_total_seconds | Integer | No | 10800 | Stop running the tap when no data has been received from the WAL after this number of seconds. |
| break_at_end_lsn | Boolean | No | true | Stop running the tap if the newly received LSN is after the max LSN that was detected when the tap started. |
| max_run_seconds | Integer | No | 43200 | Stop running the tap after this number of seconds. |
| debug_lsn | String | No | None | If set to "true", add an _sdc_lsn property to the Singer messages to debug the Postgres LSN position in the WAL stream. |
| tap_id | String | No | None | ID of the pipeline/tap |
| itersize | Integer | No | 20000 | Size of the PG cursor iterator when doing INCREMENTAL or FULL_TABLE replication |
| default_replication_method | String | No | None | Default replication method to use when none is provided in the catalog (values: LOG_BASED, INCREMENTAL or FULL_TABLE) |
| use_secondary | Boolean | No | False | Use a database replica for INCREMENTAL and FULL_TABLE replication |
| secondary_host | String | No | - | PostgreSQL replica host (required if use_secondary is True) |
| secondary_port | Integer | No | - | PostgreSQL replica port (required if use_secondary is True) |
| limit | Integer | No | None | Adds a LIMIT to INCREMENTAL queries to cap the number of records returned per run |
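For example, a config.json that restricts discovery to a single schema, enables SSL and sets a default replication method could look like this (values are placeholders, not recommendations):

{
  "host": "localhost",
  "port": 5432,
  "user": "postgres",
  "password": "secret",
  "dbname": "db",
  "filter_schemas": "public",
  "ssl": "true",
  "default_replication_method": "LOG_BASED",
  "itersize": 20000
}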

Run the tap in Discovery Mode

tap-postgres --config config.json --discover                # Should dump a Catalog to stdout
tap-postgres --config config.json --discover > catalog.json # Capture the Catalog

Add Metadata to the Catalog

Each entry under the Catalog's "streams" key will need the following metadata:

{
  "streams": [
    {
      "stream_name": "my_topic",
      "metadata": [{
        "breadcrumb": [],
        "metadata": {
          "selected": true,
          "replication-method": "LOG_BASED"
        }
      }]
    }
  ]
}

The replication method can be one of FULL_TABLE, INCREMENTAL or LOG_BASED.
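If you prefer to script this step, a small sketch like the following can inject the metadata into a discovered catalog. The tap_stream_id "public-my_table" is a placeholder; discovered catalogs identify streams by tap_stream_id and contain a table-level metadata entry with an empty breadcrumb.

  import json

  # Mark one table as selected with LOG_BASED replication in a discovered catalog.
  # 'public-my_table' is a placeholder; use a tap_stream_id from your own catalog.
  with open('catalog.json') as f:
      catalog = json.load(f)

  for stream in catalog['streams']:
      if stream['tap_stream_id'] != 'public-my_table':
          continue
      for entry in stream['metadata']:
          if entry['breadcrumb'] == []:
              entry['metadata']['selected'] = True
              entry['metadata']['replication-method'] = 'LOG_BASED'

  with open('catalog.json', 'w') as f:
      json.dump(catalog, f, indent=2)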

Note: Log based replication requires a few adjustments in the source postgres database, please read further for more information.

Run the tap in Sync Mode

tap-postgres --config config.json --catalog catalog.json

The tap will write bookmarks to stdout which can be captured and passed as an optional --state state.json parameter to the tap for the next sync.
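For example, a minimal end-to-end sketch of this loop; target-postgres stands in for whichever Singer target you use, and tail -1 keeps only the most recent state the target emitted:

  tap-postgres --config config.json --catalog catalog.json | target-postgres --config target_config.json > state_output.json
  tail -1 state_output.json > state.json
  tap-postgres --config config.json --catalog catalog.json --state state.json | target-postgres --config target_config.json > state_output.json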

Log Based replication requirements

  • PostgreSQL databases running PostgreSQL versions 9.4.x or greater. To avoid a critical PostgreSQL bug, use at least one of the following minor versions:

    • PostgreSQL 12.0
    • PostgreSQL 11.2
    • PostgreSQL 10.7
    • PostgreSQL 9.6.12
    • PostgreSQL 9.5.16
    • PostgreSQL 9.4.21
  • A connection to the master instance. Log-based replication will only work by connecting to the master instance.

  • wal2json plugin: To use Log Based replication for your PostgreSQL integration, you must install the wal2json plugin version >= 2.3. The wal2json plugin outputs JSON objects for logical decoding, which the tap then uses to perform Log Based replication. Steps for installing the plugin vary depending on your operating system; instructions for each operating system are in the wal2json GitHub repository.

  • postgres config file: Locate the database configuration file (usually postgresql.conf) and define the parameters as follows:

    wal_level=logical
    max_replication_slots=5
    max_wal_senders=5
    

    Restart your PostgreSQL service to ensure the changes take effect.

    Note: For max_replication_slots and max_wal_senders, we’re defaulting to a value of 5. This should be sufficient unless you have a large number of read replicas connected to the master instance.

  • Existing replication slot: Log based replication requires a dedicated logical replication slot. In PostgreSQL, a logical replication slot represents a stream of database changes that can then be replayed to a client in the order they were made on the original server. Each slot streams a sequence of changes from a single database.

    Log in to the master instance as a superuser and, using the wal2json plugin, create a logical replication slot:

      SELECT *
      FROM pg_create_logical_replication_slot('pipelinewise_<database_name>', 'wal2json');
    

    Note: Replication slots are specific to a given database in a cluster. If you want to connect multiple databases - whether in one integration or several - you’ll need to create a replication slot for each database.
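As a quick check after applying the configuration and creating the slot above, you can confirm the WAL level and that the slot exists by querying the pg_replication_slots system view:

      SHOW wal_level;
      SELECT slot_name, plugin, database, active FROM pg_replication_slots;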

To run tests:

  1. Install python test dependencies in a virtual env:

       make venv

  2. You need to have a postgres database to run the tests and export its credentials.

     You can make use of the local docker-compose to spin up a test database by running make start_db.

     Test objects will be created in the postgres database.

  3. To run the unit tests:

       make unit_test

  4. To run the integration tests:

       make integration_test

To run pylint:

Install python dependencies and run python linter

  make venv
  make pylint


pipelinewise-tap-postgres's Issues

Syntax error on fully qualified table name

Describe the bug

We're using this tap through Meltano and getting the following:

syntax error at or near "'"public"."some_table"'"
LINE 2:                                     FROM ('"public"."some_t...
                                                  ^
Traceback (most recent call last):
  File "/project/.meltano/extractors/tap-postgres/venv/bin/tap-postgres", line 8, in <module>
    sys.exit(main())          
  File "/project/.meltano/extractors/tap-postgres/venv/lib/python3.8/site-packages/tap_postgres/__init__.py", line 435, in main
    raise exc                 
  File "/project/.meltano/extractors/tap-postgres/venv/lib/python3.8/site-packages/tap_postgres/__init__.py", line 432, in main
    main_impl()               
  File "/project/.meltano/extractors/tap-postgres/venv/lib/python3.8/site-packages/tap_postgres/__init__.py", line 421, in main_impl
    do_sync(conn_config, args.catalog.to_dict() if args.catalog else args.properties,
  File "/project/.meltano/extractors/tap-postgres/venv/lib/python3.8/site-packages/tap_postgres/__init__.py", line 312, in do_sync
    state = sync_traditional_stream(conn_config,
  File "/project/.meltano/extractors/tap-postgres/venv/lib/python3.8/site-packages/tap_postgres/__init__.py", line 175, in sync_traditional_stream
    state = do_sync_incremental(conn_config, stream, state, desired_columns, md_map)
  File "/project/.meltano/extractors/tap-postgres/venv/lib/python3.8/site-packages/tap_postgres/__init__.py", line 86, in do_sync_incremental
    state = incremental.sync_table(conn_config, stream, state, desired_columns, md_map)
  File "/project/.meltano/extractors/tap-postgres/venv/lib/python3.8/site-packages/tap_postgres/sync_strategies/incremental.py", line 97, in sync_table
    cur.execute(select_sql)   
  File "/project/.meltano/extractors/tap-postgres/venv/lib/python3.8/site-packages/psycopg2/extras.py", line 146, in execute
    return super().execute(query, vars)
psycopg2.errors.SyntaxError: syntax error at or near "'"public"."some_table"'"

some_table -> anonymized table name

Upon further investigation, it looks like there have been some changes to the fully_qualified_* functions in 1.8.2: v1.8.1...v1.8.2#diff-51129da4bc41bedfbc727a6765232439cab29b784f5b3b2878de0f5f2a1cf00cR32

See tap_postgres/db.py:32 and similar.

Downgrading to 1.8.1 solves the issue.

To Reproduce

Expected behavior
Shouldn't error out with a syntax error.

Your environment

  • Version of tap: 1.8.2
  • Version of python 3.8
  • Version of meltano 1.92

Cannot add more tables to existing postgres->snowflake log based extract

Describe the bug

I'm using log based replication with:

  • pipelinewise-tap-postgres==1.8.0
  • pipelinewise-target-snowflake==1.13.1

Initially I selected just a few tables in postgres. This worked as expected. Later, I tried to select more of the postgres tables. When I did this, it did a full table sync (logical_initial) for the new tables, which was expected. However, the state for these new tables is not emitted, and every time I run after that, it once again does a full table sync. The new tables never use log based replication.

I found this bug when running using meltano, but confirmed it using a standalone singer job.

To Reproduce

Steps to reproduce the behavior:

  1. Create a test postgres database with a couple of tables:

    CREATE TABLE a (a integer PRIMARY KEY);
    CREATE TABLE b (a integer PRIMARY KEY);
    
    INSERT INTO a VALUES (1), (2), (3);
    INSERT INTO b VALUES (1), (2), (3);
    
  2. Create config files for the tap and target, for example:

    tap_config.json

    {
      "host": "127.0.0.1",
      "port": 5432,
      "user": "myuser",
      "pass loword": "mypass",
      "dbname": "test",
      "filter_schemas": "public",
      "logical_poll_total_seconds": 60
    }

    target_config.json

    {
      "account": "myaccount",
      "user": "TEST_ROBOT",
      "password": "mypass",
      "warehouse": "TEST",
      "dbname": "TEST",
      "role": "TEST",
      "file_format": "FILE_FORMATS.CSV",
      "default_target_schema": "singer"
    }
  3. Install the tap and create catalog.json

    $ mkvirtualenv tap-postgres
    $ pip install pipelinewise-tap-postgres==1.8.0
    $ tap-postgres --config tap_config.json --discover > catalog.json
    # Add to the "public-a" section of the catalog:
    #             "selected": true,
    #             "replication-method": "LOG_BASED",
    $ deactivate
  4. Install the target

    $ mkvirtualenv target-snowflake
    $ pip install pipelinewise-target-snowflake==1.13.1
    $ deactivate
  5. Do the initial run

    $ ~/.virtualenvs/tap-postgres/bin/tap-postgres \
          --config tap_config.json \
          --properties catalog.json \
        | ~/.virtualenvs/target-snowflake/bin/target-snowflake \
          --config target_config.json \
        >> state.json
  6. Second run to show that "public-a" is now using log based replication

    $ tail -1 state.json > state.json.tmp && mv state.json.tmp state.json \
        && ~/.virtualenvs/tap-postgres/bin/tap-postgres \
          --state state.json \
          --config tap_config.json \
          --properties catalog.json \
        | ~/.virtualenvs/target-snowflake/bin/target-snowflake \
          --config target_config.json \
        >> state.json
  7. Add a new table to the selected tables and run again

    # Add to the "public-b" section of the catalog:
    #             "selected": true,
    #             "replication-method": "LOG_BASED",
    $ tail -1 state.json > state.json.tmp && mv state.json.tmp state.json \
        && ~/.virtualenvs/tap-postgres/bin/tap-postgres \
          --state state.json \
          --config tap_config.json \
          --properties catalog.json \
        | ~/.virtualenvs/target-snowflake/bin/target-snowflake \
          --config target_config.json \
        >> state.json

You'll see this in the logs:

Beginning sync of stream(public-b) with sync method(logical_initial)
Performing initial full table sync 

Later, the target will show the expected state:

Setting state to {'currently_syncing': 'public-b', 'bookmarks': {'public-a': {'last_replication_method': 'LOG_BASED', 'lsn': 37570216, 'version': 1626456439935, 'xmin': None}, 'public-b': {'last_replication_method': 'LOG_BASED', 'lsn': 37570744, 'version': 1626583810479}}}
...
Setting state to {'currently_syncing': None, 'bookmarks': {'public-a': {'last_replication_method': 'LOG_BASED', 'lsn': 37570216, 'version': 1626456439935, 'xmin': None}, 'public-b': {'last_replication_method': 'LOG_BASED', 'lsn': 37570744, 'version': 1626583810479, 'xmin': None}}}

However, then it moves on to reading from the replication slot for the existing table(s). After that's done, the state has "lost" the new table:

Setting state to {'currently_syncing': None, 'bookmarks': {'public-a': {'last_replication_method': 'LOG_BASED', 'lsn': 37570216, 'version': 1626456439935, 'xmin': None}}}

Unfortunately, this is the state that gets emitted:

Emitting state {"currently_syncing": null, "bookmarks": {"public-a": {"last_replication_method": "LOG_BASED", "lsn": 37570216, "version": 1626456439935, "xmin": null}}}

Because of this, the next time you run, it'll start with the "logical_initial" full table sync again for the new table, and every time you run after that as well.

Expected behavior
The state for public-b should be emitted by the target.


Your environment

  • Version of target: [e.g. 1.13.1]
  • Version of python [e.g. 3.8.9]

Additional context
From this slack thread, it sounds like this may not be encountered when running using pipelinewise, since "FastSync" is used for the initial sync.

How exactly do state files work?

If I generate an INCREMENTAL state.json by simply redirecting the tap's output with > state.json, the resulting file cannot be re-read by --state state.json.

time=2024-01-22 17:49:08 name=tap_postgres level=CRITICAL message=Extra data: line 2 column 1 (char 1117)
Traceback (most recent call last):
  File "/home/ergo/Devel/singer.io/venv-tap-postgres/bin/tap-postgres", line 8, in <module>
    sys.exit(main())
  File "/home/ergo/Devel/singer.io/venv-tap-postgres/lib/python3.10/site-packages/tap_postgres/__init__.py", line 448, in main
    raise exc
  File "/home/ergo/Devel/singer.io/venv-tap-postgres/lib/python3.10/site-packages/tap_postgres/__init__.py", line 445, in main
    main_impl()
  File "/home/ergo/Devel/singer.io/venv-tap-postgres/lib/python3.10/site-packages/tap_postgres/__init__.py", line 393, in main_impl
    args = parse_args(REQUIRED_CONFIG_KEYS)
  File "/home/ergo/Devel/singer.io/venv-tap-postgres/lib/python3.10/site-packages/tap_postgres/__init__.py", line 373, in parse_args
    args.state = utils.load_json(args.state)
  File "/home/ergo/Devel/singer.io/venv-tap-postgres/lib/python3.10/site-packages/singer/utils.py", line 109, in load_json
    return json.load(fil)
  File "/usr/lib/python3.10/json/__init__.py", line 293, in load
    return loads(fp.read(),
  File "/usr/lib/python3.10/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python3.10/json/decoder.py", line 340, in decode
    raise JSONDecodeError("Extra data", s, end)
json.decoder.JSONDecodeError: Extra data: line 2 column 1 (char 1117)

However, if I manually remove everything up to the final STATE line, it works. What am I doing wrong?

best regards,

Ernst-Georg

How to troubleshoot missing column value for Logical rep.

Hi,
I am using logical replication from postgres to redshift. For one table's text-type column, which contains a string dump of JSON, we are sometimes getting a NULL value instead of the long JSON string.
I was wondering if you have any suggestions to help me identify the cause of this issue.
For more context,
for more context,

  • this is a text not null column in postgres 13,
  • I am using pipelinewise-tap-postgres (1.8.3) with pipelinewise in CDC mode.
  • I have another text not null column in the same table and this problem does not occur for that.
  • I checked the csv file generated by target-redshift; the column value is NULL in the file too, so it is not a copy-command ignore thing
  • FastSync full load is getting correct data; I reloaded the table to fix all NULLs and now CDC replication is inserting nulls again
  • I was using Transform to set some other columns to NULL but I disabled that to see if that was the bug, no effect.

canceling statement due to conflict with recovery

The tap is frequently used to pull data from a postgres replica, and a common issue in this case is "canceling statement due to conflict with recovery", especially for a full table sync. It happens when WAL arrives at the replica (upon sync between the master and a slave). The tap already has a mechanism to resume but no retry mechanism.
I think it would be helpful to be able to configure the number of attempts the tap makes to pull the data.
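For illustration, a minimal sketch of the kind of configurable retry being requested; this is not current tap behaviour and the function names are made up:

  import time
  import psycopg2

  def run_with_retries(run_sync, attempts=3, wait_seconds=30):
      """Retry a sync function a configurable number of times.

      A query cancelled with "conflict with recovery" surfaces from psycopg2
      as an OperationalError subclass, so that is what we retry on here.
      """
      for attempt in range(1, attempts + 1):
          try:
              return run_sync()
          except psycopg2.OperationalError:
              if attempt == attempts:
                  raise
              time.sleep(wait_seconds)  # back off before resuming from the bookmark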

Shouldn't fail with ERROR "could not find pg_class entry for ..." after dropping table of unrelated schema

Describe the bug
When DB contains two schemas (for example "public" and "pglogical"), but only one of them is configured for tap-postgres:

type: "tap-postgres"
schemas:
  - source_schema: "public"
...
    tables:
      - table_name: "test_table"
        replication_method: "LOG_BASED"

and if after last PipelineWise run other schema (pglogical) was deleted, then next PipelineWise run will fail with:

logger_name=tap_postgres file=/app/.virtualenvs/tap-postgres/lib/python3.8/site-packages/tap_postgres/sync_strategies/logical_replication.py:619 log_level=ERROR message=could not find pg_class entry for 7222930

logger_name=tap_postgres file=/app/.virtualenvs/tap-postgres/lib/python3.8/site-packages/tap_postgres/__init__.py:434 log_level=CRITICAL message=could not find pg_class entry for 7222930

Traceback (most recent call last):
  File "/app/.virtualenvs/tap-postgres/bin/tap-postgres", line 8, in <module>
    sys.exit(main())
  File "/app/.virtualenvs/tap-postgres/lib/python3.8/site-packages/tap_postgres/__init__.py", line 435, in main
    raise exc
  File "/app/.virtualenvs/tap-postgres/lib/python3.8/site-packages/tap_postgres/__init__.py", line 432, in main
    main_impl()
  File "/app/.virtualenvs/tap-postgres/lib/python3.8/site-packages/tap_postgres/__init__.py", line 421, in main_impl
    do_sync(conn_config, args.catalog.to_dict() if args.catalog else args.properties,
  File "/app/.virtualenvs/tap-postgres/lib/python3.8/site-packages/tap_postgres/__init__.py", line 322, in do_sync
    state = sync_logical_streams(conn_config, list(streams), state, end_lsn, state_file)
  File "/app/.virtualenvs/tap-postgres/lib/python3.8/site-packages/tap_postgres/__init__.py", line 221, in sync_logical_streams
    state = logical_replication.sync_tables(conn_config, logical_streams, state, end_lsn, state_file)
  File "/app/.virtualenvs/tap-postgres/lib/python3.8/site-packages/tap_postgres/sync_strategies/logical_replication.py", line 617, in sync_tables
    msg = cur.read_message()
psycopg2.errors.InternalError_: could not find pg_class entry for 7222930

Which is caught and "rethrown"/raised from this line.

The missing pg_class entry pointed to table from the deleted schema.

To Reproduce
Steps to reproduce the behavior:

  1. Prepare database with two schemas (at least one table in both of them)
  2. Include only a table from one of those schemas in tap-postgres configuration (with LOG_BASED replication_method).
  3. Start pipelinewise - it should succeed
  4. Drop the other schema
  5. Start pipelinewise again - it should fail with the error mentioned above and

TAP RUN SUMMARY

Status : FAILED

Expected behavior

Dropping tables from schemas not configured for PipelineWise shouldn't cause issues, and the tap run should succeed despite the deleted table:

TAP RUN SUMMARY

Status : SUCCESS

Your environment

  • Version of tap: [e.g. 1.8.1]
  • Version of python [e.g. 3.8]

Allow transfer of PostgreSQL interval data type

I have a dataset I need to transfer that includes the Postgres interval data type; this doesn't appear to be currently supported.

I would like the interval type to be transferred as though it's a string/text type. My target (Redshift) can't store the interval type but can store the text and cast it back to interval.

My naive guess is this would 'just' be a change to discovery_utils.py

  if data_type == 'interval':
      schema['type'] = nullable_column('string', col.is_primary_key)
      return schema

Improve handling of very large tables

Is your feature request related to a problem? Please describe.
When performing ingestion in "incremental" mode, tap-postgres runs a single query for each table, and iterates a cursor over the table, producing rows. In the case of the initial import for a very large table (in our case, about 4 million rows), this leaves a database transaction open for a long time.

As we run tap-postgres against a DB replica, this results in "cancelling statement due to error recovery". I am aware that I can increase Postgres parameters related to this, however, I am reluctant to do this due to the DB replica being used by other clients. I don't require the import to be done within a single transaction, but I would like to be able to incrementally add data without this error.

Describe the solution you'd like
Add a user-specified LIMIT to the postgres query that reads data from the table. This should be configurable via parameters.
Once the data is read from this query and output is produced, the query is re-run. So, the ingestion will loop. Loop can finish once the query returns fewer rows than the LIMIT and those rows are produced.

In each iteration, the postgres query reads data from the last bookmark. This risks re-ingesting some data, but that is a suitable trade-off for keeping transactions open for less time.

The above should not be the default behaviour, but should be configured. e.g. a setting called "limit" specifying the number of rows to read each time, but it defaults to -1, meaning that no limit is applied.

pseudocode:

loop start:
  run query - data since last bookmark, applying limit if present
  read rows using cursor and produce output
loop end - continue while a limit is present and the number of rows just read equals the limit
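A rough Python sketch of the loop described above, assuming psycopg2 and a simple replication-key bookmark; the emit helper, table and column names are placeholders, and the real tap quotes identifiers and writes Singer messages differently:

  import json

  import psycopg2
  import psycopg2.extras

  def emit(message):
      # Stand-in for the Singer message writer used by the real tap.
      print(json.dumps(message, default=str))

  def sync_with_limit(dsn, table, replication_key, bookmark, limit=10000):
      """Batch an incremental sync so no single transaction stays open for long."""
      conn = psycopg2.connect(dsn)
      try:
          while True:
              with conn:  # each batch runs in its own short transaction
                  with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                      cur.execute(
                          f"SELECT * FROM {table} "
                          f"WHERE {replication_key} >= %s "
                          f"ORDER BY {replication_key} ASC LIMIT %s",
                          (bookmark, limit),
                      )
                      rows = cur.fetchall()
              for row in rows:
                  emit({"type": "RECORD", "stream": table, "record": dict(row)})
                  bookmark = row[replication_key]
              emit({"type": "STATE", "value": {"bookmark": bookmark}})
              if len(rows) < limit:  # a short batch means we have caught up
                  break
      finally:
          conn.close()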

Describe alternatives you've considered

  • Increasing postgres max delay settings.
  • Import from views which filter out rows after a certain date, and gradually ramp up the date.

Additional context

Mechanism to pull only a single table's data

Is your feature request related to a problem? Please describe.

We have an Embedded ETL use case where we are pulling customer data into our system based on credentials they provide. We generally only pull one table at a time and want to restrict the transfer to that table's data, rather than pulling at a schema level through the filter_schemas attribute. It would be very nice to support this, and it would save us a lot of redundant data transfer.

We interface with this tap through Meltano, fwiw. Works great from our testing.

Describe the solution you'd like

Would like a way to be able to filter on a table-by-table basis, not only a schema-by-schema basis. We shouldn't need to pull an entire schema if we just want to sync one table.

Describe alternatives you've considered
Other Meltano taps don't seem to support this kind of thing either. Short of fully custom implementing (which we want to avoid - nullifies the entire advantage of using an abstracted tap like this via Meltano) or using a driver in our application (won't scale well as we support more than just Postgres) we aren't really sure how we'd achieve this.

Full table replication failing due to query timeout

Describe the bug
Full table replication fails on larger tables. Process fails with the following error message:
level=CRITICAL message=canceling statement due to statement timeout cmd_type=elb consumer=False name=tap-postgres producer=True stdio=stderr string_id=tap-postgres

After investigating, the issue was traced to the fact that the SQL statement submitted was something like:
SELECT ..... FROM ..... ORDER BY xmin::text::bigint ASC

The ORDER BY caused the query run time to exceed 30 seconds. After running for 30 seconds, the process fails due to timeout. Removing the ORDER BY results in a near-instantaneous return of data.

I get that the xmin allows for restarts but this seems only relevant for larger tables (small tables download in seconds for me) and larger tables will cause the entire process to fail.

To Reproduce
Steps to reproduce the behavior:

  1. Run full table replication against a large table
  2. meltano run tap-postgres target-s3-jsonl
  3. See error

Expected behavior
The extractor should be able to handle larger tables without erroring out. Even if the default behaviour causes errors to be thrown, this issue to be possible to bypass using configurations. I needed to fork the extractor and remove the ORDER BY in order to fix the issue.

Your environment

  • Version of tap: 2.1.0
  • meltano:v3.3.0

Re-license

Hi,

I would like to use pipelinewise-tap-postgres in a product and my customers are unable to accept AGPL code.
Are you able to release this with a more permissive license such as Apache 2.0, MIT or BSD?

Kind regards,
Dylan Just

Tap stuck at select query

Describe the bug
I was running the tap following the config and catalog guidelines and was able to pull the streams, but the sync was stuck.

To Reproduce
Steps to reproduce the behavior:

tap-postgres -c config.json --catalog catalog.json -p properties.json

Expected behavior
It should work!

Logs

2023-03-29 10:01:44 [DEBUG] - time=2023-03-29 10:01:44 name=tap_postgres level=INFO message=Selected streams: ['public-contact_data_source'] 

2023-03-29 10:01:44 [DEBUG] - time=2023-03-29 10:01:44 name=tap_postgres level=INFO message=No streams marked as currently_syncing in state file
time=2023-03-29 10:01:44 name=tap_postgres level=INFO message=Beginning sync of stream(public-contact_data_source) with sync method(full)

2023-03-29 10:01:44 [DEBUG] - time=2023-03-29 10:01:44 name=tap_postgres level=INFO message=Stream public-contact_data_source is using full_table replication

2023-03-29 10:01:44 [WARN ] - Unknown Singer output line type: ACTIVATE_VERSION [map[stream:public-contact_data_source type:ACTIVATE_VERSION version:1.680084104966e+12]]
2023-03-29 10:01:45 [DEBUG] - time=2023-03-29 10:01:45 name=tap_postgres level=INFO message=Current Server Encoding: UTF8

2023-03-29 10:01:45 [DEBUG] - time=2023-03-29 10:01:45 name=tap_postgres level=INFO message=Current Client Encoding: UTF8

2023-03-29 10:01:45 [DEBUG] - time=2023-03-29 10:01:45 name=tap_postgres level=INFO message=hstore is UNavailable
time=2023-03-29 10:01:45 name=tap_postgres level=INFO message=Beginning new Full Table replication 1680084104966
time=2023-03-29 10:01:45 name=tap_postgres level=INFO message=select SELECT  "contact_data_source_id" , "contact_id" , "contact_list_id" ,CASE WHEN  "date_created"  < '0001-01-01 00:00:00.000' OR  "date_created"  > '9999-12-31 23:59:59.999' THEN '9999-12-31 23:59:59.999' ELSE  "date_created"  END AS  "date_created" , "id" , "is_deleted" , "new_contact_id" , "new_lead_id" , "org_id" , "other_properties" , xmin::text::bigint
                                      FROM "public"."contact_data_source"
                                     ORDER BY xmin::text ASC with itersize 20000

Your environment

  • Version of tap: 2.0.0
  • Version of python: 3.10

Support pgoutput based logical replication

Is your feature request related to a problem? Please describe.
Not all cloud based postgresql PaaS providers support the installation of plugins or provide wal2json support, which prevents the use of LOG_BASED replication with pipelinewise-tap-postgres in those environments.

Describe the solution you'd like
Add support for the built-in pgoutput plugin that comes standard with PostgreSQL.

Describe alternatives you've considered
INCREMENTAL and FULL_TABLE replication are still possible, albeit slower and lacking deletes.

Wrong state during first sync of an incremental table

Describe the bug
During the first sync of a table in incremental mode, the state which is emitted is wrong.
The behavior occurs when the rows in the table aren't in the same order as the replication-key.
The reason is that the ORDER BY clause is missing from the SQL query. This was introduced by this PR:
#189

The exact line:
https://github.com/transferwise/pipelinewise-tap-postgres/pull/189/files#diff-f45be604034c53a31927c986aeb8b360cae40502b086afb6539a4fe2c4b8b0c3L140

replication slot names must be lower case

the db I'm trying to replicate is named "DWH", all caps. If I try to create a replication slot with the correct naming scheme, I get this error:
ERROR: replication slot name "pipelinewise_DWH" contains invalid character HINT: Replication slot names may only contain lower case letters, numbers, and the underscore character.

If I create it with lower case letters, logical replication can't find the replication slot:
CRITICAL Unable to find replication slot pipelinewise_DWH with wal2json

If I use "dbname = dwh" in my config, discovery and presumably many other things fail.

Is the easiest fix just to .lower() the dbname when searching for the replication slot?

leaving schema filter blank throws error in Meltano 1.97.0

Describe the bug

I am getting a successful 'test connection' in the Meltano UI, but when I go to execute the postgresql-to-csv-target pipeline I get a cryptic error.

To Reproduce
Steps to reproduce the behavior:

  1. Have a postgresql DB with a table, set up the extractor to a successful test connection, but leave the schema filter blank

I tried full table and key based replication.. both failed but with different errors

  1. In the GUI, wire in a target-csv.. default settings
  2. hit manual run on the pipeline connecting the two

Expected behavior

If the schema filter is required, I expect it to fail the connection test or maybe warn me more explicitly


Your environment

  • Version of tap: [e.g. 2.0.0] (Unsure where to find this information)
  • Version of python 3.8.10

Additional context
kubuntu 22.04 LTS daily

Order by xmin

I was wondering how you handle very big tables (3 billion rows) given the ORDER BY xmin::text ASC added at the end of the full initial sync cursor query. Do you create an index, or perhaps is there another way to make these queries faster? Any help is much appreciated, thank you.

Possibility to set application name for taps

Is your feature request related to a problem? Please describe.
It would be helpful to be able to set the application name, or at least append to it, so that it is not just pipelinewise. When running multiple taps, this would make it possible to distinguish between them.

Describe the solution you'd like
Another configuration parameter besides the existing connection parameters, named application_name or appname.

Can't handle dates with year > 9999

Describe the bug
Postgres dates can be far into the future:

SELECT '5874897-12-31'::date;

However, datetimes in python cannot handle years past 9999. When using logical replication, a ValueError will be thrown at this line.

To Reproduce
Steps to reproduce the behavior:

  1. Use logical replication on a table with a date column.
  2. Insert a new row into that table where the date has a year past 9999.

Expected behavior
There should be a way to workaround this. In my fork I just set any date past this to 9999-12-31, but this may not be the best default behavior. It'd be better to have an option to do this.

I can think of two scenarios:

  1. You legitimately want dates this far in the future
  2. Someone mistyped it in a form somewhere, and there isn't great validation before it enters the database

For the first scenario, either you don't use a python tap, or you override so that those fields are treated as strings rather than as dates. Either way it's not something to consider as part of this issue, and you'd want the tap to just fail rather than trying to manipulate the dates in any way.

For the second, you may want the tap to fail so you can address the bad date in the source postgres system. Others (such as myself) would rather just have a warning message and have the date coerced so that the tap can keep running.
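For reference, a minimal sketch of the coercion described above; this mirrors what the fork does conceptually, it is not current tap behaviour, and the function name is made up:

  import datetime
  import logging

  LOGGER = logging.getLogger(__name__)

  # datetime.date.max is 9999-12-31, the largest date Python can represent.
  MAX_DATE = datetime.date.max

  def coerce_date(year, month, day):
      """Clamp out-of-range Postgres dates instead of raising ValueError."""
      try:
          return datetime.date(year, month, day)
      except ValueError:
          LOGGER.warning("Date %s-%s-%s is outside Python's range; coercing to %s",
                         year, month, day, MAX_DATE)
          return MAX_DATE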

Screenshots
N/A

Your environment

  • Version of tap: [e.g. 1.8.0]
  • Version of python [e.g. 3.9.7]

Additional context
N/A

Cannot override discovered schema

Describe the bug
Changing the schema for a stream in the catalog file has no effect, since it's always overwritten with the discovered stream in refresh_streams_schema.

To Reproduce
Steps to reproduce the behavior:

  1. Create a test postgres database with a couple of tables:

    CREATE TABLE a (a integer PRIMARY KEY, data jsonb);
    INSERT INTO a VALUES (1, '{}');
    
  2. Create config files for the tap and target, for example:

    tap_config.json

    {
      "host": "127.0.0.1",
      "port": 5432,
      "user": "myuser",
      "password": "mypass",
      "dbname": "tap_postgres",
      "filter_schemas": "public",
      "logical_poll_total_seconds": 60
    }

    target_config.json

    {
      "host": "127.0.0.1",
      "port": 5432,
      "user": "myuser",
      "password": "mypass",
      "dbname": "target_postgres",
      "default_target_schema": "public"
    }
  3. Install the tap and create catalog.json

    $ mkvirtualenv tap-postgres
    $ pip install pipelinewise-tap-postgres==1.8.1
    $ tap-postgres --config tap_config.json --discover > catalog.json
    # Modify the catalog
    # In the metadata section where breadcrumb = [],  add:
    #             "selected": true,
    #             "replication-method": "FULL_TABLE",
    # and under schema->properties->data->type change it to:
    #             ["null", "string"]
    $ deactivate
  4. Install the target

    $ mkvirtualenv target-postgres
    $ pip install pipelinewise-target-postgres==2.1.1
    $ deactivate
  5. Run the pipeline

    $ ~/.virtualenvs/tap-postgres/bin/tap-postgres \
          --config tap_config.json \
          --properties catalog.json \
        | ~/.virtualenvs/target-postgres/bin/target-postgres \
          --config target_config.json
  6. Check the table created in the target

    target_postgres=# SELECT pg_typeof("data") FROM a;
     pg_typeof
    -----------
     jsonb
    (1 row)
    

Expected behavior
If a catalog file is provided, its schema should take precedence over the discovered schema for that stream. The data type in the target should be character varying.

Screenshots
N/A

Your environment

  • Version of tap: [e.g. 1.8.1]
  • Version of python [e.g. 3.9.7]

Additional context
I discovered this while using meltano.

Chunk up extracted data to avoid PostgreSQL server disk space errors

Is your feature request related to a problem? Please describe.
I'm having trouble running an initial extract of a large table, because the Heroku PostgreSQL instance runs out of temporary disk space. I also run out of space locally on a machine with 83 GB free!

Describe the solution you'd like
The tap could split the extraction of data into manageable chunks.

Describe alternatives you've considered
https://github.com/ClickMechanic/pipelinewise-tap-postgres/tree/Add_max_query_rows_parameter adds a limit to the SQL query.

Additional context
This is the exception stack trace when the extract runs out of disk space.

tap-postgres     | time=2021-09-06 20:22:37 name=tap_postgres level=CRITICAL message=could not write to file "pg_tblspc/16400/PG_13_202007201/pgsql_tmp/pgsql_tmp31401.67": No space left on device
tap-postgres     | 
tap-postgres     | Traceback (most recent call last):
tap-postgres     |   File "/project/.meltano/extractors/tap-postgres/venv/bin/tap-postgres", line 8, in <module>
tap-postgres     |     sys.exit(main())
tap-postgres     |   File "/project/.meltano/extractors/tap-postgres/venv/lib/python3.6/site-packages/tap_postgres/__init__.py", line 435, in main
tap-postgres     |     raise exc
tap-postgres     |   File "/project/.meltano/extractors/tap-postgres/venv/lib/python3.6/site-packages/tap_postgres/__init__.py", line 432, in main
tap-postgres     |     main_impl()
tap-postgres     |   File "/project/.meltano/extractors/tap-postgres/venv/lib/python3.6/site-packages/tap_postgres/__init__.py", line 422, in main_impl
tap-postgres     |     args.config.get('default_replication_method'), state, state_file)
tap-postgres     |   File "/project/.meltano/extractors/tap-postgres/venv/lib/python3.6/site-packages/tap_postgres/__init__.py", line 316, in do_sync
tap-postgres     |     end_lsn)
tap-postgres     |   File "/project/.meltano/extractors/tap-postgres/venv/lib/python3.6/site-packages/tap_postgres/__init__.py", line 175, in sync_traditional_stream
tap-postgres     |     state = do_sync_incremental(conn_config, stream, state, desired_columns, md_map)
tap-postgres     |   File "/project/.meltano/extractors/tap-postgres/venv/lib/python3.6/site-packages/tap_postgres/__init__.py", line 86, in do_sync_incremental
tap-postgres     |     state = incremental.sync_table(conn_config, stream, state, desired_columns, md_map)
tap-postgres     |   File "/project/.meltano/extractors/tap-postgres/venv/lib/python3.6/site-packages/tap_postgres/sync_strategies/incremental.py", line 108, in sync_table
tap-postgres     |     for rec in cur:
tap-postgres     |   File "/project/.meltano/extractors/tap-postgres/venv/lib/python3.6/site-packages/psycopg2/extras.py", line 111, in __iter__
tap-postgres     |     first = next(res)
tap-postgres     | psycopg2.errors.DiskFull: could not write to file "pg_tblspc/16400/PG_13_202007201/pgsql_tmp/pgsql_tmp31401.67": No space left on device
tap-postgres     | 

Does the incremental `SELECT` include the last inserted rows on purpose?

(Comes from Slack) Today I was experimenting with pipelinewise-tap-postgres in INCREMENTAL mode and found that the row inserted in the last incremental pass gets updated in the next pass. The reason is that the SQL query has a >= {replication_key_value}, rather than > {replication_key_value} (greater-than-or-equal rather than greater-than). Is there an explanation for this?

In my mind, if the bookmark in state.json says that the last inserted key was id=8, I would expect the tap to continue on id>8, rather than id>=8 (hence updating rows that were already inserted). Or am I missing something?

source:

WHERE {post_db.prepare_columns_sql(replication_key)} >= '{replication_key_value}'::{replication_key_sql_datatype}

Is it supported to read from PostgreSQL View?

Your question
Hi! I am currently using this tap inside a meltano project. I was trying to read from a PostgreSQL view but came across the error 'Primary key is mandatory'. A PostgreSQL view does not support a primary key, so I am wondering whether there is a way I could read from a view? Thank you very much!

Postgres returns non ISO formatted timestamp with a 'Europe/Paris' connection

Describe the bug
When the extractor sends a timestamp before '1911-03-11' to the singer_sdk (from an 'Europe/Paris' connection to Postgres), here is the timestamp received in the loader: '1911-03-10 00:00:00+00:09:21'

Example : SELECT '1800-01-01T00:00:00+00'::timestamptz ==> 1800-01-01 00:09:21+00:09:21

This timestamp is not in a valid common format.
The extractor should give an option to set the output to UTC or to the time zone of the connection to Postgres.

use_secondary with Meltano

Hi, thank you for this useful project. I was wondering if secondary replica reads work for Meltano-triggered tasks as well, since these parameters are not in the tap settings. I tried to set these env vars just to see if it would work, but even for full_table tables, there was no connection to the replica; it just used the master db.

export TAP_POSTGRES_USE_SECONDARY=True
export TAP_POSTGRES_SECONDARY_HOST=xxxxxxxx.us-west-1.rds.amazonaws.com
export TAP_POSTGRES_SECONDARY_PORT=5432

It would be nice if you could guide me on how to leverage the read replica during log_based mode's initial sync. We have really big tables and I do not want to read from the master for the initial sync, but I also want to keep reading the wal2json log since it grows really fast, so I cannot leave the initial sync tables for a day.

JSONschema version not compatible with target-snowflake

Describe the bug
This tap produces a schema with exclusiveMaximum and exclusiveMinimum according to JSONschema v5, but these are incompatible with pipelinewise-target-snowflake's JSONschema v7 validator.

In JSONschema v6, the fields exclusiveMaximum and exclusiveMinimum were changed from boolean to numeric (JSON Schema Draft-06 Release Notes).

pipelinewise-target-snowflake uses the incompatible v7 validator: pipelinewise-target-snowflake source

The exclusiveMaximum is set in the schema here

In JSON schema v5 and below, a numeric field can be described as:

{
      "type": ["null", "number" ],
      "exclusiveMaximum": true,
      "maximum": 10000,
      "multipleOf": 1e-38,
      "exclusiveMinimum": true,
      "minimum": -10000
}

The equivalent in v6 and above is:

{
      "type": ["null", "number" ],
      "exclusiveMaximum": 10000,
      "multipleOf": 1e-38,
      "exclusiveMinimum": -10000
}
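For anyone needing a stopgap, a small sketch of the draft-05 to draft-06 conversion shown above; this is not code from either project, just an illustration of the mapping:

  def upgrade_exclusive_bounds(prop):
      """Convert draft-05 boolean exclusiveMinimum/Maximum to the draft-06+ numeric form."""
      out = dict(prop)
      if out.get('exclusiveMaximum') is True and 'maximum' in out:
          out['exclusiveMaximum'] = out.pop('maximum')
      elif out.get('exclusiveMaximum') is False:
          out.pop('exclusiveMaximum')
      if out.get('exclusiveMinimum') is True and 'minimum' in out:
          out['exclusiveMinimum'] = out.pop('minimum')
      elif out.get('exclusiveMinimum') is False:
          out.pop('exclusiveMinimum')
      return out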

Steps to replicate

  1. create config.json for a database with a numeric type colum, and for a snowflake database.
  2. tap-postgresql -c tap-config.json | target-snowflake -c target-config.json
  3. See error from the jsonschema validator: jsonschema.exceptions.ValidationError: 85000 is greater than or equal to the maximum of True

Expected behavior
The tap and target should communicate with the same standard

Your environment

  • pipelinewise-tap-postgres version: 2.1.0

I can see that, while just moving to JSONschema v7 might be simple, it will probably break something else. Can you give some input on what the best all-round solution to this would look like?

Can I use this to have multiple slots for a single DB that correspond to specific tables?

I have multiple large tables in a single source Postgres DB that I want to sync independently - e.g. one pipeline and/or project per table - using wal2json and the add-tables parameter. If they all share the same replication slot for the entire DB, the replication slot is a) wasteful, since it contains tables I don't care to sync, and b) I can't run these syncs concurrently. What I'm looking for is to create a Meltano pipeline per table, each with its own replication slot. The README only discusses creating a pipelinewise_<database_name> slot, but it appears I can create multiple pipelinewise_<database_name>_<tap_id> slots set to specific tables? What would the tap_id correspond to in my Meltano configuration, the name of the extractor?

  import re

  def generate_replication_slot_name(dbname, tap_id=None, prefix='pipelinewise'):
      """Generate replication slot name with
      :param str dbname: Database name that will be part of the replication slot name
      :param str tap_id: Optional. If provided then it will be appended to the end of the slot name
      :param str prefix: Optional. Defaults to 'pipelinewise'
      :return: well formatted lowercased replication slot name
      :rtype: str
      """
      # Add tap_id to the end of the slot name if provided
      if tap_id:
          tap_id = f'_{tap_id}'
      # Convert None to empty string
      else:
          tap_id = ''

      slot_name = f'{prefix}_{dbname}{tap_id}'.lower()

      # Replace invalid characters to ensure replication slot name is in accordance with Postgres spec
      return re.sub('[^a-z0-9_]', '_', slot_name)
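For example, given the function above and a hypothetical tap_id of my_tap, the slot name comes out lowercased with the tap_id appended:

  generate_replication_slot_name('DWH', tap_id='my_tap')  # -> 'pipelinewise_dwh_my_tap'

So the tap_id here is simply whatever value you pass through the tap's optional tap_id config property (documented above as "ID of the pipeline/tap").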

Logical replication failing due to too many sessions being opened with the database

Describe the bug
When running with log replication, if the extractor encounters array data, it will open multiple connections to Postgres in order to figure out the array data's schema. When there are large tables that contain array fields, the number of connections being opened to Postgres will cause new connections to be denied. This results in a connection timeout error.

To Reproduce
Steps to reproduce the behavior:

  1. Setup the extractor to run against a large table containing array fields
  2. Perform a migration in Postgres on this large table (causing a large number of records to be dumped into the transaction log)
  3. Run the extractor using log replication
  4. Monitor the increase in connections being opened
  5. Hit connection timeout as Postgres starts rejecting new connections

Timeout error:
name=tap_postgres level=CRITICAL message=connection to server at "XXXXXX" (XXXXXX), port 5432 failed: timeout expired

Spikes in connections count when running Meltano. Behaviour is shown being turned on and off by including / excluding the table containing the array field. Session counts drop after the timeout error is hit.

Expected behavior
There should be options to allow the array fields to be handled as strings or parsed as JSON. It would be a hassle to potentially have to write a UDF in a downstream platform to parse the string, but this is better than a process that fails and also degrades the production database or the read replica database.

Your environment

  • Version of tap: 2.1.0
  • meltano:v3.3.0


Field-Level Metadata Removed

Describe the bug
Field level metadata is removed from the catalog when discovery is performed

To Reproduce
Steps to reproduce the behavior:

  1. Provide field-level metadata in the catalog, for example:

     {
       "breadcrumb": ["properties", "char_name"],
       "metadata": {
         "arbitrary_field_metadata": "should be preserved"
       }
     }

  2. Run the tap

This affects at least Meltano users who use the meltano select feature to populate the selected metadatum to indicate whether fields should be copied or not. In that case, the selected field is being stripped and therefore ignored and all fields get copied regardless.

Expected behavior
Field-level metadata should be preserved through the stream_utils.refresh_streams_schema() with only discoverable metadata being updated/added.

Your environment

  • Version of tap: master
  • Version of python: 3.8

Additional context
I'm using the tap with meltano and setting a field metadatum: selected: True, however this is stripped from the metadata and in turn not honored by the tap.

Handling of hstore fields

Currently, hstore fields get serialized as text which results in values like "my_key"=>"my value". However, that requires non-standard parsing and transformations in destinations that would support JSON-like column types natively, like Snowflake. For instance, I have to run the following transformation in all of my dbt base models that reference an hstore serialized as text:

{% macro hstore_to_json(str) %}
  PARSE_JSON(CONCAT('{', REPLACE({{ str }}, '=>', ':'), '}'))
{% endmacro %}

If the tap would, however, encode the hstore as JSON and then send it as an object to the target it could be handled natively. This is also how Debezium does it by default.
Is there any specific reason for the current behavior?

Cannot select specific columns

Describe the bug
It is impossible to select specific columns because all the columns are always selected

To Reproduce

  1. Run a postgres db

     docker run --name basic-postgres --rm -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=4y7sV96vA9wv46VR -e PGDATA=/path/to/pgdata -v /tmp:/var/lib/postgresql/data -p 5432:5432 -it postgres:14.1-alpine

  2. Create the table

     CREATE TABLE prodotti (
       id int4 NULL,
       merchant varchar
     );
     insert into prodotti values (1, 'aaa');
     insert into prodotti values (2, 'bbb');

  3. Configure the catalog to select only the id column and not select merchant
  4. Run tap-postgres with the attached files

     tap-postgres -c config.json --catalog catalog.json

catalog.json.txt
config.json.txt

Expected behavior
Only the column id is extracted

Your environment

  • Version of tap: 2.0.0
  • Version of python 3.9

ARM build for tap

Is your feature request related to a problem? Please describe.
The tap doesn't have an ARM build

Describe the solution you'd like
A few dependent libraries don't have ARM builds, hence the tap doesn't build on ARM machines; alternatives to these dependencies need to be found

Support of partitioned tables

Is your feature request related to a problem? Please describe.
When I need to sync a partitioned table I cannot do it. The tap has no access to the partitioned table under its base name (e.g. table_name); instead it can only access it by exact partition name (e.g. table_name_2021_05). This is not suitable.

Describe the solution you'd like
As far as I understand from the source code, the issue could be fixed by just adding relkind='p' to the line

AND pg_class.relkind IN ('r', 'v', 'm')

so the resulting line will be AND pg_class.relkind IN ('r', 'v', 'm', 'p'). This will allow scanning partitioned tables by their base name.

However, I don't have full context here. I'm afraid there are some other reasons the tap does not support partitioned tables (any side effects?).

Describe alternatives you've considered
The alternative I'm using now is to specify the exact partition name for the current month to get fresh data.

Additional context
No

Primary key updates handled incorrectly in LOG_BASED replication

Describe the bug
Primary key updates handled incorrectly in LOG_BASED replication, causing old data to be kept in target tables.

To Reproduce
Steps to reproduce the behavior:

-- table definition
create table table_name
(
	a_id integer not null,
	b_id integer not null,
	c integer not null,
	constraint pkey
		primary key (a_id, b_id, c)
);

-- update one row in table

In consume_message, the following wal2json payload is read:

def consume_message(streams, state, msg, time_extracted, conn_info):

# wal2json payload
PAYLOAD = {
    'kind': 'update',
    'schema': 'public',
    'table': 'table_name',
    'columnnames': ['a_id', 'b_id', 'c'],
    'columntypes': ['integer', 'integer', 'integer'],
    # new primary key
    'columnvalues': [2, 10, 1],
    'oldkeys': {
         'keynames': ['a_id', 'b_id', 'c'],
         'keytypes': ['integer', 'integer', 'integer'],
         # old primary key
         'keyvalues': [2, 5, 1]
    }
}

# emitted record by tap
RECORD_MESSAGE = {
    'type': 'RECORD', 'stream': 'public-table_name',
    'record':
        {'a_id': 1, 'b_id': 10, 'c': 1, '_sdc_deleted_at': None},
    'version': 1,
    'time_extracted': '2021-05-13T09:20:31.892225Z'
}

Expected behavior
In target table row with PK [2, 5, 1] is updated to [2, 10, 1]

Actual result
In target table row with PK [2, 5, 1] is kept and row with PK [2, 10, 1] is added.
Target table now contains row [2, 5, 1] which has been deleted from source.

Your environment

  • Version of tap: [1.7.1]
  • Version of python [3.7]

`schema` is an invalid JSON Schema instance

I'm not sure if this issue should be reported here, but any help will be highly appreciated.

Describe the bug
On running meltano with tap-postgres on the tables from my DB I get an error:

CRITICAL ('`schema` is an invalid JSON Schema instance: {"type": "SCHEMA", "stream": "public-blogPosts", "schema": {"type": "object", "properties": {"slug": {"type": ["string"], "maxLength": 255}, "title": {"type": ["null", "string"], "maxLength": 255}, "body": {"type": ["null", "string"]}, "tags": {"type": ["null", "array"], "items": {"$ref": "#/definitions/sdc_recursive_string_array"}}, "image": {"type": ["null", "string"]}, "created_at": {"type": ["null", "string"], "format": "date-time"}, "updated_at": {"type": ["null", "string"], "format": "date-time"}}, "definitions": {"sdc_recursive_integer_array": {"type": ["null", "integer", "array"], "items": {"$ref": "#/definitions/sdc_recursive_integer_array"}}, "sdc_recursive_number_array": {"type": ["null", "number", "array"], "items": {"$ref": "#/definitions/sdc_recursive_number_array"}}, "sdc_recursive_string_array": {"type": ["null", "string", "array"], "items": {"$ref": "#/definitions/sdc_recursive_string_array"}}, "sdc_recursive_boolean_array": {"type": ["null", "boolean", "array"], "items": {"$ref": "#/definitions/sdc_recursive_boolean_array"}}, "sdc_recursive_timestamp_array": {"type": ["null", "string", "array"], "format": "date-time", "items": {"$ref": "#/definitions/sdc_recursive_timestamp_array"}}, "sdc_recursive_object_array": {"type": ["null", "object", "array"], "items": {"$ref": "#/definitions/sdc_recursive_object_array"}}}}, "key_properties": ["slug"], "bookmark_properties": []}\n', '`$ref` path "{\'type\': [\'null\', \'string\', \'array\'], \'items\': {\'$ref\': \'#/definitions/sdc_recursive_string_array\'}}" is recursive')

To Reproduce
Steps to reproduce the behavior:

  1. Follow the tutorial from meltano
  2. Run the pipeline: meltano elt tap-postgres target-postgres --job_id=gitlab-to-postgres

Expected behavior
Data is imported

Your environment

  • Version of tap: [?]
  • Version of python [e.g. 3.7.9]

Bit types of length > 1 not being detected

Describe the bug
Bit types of length > 1 not being detected

To Reproduce
create a table with a BIT(8) field:

create table jmtest (
id INTEGER,
name VARCHAR,
somebytes bit(8),
etl_updated_timestamp TIMESTAMP);

Run tap in discover mode:
tap-postgres --config ~/.pipelinewise/bigquery/tap_jmtest/config.json --discover

Expected behavior
We should see the type of the somebytes column. Instead we are getting an empty type for this column


Your environment

  • Version of tap: 1.8.0
  • Version of python 3.7.7

Additional context
This is related to the issue in the main repo: https://github.com/transferwise/pipelinewise/issues/740
It seems to affect all targets and also in a different way this PostgreSQL tap

Handle Postgres TOAST values and missing columns/values from wal2json

Describe the bug

We ran into the problem that particular columns ended up NULL in the data warehouse after logical replication updates. After diving into the problem we discovered that this happens because of TOAST values in Postgres. A good explanation can be found here https://debezium.io/blog/2019/10/08/handling-unchanged-postgres-toast-values/, but TL;DR Postgres can sometimes decide to not include a column in the logical replication message from wal2json if that particular column was not part of the UPDATE statement.

To Reproduce

Used the steps described here https://github.com/eulerto/wal2json/pull/99/files#diff-dc97f558087716ec3429b35c430504cbd91e93aa6e477188c289927632d01b6f to reproduce the problem.

Expected behavior
The expectation is that if the message from wal2json contains a toast value and misses a column it does not set that column to NULL for the downstream singer target.

Your environment

  • Version of tap: forked version of 1.8.0 (Hacker0x01#1)
  • Version of python 3.9.7

Additional context
Because HackerOne is using this tap in production we have already pushed a fix to our fork of tap-postgres in Hacker0x01#1. The commit with the working fix for HackerOne is Hacker0x01@1584b8d. This fix requires altering tables with REPLICA IDENTITY FULL.

Release 1.8.3

I see a fix for an issue we have in Release 1.8.3. When would that be available?

max_run_seconds not ending job after time elapses

Describe the bug
I set max_run_seconds at 300 but 20 minutes later my pipeline is still running. There are no new messages because I am testing the pipeline against a test DB where no changes are being made to the source tables.

To Reproduce

  extractors:
  - name: tap-postgres
    variant: transferwise
    pip_url: pipelinewise-tap-postgres
  - name: primary-config
    inherit_from: tap-postgres
    config:
      dbname: my-db
      host: host
      user: user
      port: 5432
      default_replication_method: LOG_BASED
      max_run_seconds: 300
  - name: child-extractor
    inherit_from: primary-config
    select:...

Also tried max_run_seconds: '30' and max_run_seconds: 30, neither worked (logical_poll_total_seconds did at 300).

Expected behavior
I set max_run_seconds to 30 or 300, 30 or 300 seconds later the pipeline turns itself off.


Your environment

  • Version of tap: 1.8.1
  • Version of python: 3.9

Additional context
Is the problem that because there are no new changes to the source tables, there are no new messages and therefore msg is null and these conditional statements are never evaluated?

https://github.com/transferwise/pipelinewise-tap-postgres/blob/master/tap_postgres/sync_strategies/logical_replication.py#L608-L631

logical_poll_total_seconds is not dependent on an if msg: condition and works fine. I can see why break_at_end_lsn needs to be checked within if msg: but not max_run_seconds.
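For illustration, a rough sketch of a poll loop where the elapsed-time check runs on every iteration regardless of whether a message arrived; the names are illustrative, not the tap's actual code:

  import datetime

  def poll_loop(cur, max_run_seconds, handle_message):
      """cur is assumed to be a psycopg2 replication cursor; handle_message is a placeholder."""
      start = datetime.datetime.utcnow()
      while True:
          # Check max_run_seconds before reading, so an idle slot still ends the run
          if (datetime.datetime.utcnow() - start).total_seconds() > max_run_seconds:
              break
          msg = cur.read_message()  # non-blocking read from the replication slot
          if msg:
              handle_message(msg)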
