
kafka-connect-jdbc's Introduction

Kafka Connect JDBC Connector

kafka-connect-jdbc is a Kafka Connector for loading data to and from any JDBC-compatible database.

Documentation for this connector can be found here.

Development

To build a development version you'll need a recent version of Kafka as well as a set of upstream Confluent projects, which you'll have to build from their appropriate snapshot branch. See the FAQ for guidance on this process.

You can build kafka-connect-jdbc with Maven using the standard lifecycle phases.

FAQ

Refer to the frequently asked questions for Kafka Connect JDBC here: https://github.com/confluentinc/kafka-connect-jdbc/wiki/FAQ

Contribute

Contributions can only be accepted if they contain appropriate testing. For example, adding a new dialect of JDBC will require an integration test.

Information

For more information, check the documentation for the JDBC connector on the confluent.io website. Questions related to the connector can be asked on Community Slack or the Confluent Platform Google Group.

License

This project is licensed under the Confluent Community License.


kafka-connect-jdbc's Issues

Schema migration support

We should be able to detect when tables change and react appropriately, updating the schema. To do so probably just requires detecting the change in the Task as we perform queries and making sure we perform the translation.

Support for uuid type in postgres

Tables with columns of type UUID in postgres cannot be "connected" by just listing the table in the whitelist because the UUID type is not handled by the converter.

One workaround is to use a custom SQL query and convert the UUIDs to strings, but obviously this isn't ideal.
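For illustration, a minimal sketch of that workaround, with hypothetical table and column names: point the connector's query mode at a SELECT that casts the uuid column to text, so the converter only ever sees a string.

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    // Hypothetical illustration: the uuid column is cast to text in the query,
    // so the converter deals only with known types. Names are made up.
    static void readUuidsAsText(Connection connection) throws SQLException {
        String query = "SELECT id::text AS id, name, updated_at FROM my_table";
        try (PreparedStatement stmt = connection.prepareStatement(query);
             ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                String uuidAsString = rs.getString("id"); // the uuid arrives as plain text
            }
        }
    }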

support postgresql boolean/BIT

I know you are already aware of this, as it's mentioned in the code, but it'd be nice to handle the BIT type for PostgreSQL.

Indeed, boolean columns are seen as BIT types and fail during conversion:

[2016-01-08 16:09:48,292] WARN Ignoring record due to SQL error: (io.confluent.connect.jdbc.JdbcSourceTask:73)
org.postgresql.util.PSQLException: Bad value for type byte : f
    at org.postgresql.jdbc2.AbstractJdbc2ResultSet.getByte(AbstractJdbc2ResultSet.java:2100)
    at io.confluent.connect.jdbc.DataConverter.convertFieldValue(DataConverter.java:281)
    at io.confluent.connect.jdbc.DataConverter.convertRecord(DataConverter.java:68)
    at io.confluent.connect.jdbc.TimestampIncrementingTableQuerier.extractRecord(TimestampIncrementingTableQuerier.java:165)
    at io.confluent.connect.jdbc.JdbcSourceTask.poll(JdbcSourceTask.java:211)
name=control-info-pgstaging
    at org.apache.kafka.connect.runtime.WorkerSourceTask$WorkerSourceTaskThread.execute(WorkerSourceTask.java:353)
    at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)

The resulting Avro message ends up with all boolean values set to null (and I suspect these failures also slow down the import process quite a lot).

Timestamp based incremental loads

Add support for incrementally loading data from a table by using a timestamp column.

  • Enabling this mode and specifying the column both need to be exposed in the config
  • Could potentially work with any monotonically increasing, ordered, possibly non-unique column, so it would be good to test this with both timestamps and at least one other type.
  • Since timestamps are non-unique, offset handling has to be managed carefully: we cannot simply record the last row's timestamp as the offset; we have to record something slightly smaller, or pair the timestamp with another column, so rows sharing that timestamp are not missed (see the sketch below).
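As a rough illustration of that last point (key names are hypothetical), Connect source offsets are plain maps, so the timestamp can be paired with the incrementing value to disambiguate rows that share a timestamp:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical combined offset: the timestamp alone is ambiguous, so pair it
    // with the incrementing id of the last emitted row.
    static Map<String, Object> buildOffset(long lastTimestampMillis, long lastIncrementingId) {
        Map<String, Object> offset = new HashMap<>();
        offset.put("timestamp", lastTimestampMillis);
        offset.put("incrementing", lastIncrementingId);
        return offset; // passed as the sourceOffset of each emitted SourceRecord
    }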

I've made some changes in my fork to allow DB-specific query generation

Hi,

I made some revisions in a fork of this project, off of the 2.0.1 release, to allow DB-specific SQL. The current implementation of the timestamp/incrementing queries is very inefficient with many query optimizers due to the OR clause. In addition, there is no limit on how many rows may be returned. The current implementation uses auto-commit, which disables paging of rows; simply disabling auto-commit is not desirable anyway, as it can create excessive locking on the database. I take advantage of paging predicates (timestamp, incr) > (?,?) and the LIMIT clause.

The original query is generated by the "generic" query generator. I've created a Postgres generator, which uses paging predicates and a LIMIT clause. That form should work on several other RDBMSs as well, perhaps with a few variants.

The head seems to have added a new lower bound on the timestamp, which is unnecessary for my new generator. I removed the ability to automatically add a WHERE clause to a custom query, and I would like to add a parameter that describes the bind variables. Internally, the query generators already work that way, so queries may have any combination of timestamp and incrementing parameters. We could also add the limit as a bind variable for some platforms, though that is probably unnecessary, and if we want to add a lower_bound_timestamp parameter type we can do that too.
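For illustration only, a sketch of the paging-predicate form described above, with hypothetical table and column names; PostgreSQL accepts the row-value comparison directly:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Timestamp;

    // Hypothetical Postgres-style paging query in place of the OR-based predicate.
    static void pollPage(Connection conn, Timestamp lastTs, long lastId, int limit) throws SQLException {
        String sql = "SELECT * FROM my_table"
                   + " WHERE (updated_at, id) > (?, ?)"   // paging predicate on (timestamp, incrementing)
                   + " ORDER BY updated_at, id LIMIT ?";
        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setTimestamp(1, lastTs);
            stmt.setLong(2, lastId);
            stmt.setInt(3, limit);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    // convert each row and remember (updated_at, id) for the next page
                }
            }
        }
    }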

Should I submit a pull request, or do you want to peek at my repo first?

Duplicates mysql data while running in distributed mode

While running the Kafka JDBC connector in distributed mode with incrementing mode, the connector keeps copying all of the table's data every time it polls.

Any reason this can happen? Maybe I am missing something in the config. Here is my connector config.

{
  "name": "mysql-test1",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "10",
    "connection.url": "",
    "mode": "incrementing",
    "validate.non.null": "false",
    "table.whitelist": "test_table",
    "topic.prefix": "Newtest5-"
  }
}

Can you please suggest?

connect-distributed process dies if incrementing.column.name is null

I have been trying to use the connect-distributed service and it dies for pretty much any error / boundary condition. It dies if

  • the incrementing.column.name is null,
  • if the timestamp.column.name is null,
  • if the topic.prefix is not set
  • if incrementing.column.name and timestamp.column.name are both set.

The only way to recover is to delete / flush the connect-configs topic. I am building an app to automatically create these connectors, and the service keeps dying on me.

When Mode=Bulk : Unhandled exception when committing WorkerSourceTask{id=***}: (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:118) java.lang.NullPointerException

I'm using Confluent 2.0.1-2.11.7 from: http://packages.confluent.io/archive/2.0/confluent-2.0.1-2.11.7.tar.gz

Here is the environment of each of my nodes. I'm running in docker.

/opt/confluent-2.0.1 # export
export ALPINE_GLIBC_URL='https://circle-artifacts.com/gh/andyshinn/alpine-pkg-glibc/6/artifacts/0/home/ubuntu/alpine-pkg-glibc/packages/x86_64/'
export CONFLUENT_VERSION='2.0.1-2.11.7'
export DOCKERIZE_VERSION='0.2.0'
export GLIBC_BIN_PKG='glibc-bin-2.21-r2.apk'
export GLIBC_PKG='glibc-2.21-r2.apk'
export HOME='/root'
export HOSTNAME='bf08d841be2d'
export JAVA_HOME='/opt/jdk'
export JAVA_PACKAGE='jdk'
export JAVA_VERSION_BUILD='14'
export JAVA_VERSION_MAJOR='8'
export JAVA_VERSION_MINOR='45'
export PATH='/opt/confluent-2.0.1/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/jdk/bin'
export PWD='/opt/confluent-2.0.1'
export SHLVL='1'
export TERM='xterm'
/opt/confluent-2.0.1 # 

Here is the dockerfile I base it all off of.

FROM java-oracle8-alpine #if you need these base images as well I can post them.

RUN apk add --update bash unzip wget curl git jq # good

ENV CONFLUENT_VERSION="2.0.1-2.11.7"
# ENV KAFKA_VERSION="0.8.2.2" SCALA_VERSION="2.11"

RUN cd /opt && \
    mirror="http://packages.confluent.io/archive/2.0/" && \
    url="${mirror}confluent-${CONFLUENT_VERSION}.tar.gz" && \
    curl "${url}" | tar xzf - && \
    curl "http://cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-5.1.38.tar.gz" | tar xzf - && \
    cp ./mysql-connector-java-5.1.38/*.jar ./confluent-2.0.1/share/java/kafka-connect-jdbc/ && \
    curl "https://download.microsoft.com/download/0/2/A/02AAE597-3865-456C-AE7F-613F99F850A8/sqljdbc_6.0.6629.101_enu.tar.gz" | tar xzf - && \
    cp sqljdbc_6.0/enu/sqljdbc42.jar ./confluent-2.0.1/share/java/kafka-connect-jdbc/
    # cd confluent-2.0.1 && \

ENV PATH="/opt/confluent-2.0.1/bin:$PATH"
WORKDIR "/opt/confluent-2.0.1"

ENTRYPOINT ["sh"]

I'm getting the following error while running a single table import test against a distributed setup:

[2016-03-17 20:50:28,093] INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-f84f1dc6-4aab-4af3-ae8b-ae3d1a9d6a24', leaderUrl='http://172.24.0.4:8083/', offset=7, connectorIds=[], taskIds=[test-mysql-jdbc-source-0]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:868)
[2016-03-17 20:50:28,093] INFO Starting connectors and tasks using config offset 7 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:639)
[2016-03-17 20:50:28,093] INFO Starting task test-mysql-jdbc-source-0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:650)
[2016-03-17 20:50:28,093] INFO TaskConfig values: 
    task.class = class io.confluent.connect.jdbc.JdbcSourceTask
 (org.apache.kafka.connect.runtime.TaskConfig:165)
[2016-03-17 20:50:28,093] INFO Creating task test-mysql-jdbc-source-0 (org.apache.kafka.connect.runtime.Worker:256)
[2016-03-17 20:50:28,094] INFO Instantiated task test-mysql-jdbc-source-0 with version 2.0.1 of type io.confluent.connect.jdbc.JdbcSourceTask (org.apache.kafka.connect.runtime.Worker:267)
[2016-03-17 20:50:28,094] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:659)
[2016-03-17 20:50:28,094] INFO JdbcSourceTaskConfig values: 
    mode = bulk
    timestamp.column.name = 
    incrementing.column.name = id
    topic.prefix = mysql-jdbc-
    tables = [country]
    poll.interval.ms = 300000
    query = 
    batch.max.rows = 100
    connection.url = jdbc:mysql://mysql:3306/ctliveimport2?useSSL=false&user=****&password=****
    table.blacklist = []
    table.poll.interval.ms = 60000
    table.whitelist = [****]
 (io.confluent.connect.jdbc.JdbcSourceTaskConfig:135)
[2016-03-17 20:50:28,097] INFO Source task Thread[WorkerSourceTask-test-mysql-jdbc-source-0,5,main] finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:342)
[2016-03-17 20:51:28,095] ERROR Unhandled exception when committing WorkerSourceTask{id=test-mysql-jdbc-source-0}:  (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:118)
java.lang.NullPointerException
    at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.set(KafkaOffsetBackingStore.java:121)
    at org.apache.kafka.connect.storage.OffsetStorageWriter.doFlush(OffsetStorageWriter.java:162)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:267)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:110)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:76)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2016-03-17 20:52:28,095] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:108)
[2016-03-17 20:52:28,095] ERROR Unhandled exception when committing WorkerSourceTask{id=test-mysql-jdbc-source-0}:  (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:118)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:110)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:227)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:110)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:76)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Here is my setup:

#topics
./bin/kafka-topics --zookeeper $ZOOKEEPER_NODES_DNS --create --topic "connect-configs" --replication-factor 1 --partitions 1 \
./bin/kafka-topics --zookeeper $ZOOKEEPER_NODES_DNS --create --topic "connect-offsets" --replication-factor 1 --partitions 16 \

Env

ENV CONSUMER_SCHEMA_REGISTRY_URL="http://schema-registry:8081"
ENV ZOOKEEPER_NODES_DNS="zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
ENV KAFKA_BROKER_LIST="kafka1:9092,kafka2:9092,kafka3:9092"

Using Connect-Avro-Distributed with:

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

# The group ID is a unique identifier for the set of workers that form a single Kafka Connect
# cluster
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Kafka topic where connector configuration will be persisted. You should create this topic with a
# single partition and high replication factor (e.g. 3)
config.storage.topic=connect-configs

# Kafka topic where connector offset data will be persisted. You should create this topic with many
# partitions (e.g. 25) and high replication factor (e.g. 3)
offset.storage.topic=connect-offsets

I've made the MySQL JDBC driver available on the classpath, and I am submitting the job to a 3-worker Connect setup.
Here is the job I'm submitting:

{
                        "name": "test-mysql-jdbc-source",
                        "config": {
                            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                            "tasks.max": "10",
                            "topic.prefix": "mysql-jdbc-",
                            "connection.url":"jdbc:mysql://mysql:3306/ctliveimport2?useSSL=false&user=$user&password=$password",
                            "table.whitelist":"smalltable",
                            "poll.interval.ms":300000,
                            "mode":"bulk",
                            "incrementing.column.name":"id"
                        }
                    }

It appears to be related to the OffsetStorageWriter, as that is also in the call stack for both errors in the log. It made me think I had my topics set up incorrectly. Here are the results of listing the Kafka topics:

-------Listing Topics in Kafka...
./bin/kafka-topics --zookeeper zookeeper1:2181,zookeeper2:2181,zookeeper3:2181 --describe --topic connect-configs
Topic:connect-configs   PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: connect-configs  Partition: 0    Leader: 3   Replicas: 3 Isr: 3
./bin/kafka-topics --zookeeper zookeeper1:2181,zookeeper2:2181,zookeeper3:2181 --describe --topic connect-offsets
Topic:connect-offsets   PartitionCount:16   ReplicationFactor:1 Configs:
    Topic: connect-offsets  Partition: 0    Leader: 1   Replicas: 1 Isr: 1
    Topic: connect-offsets  Partition: 1    Leader: 2   Replicas: 2 Isr: 2
    Topic: connect-offsets  Partition: 2    Leader: 3   Replicas: 3 Isr: 3
    Topic: connect-offsets  Partition: 3    Leader: 1   Replicas: 1 Isr: 1
    Topic: connect-offsets  Partition: 4    Leader: 2   Replicas: 2 Isr: 2
    Topic: connect-offsets  Partition: 5    Leader: 3   Replicas: 3 Isr: 3
    Topic: connect-offsets  Partition: 6    Leader: 1   Replicas: 1 Isr: 1
    Topic: connect-offsets  Partition: 7    Leader: 2   Replicas: 2 Isr: 2
    Topic: connect-offsets  Partition: 8    Leader: 3   Replicas: 3 Isr: 3
    Topic: connect-offsets  Partition: 9    Leader: 1   Replicas: 1 Isr: 1
    Topic: connect-offsets  Partition: 10   Leader: 2   Replicas: 2 Isr: 2
    Topic: connect-offsets  Partition: 11   Leader: 3   Replicas: 3 Isr: 3
    Topic: connect-offsets  Partition: 12   Leader: 1   Replicas: 1 Isr: 1
    Topic: connect-offsets  Partition: 13   Leader: 2   Replicas: 2 Isr: 2
    Topic: connect-offsets  Partition: 14   Leader: 3   Replicas: 3 Isr: 3
    Topic: connect-offsets  Partition: 15   Leader: 1   Replicas: 1 Isr: 1
./bin/kafka-topics --zookeeper zookeeper1:2181,zookeeper2:2181,zookeeper3:2181 --describe --topic metrics
Topic:metrics   PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: metrics  Partition: 0    Leader: 2   Replicas: 2 Isr: 2
-------End Listing...

Any ideas?

Database schema support

I have a Postgres database with multiple tables of the same name, but in different schemas. I'm not seeing a way to specify the schemas for the tables in the properties file, and if I provide the entire "schema.tablename" as the name in the properties file that doesn't seem to work either.

It looks like there are a couple of problems causing this. First, JdbcUtils.getTables returns just a list of table names (without the schema name prepended). Second, also in JdbcUtils, whenever DatabaseMetaData.getTables is called, it's always called with null as the second parameter for the schema, which results in all of the schemas being searched.

I think the fix would be allowing the "schema.tablename" syntax in the table whitelist or blacklist and then parsing the schema out of that string. TableQuerier would also need an additional field for this schema in order to pass it along to JdbcUtils.
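A minimal sketch of the parsing step, using a hypothetical whitelist entry; the extracted schema would then be passed to DatabaseMetaData.getTables instead of the current null:

    // Hypothetical parsing of a "schema.tablename" whitelist entry.
    static String[] splitSchemaAndTable(String entry) {
        int dot = entry.indexOf('.');
        String schema = dot >= 0 ? entry.substring(0, dot) : null;
        String table = dot >= 0 ? entry.substring(dot + 1) : entry;
        // schema would then be passed to DatabaseMetaData.getTables(catalog, schema, table, types)
        // instead of the current null second argument.
        return new String[] {schema, table};
    }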

I've got a proof of concept working, but I was curious whether this is a known issue or whether support will be added soon.

Thanks

Allow overriding the default SELECT * queries

Currently SELECT * is hard coded. The user should be able to override this, allowing them to filter columns, join with other tables, etc. This probably means an alternate mode where the connector doesn't divvy up tables anymore since the query could potentially do things like join tables such that the result set is no longer tied to a single table.

This may affect some other features, e.g. if keys are generated from primary keys by default, a join query could break that functionality.

Schema Error

Hi,

I'm running into this

    org.apache.kafka.connect.errors.DataException: Unknown Java type for schemaless data: class org.apache.kafka.connect.data.Struct

when trying to extract a simple SQLite table. I am using the 0.9 Kafka snapshot and the Confluent 2.0 SNAPSHOT.

Any idea? Lib mismatches?

Thank you,
Artyom

ORA-01000: maximum open cursors exceeded

I'm using Kafka Connect (Confluent Platform v2.0.1) to pull data from an Oracle DB and getting the error below. Is there a connection leak? Here's my job configuration:

{
  "name": "oracle-db-test",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "3",
    "connection.url": "jdbc:oracle:thin:user/password@hostname:port:dbname",
    "query": "SELECT * FROM TEST_DB",
    "mode": "timestamp",
    "timestamp.column.name": "MODIFIED_DATE",
    "topic.prefix": "test-topic"
  }
}

Error:

ERROR Error while trying to get updated table list, ignoring and waiting for next table poll interval (io.confluent.connect.jdbc.TableMonitorThread:107)
java.sql.SQLException: ORA-01000: maximum open cursors exceeded

    at oracle.jdbc.driver.SQLStateMapping.newSQLException(SQLStateMapping.java:70)
    at oracle.jdbc.driver.DatabaseError.newSQLException(DatabaseError.java:131)
    at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:204)
    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:455)
    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:413)
    at oracle.jdbc.driver.T4C8Oall.receive(T4C8Oall.java:1034)
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:194)
    at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:791)
    at oracle.jdbc.driver.T4CPreparedStatement.executeMaybeDescribe(T4CPreparedStatement.java:866)
    at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1186)
    at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3387)
    at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3431)
    at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1203)
    at oracle.jdbc.OracleDatabaseMetaData.getTables(OracleDatabaseMetaData.java:2503)
    at io.confluent.connect.jdbc.JdbcUtils.getTables(JdbcUtils.java:90)
    at io.confluent.connect.jdbc.JdbcUtils.getTables(JdbcUtils.java:79)
    at io.confluent.connect.jdbc.TableMonitorThread.updateTables(TableMonitorThread.java:105)
    at io.confluent.connect.jdbc.TableMonitorThread.run(TableMonitorThread.java:61)

error 500

I am using kafka_2.11-0.9.0.1 with debezium-connector-mysql-0.2.1-plugin. I set rest.port to 8011, and I can use http://IP:8011/connectors to get the connector info; it works well. When I use kafka_2.11-0.10.0.0 instead of 0.9, with the same settings, I get a 500 error like:
[2016-06-27 14:24:15,646] INFO ip - - [27/Jun/2016:18:22:45 +0000] "GET /connectors HTTP/1.1" 500 48 90002 (org.apache.kafka.connect.runtime.rest.RestServer:60)

There is no other info. Has anybody hit the same problem?

Support dynamic sets of tables

If the user does a CREATE TABLE or DROP TABLE, the Connector should pick this up and reconfigure tasks appropriately. This requires some polling in the Connector on a background thread to periodically check whether the table set has changed.

As part of this, we should verify the Tasks gracefully handle the DROP TABLE case. This should just result in a SQLException which we need to handle anyway, but we might be able to handle this specific case more gracefully if we can narrow the exception to being caused by a dropped table.
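A rough sketch of that background poll, assuming access to the ConnectorContext; the names and structure here are illustrative rather than the connector's actual implementation:

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.connect.connector.ConnectorContext;

    // Hypothetical check run periodically on a background thread.
    static Set<String> checkTables(Connection conn, Set<String> previous, ConnectorContext context)
            throws SQLException {
        Set<String> current = new HashSet<>();
        try (ResultSet rs = conn.getMetaData().getTables(null, null, "%", new String[] {"TABLE"})) {
            while (rs.next()) {
                current.add(rs.getString("TABLE_NAME"));
            }
        }
        if (!current.equals(previous)) {
            context.requestTaskReconfiguration(); // let the framework redistribute tables to tasks
        }
        return current;
    }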

Issue with Oracle column type NUMBER set as "Incrementing Column" in Kafka Connect

The kafka-connect-jdbc connector has an issue with Oracle tables that have a primary key of type NUMBER when that column is used as the incrementing column (in the connector's mode settings).

From our debugging, we see that NUMBER => java.math.BigDecimal => NUMERIC => BYTES in the kafka-connect-jdbc code. Essentially, SchemaBuilder sets it to Type.BYTES.

As a result, we see the error "Invalid type of Incrementing column: BYTES" when we connect to the Oracle database. This is a rather severe limitation for Kafka Connect working with Oracle databases, as most primary-key columns are of type NUMBER.

Steps to Reproduce this Issue:

  • Set-up a table in Oracle
create table TEST_KAFKA_CONNECT (
KAKFA_PK NUMBER not null, UPDATED_AT timestamp not null )
  • Configure the Kafka-Connect
mode=timestamp+incrementing
incrementing.column.name=KAKFA_PK
timestamp.column.name=UPDATED_AT 

In Kafka-Connect-JDBC, use Oracle's JDBC driver (tested with Oracle 11.2.0.2.0; the corresponding Oracle JDBC driver is available here: http://www.oracle.com/technetwork/apps-tech/jdbc-112010-090769.html).

CC-375: Option to include key in SourceRecord

Currently it appears that when a new SourceRecord is created, the keySchema and key fields are left null, which results in the message sent to the Kafka topic also having a null key. It would be useful to have the option for the JDBC connector to use something as the message key, e.g. the primary key id value. If compaction is enabled on the Kafka topic, then the most recent change to each database table row is retained in the topic, and a consumer can then fully rebuild the current state of the table. Otherwise, the message will be removed from the Kafka topic after the retention period passes, and the database change will be lost from the topic.

To achieve this currently, a consumer would need to extract the primary key id out of the message value, and write the key & value to a 2nd Kafka topic.

I'm sure there is complexity around which columns to use as the message key and how to configure this, but it would be nice functionality to have.
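As a rough sketch of the requested option (not the connector's current behaviour), the Connect API already allows a source task to attach a key schema and key when building each record; the field and schema choices below are hypothetical:

    import java.util.Map;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;

    // Hypothetical record construction with the primary-key value as the message key.
    static SourceRecord withKey(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                                String topic, Schema valueSchema, Struct value) {
        Long pk = value.getInt64("id");                 // illustrative primary-key field
        return new SourceRecord(sourcePartition, sourceOffset, topic,
                                Schema.INT64_SCHEMA, pk,
                                valueSchema, value);
    }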

generate duplicate fields in schema for mssql

When I use the JDBC connector with MSSQL, the schema has duplicate fields, like this:

{"schema":{"type":"struct", "fields":[{"type":"int8", "optional":false, "field":"paidFlag"}, {"type":"int16", "optional":false, "field":"portNo"}, {"type":"int16", "optional":false, "field":"portNo"}, 
{"type":"int16", "optional":false, "field":"portNo"}, .... ], "optional":false, "name":"MyServer"}, "payload":{ ...}

I found duplicate "portNo" fields in the resulting schema.
I tried connecting another table and found the same problem in its schema, but the payload values are correct.

How can I resolve it?
Thanks.

timestamp or timestamp+incrementing mode infinitely produces last record into topic

This might not be an issue but rather my bad configuration; however, I can reproduce it with default configs.

Postgres configs

name=demo-postgresql
tasks.max=1

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:postgresql://172.17.0.1:5433/sc_orders?user=postgres&password=system

table.whitelist=bpo_customer
mode=incrementing

incrementing.column.name=updated_at
timestamp.column.name=updated_at

topic.prefix=test_jdbc_distr_

Connector configs without avro

bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schems.enable=false

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets

Run script

bin/connect-standalone ~/kafka/connect-json-standalone.properties ~/kafka/postgresql.properties

Result of the kafka-console-consumer

{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"bpo_id"},{"type":"string","optional":true,"field":"bpo_id"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"updated_at"},{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"bpo_customer"},"payload":{"customer_id":"14","bpo_id":"14","updated_at":1469011811729,"id":17}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"bpo_id"},{"type":"string","optional":true,"field":"bpo_id"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"updated_at"},{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"bpo_customer"},"payload":{"customer_id":"14","bpo_id":"14","updated_at":1469011811729,"id":17}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"bpo_id"},{"type":"string","optional":true,"field":"bpo_id"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"updated_at"},{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"bpo_customer"},"payload":{"customer_id":"14","bpo_id":"14","updated_at":1469011811729,"id":17}}

The latest record is produced into the topic over and over. In incrementing mode it works as expected, with no duplicates.

Maybe I'm doing something wrong, or is this a feature of the timestamp implementation?

Integration with Connect sinks

Hi,

Sorry for posting this here, but I'm not sure where else to ask questions about this.
I'm trying to get the connector to work with Kafka 0.9 but with the current Confluent registry and serializers.
I can get new rows pushed to Kafka and can see them using a normal console consumer, but if I try the Kafka Connect console sink or file sink I get

java.lang.StackOverflowError
        at io.confluent.connect.avro.AvroData.toConnectSchema

I would really appreciate the help; this is basically the only thing that currently supports our use case.

Artyom

JDBC connector running with Kafka TLS failing to push messages to brokers.

Hi,

When I try to run the JDBC connector against Kafka's TLS ports, it starts failing. However, it does create the config and offset topics; it is just unable to put messages onto the database table topic.

ERROR Failed to flush WorkerSourceTask{id=******}, timed out while waiting for producer to flush outstanding messages, 1 left ({ProducerRecord(topic=Newtest-zip, partition=null, key=null, value=[B@6232c19=ProducerRecord(topic=Newtest-zip, partition=null, key=null, value=[B@6232c19}) (org.apache.kafka.connect.runtime.WorkerSourceTask:237)

Let me know if I am missing anything here.

java.lang.OutOfMemoryError

I am running kafka-connect-jdbc on one table for the first time, and I am quickly running out of memory even though I have set:

batch.max.rows = 1
tasks.max = 1

It looks like the batch limit is not being effective in my case (I am running it against a postgres database).

Does the batch limit apply the very first time kafka-connect runs?

Looks like we are not handling some SQLExceptions correctly

I observed a case where a disconnect from MySQL caused us to poll() in a tight loop, returning null every time. We should probably figure out which SQL errors are unrecoverable and either reconnect ourselves or kill the task so another worker can take over.
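A minimal sketch of that distinction, with hypothetical helper names; unrecoverable errors are rethrown as a ConnectException so the task dies and can be restarted elsewhere instead of looping:

    import java.sql.SQLException;
    import java.util.List;
    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.source.SourceRecord;

    // Hypothetical poll() body: retry transient errors, fail fast on everything else.
    public List<SourceRecord> poll() throws InterruptedException {
        try {
            return runQueryAndConvert();                 // illustrative helper
        } catch (SQLException e) {
            if (isRecoverable(e)) {                      // illustrative classification helper
                resetConnection();                       // e.g. close and reopen the JDBC connection
                return null;                             // let the framework call poll() again
            }
            throw new ConnectException("Unrecoverable SQL error while polling", e); // kills the task
        }
    }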

[2016-01-29 23:46:30,118] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='bar', query='null', topicPrefix='test-', timestampColumn='null', incrementingColumn='id'}: com.mysql.jdbc.exceptions.jdbc4.MySQLQueryInterruptedException: Query execution was interrupted (io.confluent.connect.jdbc.JdbcSourceTask)
[2016-01-29 23:46:30,122] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='bar', query='null', topicPrefix='test-', timestampColumn='null', incrementingColumn='id'}: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 9 milliseconds ago.  The last packet sent successfully to the server was 3 milliseconds ago. (io.confluent.connect.jdbc.JdbcSourceTask)
[2016-01-29 23:46:30,122] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='bar', query='null', topicPrefix='test-', timestampColumn='null', incrementingColumn='id'}: java.sql.SQLException: No operations allowed after statement closed. (io.confluent.connect.jdbc.JdbcSourceTask)
[2016-01-29 23:46:30,122] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='bar', query='null', topicPrefix='test-', timestampColumn='null', incrementingColumn='id'}: java.sql.SQLException: No operations allowed after statement closed. (io.confluent.connect.jdbc.JdbcSourceTask)
[2016-01-29 23:46:30,122] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='bar', query='null', topicPrefix='test-', timestampColumn='null', incrementingColumn='id'}: java.sql.SQLException: No operations allowed after statement closed. (io.confluent.connect.jdbc.JdbcSourceTask)

The last error repeats in a tight loop forever.

Primary key not being used for kafka key

As far as I can tell, there is no key being used for the records being inserted into Kafka when using this connector. This seems problematic for a number of reasons, one being that log compaction is not going to work in the natural way where a single database row's changes will be compacted. I have a PR together that adds support for checking the primary key of the table and using the value of that column as the key for the records, but wanted to see if I was missing something.

Add some basic system tests

The unit tests cover the functionality of the connector using Derby, but we should also have integration tests. These would cover some end-to-end tests validating the functionality against a real database. It might also be useful to test a couple of the major database options, e.g. MySQL and Postgres, since JDBC connectors vary pretty widely in quality and behavior.

Security

This isn't really an issue--more of an inquiry. We're poking at Kafka connect. Is there anything special that needs to happen to make the JDBC connector send to Kafka brokers with security enabled?

KAFKA Connect - Distributed

Has anybody tried running this JDBC connector in distributed mode with multiple workers? I am looking for a way to split the task assignment across different workers/connectors.

regards
Saravanan

JDBC connector currently does not support partitioned topics.

The JDBC connector currently does not support partitioned topics. I think the JDBC connector should have default partitioning logic to publish messages to the different partitions of a topic. It should also support a way to plug in custom partitioning logic.

Wrong Kafka Version

In pom.xml the version is 0.10.1.0-SNAPSHOT, which is not present in the Confluent repo. When I try to compile the project I get:

Downloading: http://packages.confluent.io/maven/org/apache/kafka/connect-api/0.10.1.0-SNAPSHOT/maven-metadata.xml
Downloading: http://packages.confluent.io/maven/org/apache/kafka/connect-api/0.10.1.0-SNAPSHOT/connect-api-0.10.1.0-SNAPSHOT.pom
[WARNING] The POM for org.apache.kafka:connect-api:jar:0.10.1.0-SNAPSHOT is missing, no dependency information available
Downloading: http://packages.confluent.io/maven/org/apache/kafka/connect-api/0.10.1.0-SNAPSHOT/connect-api-0.10.1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 0.871 s
[INFO] Finished at: 2016-05-18T18:23:26+07:00
[INFO] Final Memory: 11M/309M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project kafka-connect-jdbc: Could not resolve dependencies for project io.confluent:kafka-connect-jdbc:jar:3.1.0-SNAPSHOT: Could not find artifact org.apache.kafka:connect-api:jar:0.10.1.0-SNAPSHOT in confluent (http://packages.confluent.io/maven/) -> [Help 1]

I don't know where to find the proper version, which is why I'm reporting it here.

Reading from Kafka Topic for Custom Sink Connector

I have written a custom sink connector that writes to a database. The problem I'm seeing is that the sink connector can only fetch messages if the topic already exists; if the topic is created after the sink connector starts, it does not pick up the topic's messages. Is there any solution for handling this?

It looks like it does read the topic after about 5 minutes; how do I change the default consumer config values?

Table whitelist/blacklist options

The user should be able to filter the set of tables down, either by a whitelist (if they only want a couple of tables) or blacklist (if they just want to not load a couple).

Build Dependency Issue

$ mvn clean package

[INFO] Scanning for projects...
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building kafka-connect-jdbc 3.0.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[WARNING] The POM for org.apache.kafka:connect-api:jar:0.10.0.0-SNAPSHOT is missing, no dependency information available
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 0.921 s
[INFO] Finished at: 2016-04-28T16:57:25-04:00
[INFO] Final Memory: 7M/78M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project kafka-connect-jdbc: Could not resolve dependencies for project io.confluent:kafka-connect-jdbc:jar:3.0.0-SNAPSHOT: Failure to find org.apache.kafka:connect-api:jar:0.10.0.0-SNAPSHOT in http://packages.confluent.io/maven/ was cached in the local repository, resolution will not be reattempted until the update interval of confluent has elapsed or updates are forced -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException

ids, timestamps, and transactions

This isn't so much an issue as it is a question. Have you guys put any thought into how to handle transactions committing IDs and timestamps out-of-order? Let me give an example with two connections.

1: begin
1: insert (get id 1)
2: begin
2: insert (get id 2)
2: commit (recording id 2)
kafka-connect-jdbc runs and thinks it has handled everything through id 2
1: commit (recording id 1)

This would result in kafka-connect-jdbc missing id 1. The same thing could happen with timestamps. I've read through some of the kafka-connect-jdbc code and I think it may be susceptible to this problem, but I haven't run it or verified that it would be an issue. Has this come up before? Are there plans to deal with this situation?

Obviously something like bottled-water for pg would handle this nicely as it would get the changes once they're committed.

SQL Server Change tracking + bulk copy

Hi All,

SQL Server (and maybe other vendors) supports a product-specific change tracking feature that doesn't require adding any column/field; it uses an internal version number that is not part of the data.

https://msdn.microsoft.com/en-us/en-en/library/bb934145%28v=sql.120%29.aspx

This allows tracking updates, inserts, and deletes on any existing schema, without any modification and in a very efficient way. Any table with an identity column would automatically become "timestamp and incrementing", but the timestamp would be the database's internal version number instead of a column.
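For illustration, roughly the kind of query such support would lean on, with hypothetical table and column names; CHANGETABLE returns the rows changed since a given version, and CHANGE_TRACKING_CURRENT_VERSION() supplies the version to store as the next offset:

    // Hypothetical change-tracking query for SQL Server; "dbo.my_table" and "id" are illustrative.
    String changesSql =
          "SELECT ct.SYS_CHANGE_VERSION, ct.SYS_CHANGE_OPERATION, t.*"
        + " FROM CHANGETABLE(CHANGES dbo.my_table, ?) AS ct"        // ? = last synced version
        + " LEFT JOIN dbo.my_table AS t ON t.id = ct.id"
        + " ORDER BY ct.SYS_CHANGE_VERSION";
    String currentVersionSql = "SELECT CHANGE_TRACKING_CURRENT_VERSION()";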

Also, Microsoft's official Java (type 4) driver supports bulk import into the DB, which is a big performance improvement for that particular engine.

Does this make sense, or are you not in favor of adding vendor-specific features?

Thanks in advance

Jose Luis

Can not set multiple timestamp columns

Hi,

I am trying to use Kafka Connect to dump MySQL data to Kafka, but I found that timestamp.column.name only accepts one column. In my case I have three timestamp columns, and no matter which one is modified, the new data should be updated in Kafka.

unable to create jar of kafka-connect-jdbc 2.1.0-SNAPSHOT

I am unable to create a jar of kafka-connect-jdbc 2.1.0-SNAPSHOT.
When I run mvn clean install it throws the following error:
C:\Users\E23228\Desktop\kafka-connect-jdbc-master>mvn clean install
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building kafka-connect-jdbc 2.1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
Downloading: https://nexus:8443/nexus/content/groups/public/io/confluent/common-config/2.1.0-SNAPSHOT/maven-metadata.xml
Downloading: https://nexus:8443/nexus/content/groups/public/io/confluent/common-config/2.1.0-SNAPSHOT/common-config-2.1.0-SNAPSHOT.pom
[WARNING] The POM for io.confluent:common-config:jar:2.1.0-SNAPSHOT is missing, no dependency information available
Downloading: https://nexus:8443/nexus/content/groups/public/io/confluent/common-config/2.1.0-SNAPSHOT/common-config-2.1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.212s
[INFO] Finished at: Mon Feb 15 14:25:20 IST 2016
[INFO] Final Memory: 9M/123M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project kafka-connect-jdbc: Could not resolve dependencies for project io.confluent:kafka-connect-jdbc:jar:2.1.0-SNAPSHOT: Could not find artifact io.confluent:common-config:jar:2.1.0-SNAPSHOT in NexusExternal (https://nexus:8443/nexus/content/groups/public) -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException

I am also unable to find io.confluent in the Maven repository.

Date type is converted to integer

I am using DB2.
The Date column type is transferred as an integer, so after the data lands in Hive it is a number.
Why not put the date into Hive directly?
Many thanks.

End to end connect setup?

Hello,

Very anxious to begin using this for SQL Server. I'm struggling to find decent instructions for setting this up end to end; many guides seem to be missing the substantial information actually needed to get this going. Any tips?

Add support for larger tinyint column values - mysql

We have some tables in our legacy MySQL schema which have rows with TINYINT column values greater than 127. I think the connector is converting TINYINT to a Java byte, which ranges from -128 to 127.
https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/DataConverter.java#L291
Can there be a fix to address TINYINT columns having larger values?

According to the JDBC documentation:
The recommended Java mapping for the JDBC TINYINT type is as either a Java byte or a Java short. The 8-bit Java byte type represents a signed value from -128 to 127, so it may not always be appropriate for larger TINYINT values, whereas the 16-bit Java short will always be able to hold all TINYINT values.
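As a minimal sketch of the mapping the quoted documentation suggests (the column name is hypothetical), the value could be read with getShort() and described as an INT16 field:

    import java.sql.ResultSet;
    import java.sql.SQLException;

    // Hypothetical handling of a TINYINT column named "flags": declare the field as
    // INT16 (Schema.INT16_SCHEMA) instead of INT8, and read it with getShort().
    static short readTinyint(ResultSet rs) throws SQLException {
        return rs.getShort("flags");     // preserves values in the 128..255 range
    }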

Make schema name configurable when in query mode

Currently when the JDBC connector is running in query mode the schema name defaults to null which causes the AvroConverter to fill in ConnectDefault. I would suggest adding a configuration property query.schema.name for the connector to be able to change this.
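For illustration, a hedged sketch of how such a property could be applied when the value schema is built; query.schema.name is only the suggestion above, not an existing configuration:

    import org.apache.kafka.connect.data.SchemaBuilder;

    // Hypothetical use of a configured "query.schema.name" value when building the value schema.
    static SchemaBuilder valueSchemaBuilder(String configuredSchemaName) {
        SchemaBuilder builder = SchemaBuilder.struct();
        if (configuredSchemaName != null && !configuredSchemaName.isEmpty()) {
            builder.name(configuredSchemaName);   // otherwise AvroConverter falls back to ConnectDefault
        }
        return builder;
    }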

Prepared statement breaks on PostgreSQL when schema changed

There is an issue when using prepared statements on PostgreSQL with wildcard (SELECT *) queries: if the schema is changed so that the query's columns change, then the following error occurs:

ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='spaces', query='null', topicPrefix='positron-db-', timestampColumn='updated_at', incrementingColumn='null'}: {} (io.confluent.connect.jdbc.JdbcSourceTask:239)
org.postgresql.util.PSQLException: ERROR: cached plan must not change result type
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2182)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1911)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:173)
        at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:645)
        at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:495)
        at org.postgresql.jdbc2.AbstractJdbc2Statement.executeQuery(AbstractJdbc2Statement.java:380)

PR available at #68

Oracle Column Type Number Not Converted Back From Bytes

I am getting Unicode-escaped byte output from the avro-console-consumer for records that were inserted into NUMBER columns. They were sent to Kafka using the JdbcSourceConnector.

Steps to Reproduce:

  1. I started zookeeper, single kafka server, and schema registry
  2. I created simple table in oracle, with a couple columns of type number.
    create table employees( empno Number(5), ename varchar2(20), created timestamp default sysdate);
  3. I then inserted some records into the oracle table.
    insert into employees(empno, ename) values(1, 'bob');
    insert into employees(empno, ename) values(2, 'alice');
  4. I started the kafka-avro-consumer and received the following as output:
    {"EMPNO":{"bytes":"\u0002"},"ENAME":{"string":"alice"},"CREATED":{"long":1452175115000}}
    {"EMPNO":{"bytes":"\u0001"},"ENAME":{"string":"bob"},"CREATED":{"long":1452175115000}}

It appears conversions of the NUMBER column type are not working properly in the connector.

Add support for string incrementing columns

Would it be possible to add support for using a string column as the incrementing column? Looking at the code a little it seems like it might be as easy as making TimestampIncrementingTableQuerier generic.

Mode=timestamp+incrementing always returns the last row

The DB log shows that the timestamp used in the query only has millisecond precision. The data I have contains microseconds, and the missing microseconds cause the last row to be returned over and over again.

db log

2016-04-23 03:52:22 UTC LOG: execute S_1: SELECT * FROM "loyalty_program" WHERE "updated_at" < CURRENT_TIMESTAMP AND (("updated_at" = $1 AND "id" > $2) OR "updated_at" > $3) ORDER BY "updated_at","id" ASC
2016-04-23 03:52:22 UTC DETAIL: parameters: $1 = '2016-03-19 05:00:02.856', $2 = '12', $3 = '2016-03-19 05:00:02.856'

data in the database

2016-03-19 05:00:02.856665 | 2016-03-19 05:00:02.856665 | 12 | Sed eos harum eum consequuntur aut numquam. | Nesciunt eum dolorum repudiandae mollitia aliquam provident enim neque. Sequi commodi incidunt aperiam quam et amet unde omnis. Iure quas eum dolorem fuga debitis. Dolorem necessitatibus reprehenderit deserunt exercitationem omnis qui exercitationem.

convertRecord method in the DataConverter class is not thread-safe

The convertRecord method in the DataConverter class is not thread-safe. A shared mutable Calendar instance is used in the method to convert to the UTC time zone. So, if multiple tasks are used in the JDBC connector, this causes a race condition among the tasks trying to fetch data from timestamp or date columns of different tables, resulting in incorrect column values being returned.
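A minimal sketch of one possible fix, assuming the conversion goes through ResultSet.getTimestamp: give each thread its own UTC Calendar rather than sharing a single mutable instance:

    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Timestamp;
    import java.util.Calendar;
    import java.util.TimeZone;

    // One UTC calendar per thread, so concurrent tasks never mutate the same instance.
    static final ThreadLocal<Calendar> UTC_CALENDAR =
        ThreadLocal.withInitial(() -> Calendar.getInstance(TimeZone.getTimeZone("UTC")));

    static Timestamp readUtcTimestamp(ResultSet rs, String column) throws SQLException {
        return rs.getTimestamp(column, UTC_CALENDAR.get());
    }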

BigDecimal has mismatching scale value for given Decimal schema

I used the latest kafka-connect-jdbc-2.1.0-SNAPSHOT jar but I am unable to parse the NUMBER format of an Oracle DB. It gives the following exception:

org.apache.kafka.connect.errors.DataException: BigDecimal has mismatching scale value for given Decimal schema
at org.apache.kafka.connect.data.Decimal.fromLogical(Decimal.java:69)
at io.confluent.connect.avro.AvroData$5.convert(AvroData.java:218)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:318)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:450)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:267)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:90)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:142)
at org.apache.kafka.connect.runtime.WorkerSourceTask.access$600(WorkerSourceTask.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask$WorkerSourceTaskThread.execute(WorkerSourceTask.java:356)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)

It seems that the latest version was supposed to fix this issue, but even after using the latest jar I am facing the same problem.

Please help.

Please support Postgres jsonb

Getting this warning for a table with a JSONB column:

WARN JDBC type 1111 not currently supported

I haven't tried regular JSON.

+1 for supporting Postgres boolean type:

WARN Ignoring record due to SQL error: (io.confluent.connect.jdbc.JdbcSourceTask:73)
org.postgresql.util.PSQLException: Bad value for type byte : f

Thanks in advance!
