postgresql-async's Introduction

This project is no longer maintained; feel free to fork and work on it.

The main goal of this project is to implement simple, async, performant and reliable database drivers for PostgreSQL and MySQL in Scala. This is not meant to be a JDBC replacement; these drivers aim to cover the common "send a statement, get a response" flow you see in most applications, so it's unlikely there will ever be support for things like updating result sets in place.

This project always returns JodaTime objects when dealing with date types, never java.util.Date.

If you want information specific to the drivers, check the PostgreSQL README and the MySQL README.

You can view the project's CHANGELOG here.

Abstractions and integrations

  • Activate Framework - full ORM solution for persisting objects using a software transactional memory (STM) layer;
  • ScalikeJDBC-Async - provides an abstraction layer on top of the driver allowing you to write less SQL and make use of a nice high level database access API;
  • mod-mysql-postgresql - vert.x module that integrates the driver into a vert.x application;
  • dbmapper - enables SQL queries with automatic mapping from the database table to the Scala class, and a mechanism to create a Table Data Gateway model with very little boilerplate code;
  • Quill - A compile-time language integrated query library for Scala.

Include them as dependencies

If you're in a hurry, you can include them in your build right away. For PostgreSQL, with sbt:

"com.github.mauricio" %% "postgresql-async" % "0.2.21"

Or Maven:

<dependency>
  <groupId>com.github.mauricio</groupId>
  <artifactId>postgresql-async_2.11</artifactId>
  <version>0.2.21</version>
</dependency>

Or, for Scala 2.12:

<dependency>
  <groupId>com.github.mauricio</groupId>
  <artifactId>postgresql-async_2.12</artifactId>
  <version>0.2.21</version>
</dependency>

And if you're into MySQL:

"com.github.mauricio" %% "mysql-async" % "0.2.21"

Or Maven:

<dependency>
  <groupId>com.github.mauricio</groupId>
  <artifactId>mysql-async_2.11</artifactId>
  <version>0.2.21</version>
</dependency>

Or, for Scala 2.12:

<dependency>
  <groupId>com.github.mauricio</groupId>
  <artifactId>mysql-async_2.12</artifactId>
  <version>0.2.21</version>
</dependency>

Database connections and encodings

READ THIS NOW

Both clients will let you set the database encoding to something else. Unless you are 1000% sure of what you are doing, DO NOT change the default encoding (currently, UTF-8). Some people assume the connection encoding is the database or column encoding, but IT IS NOT; it is just the encoding used for the communication between client and server.

When you change the encoding of the connection you are not affecting your database's encoding, and your columns WILL NOT be stored with the connection encoding. If the connection and database/column encodings are different, your database will automatically translate from the connection encoding to the correct one, and all your data will be safely stored in your database/column encoding.

This holds as long as you are using the correct string types; BLOB columns will not be translated, since they are supposed to hold a raw stream of bytes.

So, just don't touch it, create your tables and columns with the correct encoding and be happy.
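If you build a Configuration by hand, the easy way to follow this advice is simply to never pass a charset at all. A minimal sketch (parameter names assumed from the Configuration class; the credentials are placeholders):

import com.github.mauricio.async.db.Configuration

// Leave the charset parameter at its UTF-8 default and only set the
// fields you actually need. The credentials below are placeholders.
val config = Configuration(
  username = "postgres",
  host     = "localhost",
  port     = 5432,
  password = Some("somepassword"),
  database = Some("my_database")
)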

Prepared statements gotcha

If you have used JDBC before, you might have heard that prepared statements are the best thing on earth when talking to databases. This isn't exactly true all the time (as you can see in this presentation by @tenderlove), and there is a memory cost to keeping prepared statements.

Prepared statements are tied to a connection; they are not database-wide. So if you generate your queries dynamically all the time, you might eventually blow up your connection's memory, and your database's memory too.

Why?

Because when you create a prepared statement, the connection keeps the statement's description in memory locally: the returned columns information, input parameters information, query text, the query identifier that will be used to execute it, and other flags. A matching data structure is also created at your server for every connection.

So, prepared statements are awesome, but are not free. Use them judiciously.
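To make the distinction concrete, here is a hedged sketch of the pattern to avoid and its fix (table and column names are made up):

val connection: Connection = ...

// BAD: interpolating the value produces a different query text on every
// call, so each distinct name creates and caches a brand new prepared
// statement on the connection and on the server.
def findBad(name: String) =
  connection.sendPreparedStatement(s"SELECT * FROM products WHERE name = '$name'")

// GOOD: one fixed query text means one prepared statement, reused across
// calls; the value travels separately as a parameter.
def findGood(name: String) =
  connection.sendPreparedStatement("SELECT * FROM products WHERE name = ?", Array(name))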

What are the design goals?

  • fast, fast and faster
  • small memory footprint
  • avoid copying data as much as possible (we're always trying to use wrap and slice on buffers)
  • easy to use, call a method, get a future or a callback and be happy
  • never, ever, block
  • all features covered by tests
  • next to zero dependencies (it currently depends only on Netty, JodaTime and SLF4J)

What is missing?

  • more authentication mechanisms
  • benchmarks
  • more tests (run the jacoco:cover sbt task and see where you can improve)
  • timeout handler for the initial handshake and queries

How can you help?

  • check out the source code
  • find bugs and places where performance can be improved
  • check the "What is missing" list above
  • check the issues page for bugs or new features
  • send a pull request with specs

Main public interface

Connection

Represents a connection to the database. This is the root object you will be using in your application. You will find three classes that implement this trait: PostgreSQLConnection, MySQLConnection and ConnectionPool. The difference between them is that ConnectionPool is, as the name implies, a pool of connections, and you need to give it a connection factory so it can create and manage them.

To create any of these you will need a Configuration object with your database details. You can build one manually, or parse one from a JDBC or Heroku database URL using the URLParser object.
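A sketch of both paths, using this project's class names (PoolConfiguration.Default is assumed as the pool settings):

import com.github.mauricio.async.db.postgresql.pool.PostgreSQLConnectionFactory
import com.github.mauricio.async.db.postgresql.util.URLParser
import com.github.mauricio.async.db.pool.{ConnectionPool, PoolConfiguration}

// Parse a Configuration from a JDBC-style URL...
val configuration = URLParser.parse(
  "jdbc:postgresql://localhost:5432/my_database?user=postgres&password=somepassword")

// ...then either open a single connection with it or hand the pool a
// factory so it can create and manage connections for you.
val factory = new PostgreSQLConnectionFactory(configuration)
val pool = new ConnectionPool(factory, PoolConfiguration.Default)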

QueryResult

It's the output of running a statement against the database (either via sendQuery or sendPreparedStatement). This object contains the number of rows affected, the status message, and the possible ResultSet (Option[ResultSet]) if the query returned any rows.
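For example (a sketch; the table is made up and an execution context is assumed to be in scope, e.g. the CachedExecutionContext used later in this README):

connection.sendQuery("UPDATE products SET price = price + 1").map { queryResult =>
  println(s"rows affected: ${queryResult.rowsAffected}")
  println(s"status: ${queryResult.statusMessage}")
  queryResult.rows match {
    case Some(resultSet) => println(s"got ${resultSet.size} rows back")
    case None            => println("no rows returned")
  }
}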

ResultSet

It's an IndexedSeq[RowData], where every item is a row returned by the database. Database types are returned as Scala objects that fit the original type: smallint becomes a Short, numeric becomes a BigDecimal, varchar becomes a String, and so on. You can find the full default mappings in the driver-specific documentation.
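Rows can be read by column index or by column name; a short sketch (table and columns are made up):

connection.sendQuery("SELECT id, name FROM products").map { queryResult =>
  queryResult.rows.foreach { resultSet =>
    resultSet.foreach { row =>
      val id   = row(0).asInstanceOf[Long]        // access by index
      val name = row("name").asInstanceOf[String] // access by column name
      println(s"$id -> $name")
    }
  }
}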

Prepared statements

Databases support prepared (precompiled) statements. These let the database compile the query once, on first execution, and reuse the compiled representation on future executions; this makes execution faster and also allows for safer escaping when dealing with user-provided data.

To execute a prepared statement you grab a connection and:

val connection : Connection = ...
val future = connection.sendPreparedStatement( "SELECT * FROM products WHERE products.name = ?", Array( "Dominion" ) )

The ? (question mark) in the query is a parameter placeholder: it lets you set a value at that position in the query without escaping anything yourself. The driver makes sure the parameter is delivered to the database safely, so you don't have to worry about SQL injection attacks.

The basic numbers, Joda Time date/time/timestamp objects, strings, and arrays of these are all valid prepared statement parameters, and they will be encoded to their respective database types. Remember that not all databases are created equal, so not every type will work, or it might work in unexpected ways. For instance, MySQL doesn't have array types, so sending an array or collection to MySQL won't work.

Remember that parameters are positional: the order in which they appear in the query must match the order in the array or sequence given to the method call.
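So, with two placeholders, the first element of the array binds to the first ? and the second element to the second ?:

// "Dominion" binds to products.name and 20 binds to products.price,
// purely because of their positions in the array.
connection.sendPreparedStatement(
  "SELECT * FROM products WHERE products.name = ? AND products.price < ?",
  Array("Dominion", 20)
)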

Transactions

Both drivers support transactions at the database level. The isolation level is the default for your database/connection; to change it, just issue your database's command for setting the isolation level you want.

Here's an example of how transactions work:

  val future = connection.inTransaction { c =>
    c.sendPreparedStatement(this.insert)
      .flatMap(r => c.sendPreparedStatement(this.insert))
  }

The inTransaction method lets you execute a collection of statements in a single transaction: just use the connection object you receive in your block and send your statements to it. Given that each statement returns a new future, you need to flatMap the calls to get a Future[T] back instead of a Future[Future[...]].

If all futures succeed, the transaction is committed normally; if any of them fails, a rollback is issued to the database. You should not reuse a connection whose transaction has been rolled back; just close it and create a new connection to continue.
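A sketch of acting on the outcome (assuming scala.util.{Success, Failure} and an execution context are in scope):

future.onComplete {
  case Success(result) =>
    println("transaction committed")
  case Failure(error) =>
    // The driver already issued the rollback; discard this connection
    // and create a fresh one instead of reusing it.
    println(s"transaction rolled back: ${error.getMessage}")
    connection.disconnect
}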

Example usage (for PostgreSQL, but it looks almost the same on MySQL)

You can find a small Play 2 app using it here and a blog post about it here.

In short, what you would usually do is:

import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
import com.github.mauricio.async.db.postgresql.util.URLParser
import com.github.mauricio.async.db.util.ExecutorServiceUtils.CachedExecutionContext
import com.github.mauricio.async.db.{RowData, QueryResult, Connection}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object BasicExample {

  def main(args: Array[String]) {

    val configuration = URLParser.parse("jdbc:postgresql://localhost:5233/my_database?user=postgres&password=somepassword")
    val connection: Connection = new PostgreSQLConnection(configuration)

    Await.result(connection.connect, 5 seconds)

    val future: Future[QueryResult] = connection.sendQuery("SELECT 0")

    val mapResult: Future[Any] = future.map(queryResult => queryResult.rows match {
      case Some(resultSet) =>
        val row: RowData = resultSet.head
        row(0)
      case None => -1
    })

    val result = Await.result( mapResult, 5 seconds )

    println(result)

    connection.disconnect

  }

}

First, create a PostgreSQLConnection and connect it to the database, then execute queries (this object is not thread-safe, so you must execute only one query at a time) and work with the futures it returns. Once you are done, call disconnect and the connection is closed.

You can also use the ConnectionPool provided by the driver to simplify working with database connections in your app. Check the blog post above for more details and the project's ScalaDocs.
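Since ConnectionPool implements the same Connection trait, using it looks just like using a single connection; a sketch (pool construction as in the earlier snippet):

// Queries sent to the pool run on a checked-out connection that is
// returned to the pool once the query finishes.
pool.sendQuery("SELECT 0").map(result => println(result.rows))

// Shut the whole pool down when your application stops.
pool.disconnect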

LISTEN/NOTIFY support (PostgreSQL only)

LISTEN/NOTIFY is a PostgreSQL-specific feature for database-wide publish-subscribe scenarios. You can listen for database notifications like this:

  val connection: Connection = ...

  connection.sendQuery("LISTEN my_channel")
  connection.registerNotifyListener { message =>
    println(s"channel: ${message.channel}, payload: ${message.payload}")
  }
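The publishing side is plain SQL, so any connection (even from another application) can fire a notification:

  // Delivered to every connection currently listening on my_channel.
  connection.sendQuery("NOTIFY my_channel, 'hello'")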

Contributing

Contributing to the project is simple: fork it on GitHub, hack on whatever you're interested in seeing done or on the bug you want to fix, and send a pull request back. If you think the change is too big or requires architectural changes, please create an issue before you start working on it so we can discuss what you're trying to do.

You should easily be able to build this project in your favorite IDE, since it's built by SBT using plugins that generate your IDE's project files. You can use sbt-idea for IntelliJ IDEA and sbteclipse for Eclipse integration.

Check our list of contributors!

Licence

This project is freely available under the Apache 2 licence; fork, fix and send back :)

postgresql-async's Issues

Console warnings in "hello world" app.

Decided to give the driver a go, but I'm getting warnings. There are a bazillion of these warnings when some actors go down in an akka app, and they show up in this hello world chunk of code as well.

object Test extends App {
    println("starting")
    val configuration = URLParser.parse("jdbc:postgresql://localhost/crawler?user=hassan&password=")
    val connection = new PostgreSQLConnection(configuration)
    Await.result(connection.connect, 5.seconds)

    println("connect")


    val fut =  connection.sendQuery("select 229;")
    val actualResult = Await.result(fut,5 seconds)

    println(actualResult.rows.map(_.head).map(_.apply(0)).get)

    val res = Await.result(connection.disconnect,5.seconds)
    println("disconnect")
    Thread.sleep(5000)
}

Warning :

18:59:21.236 [pool-6-thread-1] WARN  io.netty.channel.nio.NioEventLoop - Selector.select() returned prematurely 512 times in a row; rebuilding selector.
18:59:21.236 [pool-6-thread-1] INFO  io.netty.channel.nio.NioEventLoop - Migrated 0 channel(s) to the new Selector.
[success] Total time: 10 s, completed 08-Dec-2013 18:59:21
18:59:21.242 [pool-6-thread-1] WARN  io.netty.channel.nio.NioEventLoop - Selector.select() returned prematurely 512 times in a row; rebuilding selector.
18:59:21.242 [pool-6-thread-1] INFO  io.netty.channel.nio.NioEventLoop - Migrated 0 channel(s) to the new Selector.

[mysql] exception on selecting with 10 columns

mysql-async 0.2.11 with mariadb 10.0.7 on centos 6.5

create table test_10 (
    id int primary key not null,
    c2 text not null, c3 text not null, c4 text not null,
    c5 text not null, c6 text not null, c7 text not null,
    c8 text not null, c9 text not null, c10 text not null
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

create table test_9 (
    id int primary key not null,
    c2 text not null, c3 text not null, c4 text not null,
    c5 text not null, c6 text not null, c7 text not null,
    c8 text not null, c9 text not null
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

create table test_11 (
    id int primary key not null,
    c2 text not null, c3 text not null, c4 text not null,
    c5 text not null, c6 text not null, c7 text not null,
    c8 text not null, c9 text not null, c10 text not null,
    c11 text not null
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

val con = new MySQLConnection(new Configuration(...))
Await.ready(con.connect, 3.seconds)

def test(query: String) {
  println(Await.result(con.sendQuery(query), 3.seconds))
}

// ok
test("select * from test_9")
test("select * from test_11")
test("select id, c2 from test_10")

// fails
test("select * from test_10")
// or test("select id, c2, c3, c4, c5, c6, c7, c8, c9, c10 from test_11")

output:
QueryResult{rows -> 0,status -> null}
QueryResult{rows -> 0,status -> null}
QueryResult{rows -> 0,status -> null}
[error] (run-main-1) java.lang.IndexOutOfBoundsException: readerIndex(1) + length(1) exceeds writerIndex(1): SlicedByteBuf(ridx: 1, widx: 1, cap: 1/1, unwrapped: UnpooledUnsafeDirectByteBuf(ridx: 5, widx: 585, cap: 8192))
java.lang.IndexOutOfBoundsException: readerIndex(1) + length(1) exceeds writerIndex(1): SlicedByteBuf(ridx: 1, widx: 1, cap: 1/1, unwrapped: UnpooledUnsafeDirectByteBuf(ridx: 5, widx: 585, cap: 8192))
        at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1161)
        at io.netty.buffer.AbstractByteBuf.readByte(AbstractByteBuf.java:563)
        at io.netty.buffer.SwappedByteBuf.readByte(SwappedByteBuf.java:435)
        at com.github.mauricio.async.db.util.ByteBufferUtils$.readCString(ByteBufferUtils.scala:53)
        at com.github.mauricio.async.db.mysql.decoder.HandshakeV10Decoder.decode(HandshakeV10Decoder.scala:37)
        at com.github.mauricio.async.db.mysql.codec.MySQLFrameDecoder.decode(MySQLFrameDecoder.scala:147)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:226)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:139)
        at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338)
        at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:126)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
        at java.lang.Thread.run(Thread.java:744)

Add support on byte arrays for PostgreSQL 8 and older

The current byte array implementation only works if the server supports the "hex" encoding format, which was introduced on PostgreSQL 9 only. We need to implement the "escape" encoding support to make it possible to use byte arrays on previous versions.

Docs about bytea fields can be found here

More examples

First of all, thank you for a great lib!
I'm looking for some more examples of how to create rowmappers for one to many relationships and nested object structures. Let say I have a user
and that user can have many messages. A message has a text and a messageInfo E.g

case class User(id: Option[Long], name: String, email: String)
case class MessageInfo(color: String, time: DateTime)
case class Message(id: Option[Long], text:String, messageInfo:MessageInfo)

"select u.id, u.name, u.email, m.color, m.time, m.text from User left join Message m on u.id=m.user_id where u.id = 1"

Sorry for any typos =) How would you create a neat rowmapper in a scala way?
Thank you!

Example with Postgres json column type throws exception

Not sure how to use json column types; do you have a working test that uses them? I get this stack trace:

Caused by: com.github.mauricio.async.db.postgresql.exceptions.GenericDatabaseException: ErrorMessage(fields=Map(Position -> 57, Line -> 510, Hint -> You will need to rewrite or cast the expression., File -> parse_target.c, SQLSTATE -> 42804, Routine -> transformAssignedExpr, Message -> column "addresses" is of type json but expression is of type character varying, Severity -> ERROR))
at com.github.mauricio.async.db.postgresql.PostgreSQLConnection.onError(PostgreSQLConnection.scala:165) ~[postgresql-async_2.10-0.2.10.jar:0.2.10]
at com.github.mauricio.async.db.postgresql.codec.PostgreSQLConnectionHandler.channelRead0(PostgreSQLConnectionHandler.scala:151) ~[postgresql-async_2.10-0.2.10.jar:0.2.10]
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:103) ~[netty-all-4.0.12.Final.jar:na]
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338) ~[netty-all-4.0.12.Final.jar:na]
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324) ~[netty-all-4.0.12.Final.jar:na]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:153) ~[netty-all-4.0.12.Final.jar:na]


create table people (
  id bigserial primary key,
  name varchar(256),
  addresses json,
  phones json
);

case class People(id: Option[Long], name: String, addresses: String, phones: String)

object PeopleServiceSql {
  val Insert = "INSERT INTO people (name, addresses, phones) VALUES (?,?,?) RETURNING id"
  val Update = "UPDATE people SET name = ?, addresses = ?, phones = ? WHERE id = ?"
}

class PeopleService(pooledConn: Connection) {

  import PeopleServiceSql._

  def crupdate(m: People): Future[People] = {
    m.id match {
      case Some(id) => pooledConn.sendPreparedStatement(Update, Array(m.name, m.addresses, m.phones, id)).map {
        queryResult => m
      }
      case None => pooledConn.sendPreparedStatement(Insert, Array(m.name, m.addresses, m.phones)).map {
        queryResult => new People(Some(queryResult.rows.get(0)("id").asInstanceOf[Long]),
          m.name, m.addresses, m.phones)
      }
    }
  }
}

def add = Action { implicit request =>

  val toInsert = People(
    id = None,
    name = "Sabrina Howitzer",
    addresses = """[ {"Home" : {"city" : "Tahoe", "state" : "CA"}} ]""".trim,
    phones = """[ "925-575-0415", "916-321-2233" ]""".trim
  )

  AsyncResult {
    Services.peopleService.crupdate(toInsert).map { p =>
      Ok("person inserted: " + p.toString)
    }
  }
}

Error on Transaction example

Hello,

I'm asking if there is not an error on the Transaction example in your README.

  val future = connection.inTransaction {
    c =>
    c.sendPreparedStatement(this.insert)
     .flatMap( r => connection.sendPreparedStatement(this.insert))
  }

Line 4: connection.sendPreparedStatement should be c.sendPreparedStatement, no?

Timed out when inserting a joda LocalDateTime value

I encountered some unexpected timeouts in a spray.io project, and it seems to be related to the encoding of LocalDateTime values.

Test snippet:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
import com.github.mauricio.async.db.postgresql.util.URLParser
import org.joda.time.LocalDateTime

object LocalDateTimeTest extends App {
  def await[T](f: Future[T]): T = Await.result(f, 5 seconds)

  val conn = new PostgreSQLConnection(
    URLParser.parse("jdbc:postgresql://localhost:5432/db?user=testuser&password=")
  )

  try {
    await(conn.connect)

    val date1 = new LocalDateTime

    await(conn.sendPreparedStatement("CREATE TEMP TABLE test(t TIMESTAMP)"))
    await(conn.sendPreparedStatement("INSERT INTO test(t) VALUES(?)", Seq(date1)))
    val result = await(conn.sendPreparedStatement("SELECT t FROM test"))
    val date2 = result.rows.get.head(0)

    assert(date2 == date1)

  } finally {
    conn.disconnect
  }
}
scala> await(conn.sendPreparedStatement("INSERT INTO test(t) VALUES(?)", Seq(date1)))
java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        ...

I'm using Scala 2.10.3 and PostgreSQL 9.3.2.

nio.NioEventLoop: Selector.select() returned prematurely 512 times in a row; rebuilding selector

I'm inserting a list of records into Postgres, using a connection pool, using something like this:

val f = records.map(x => connection.sendPreparedStatement(insertionStatement, convertRecord(x)))

val futureInsertions = Future.sequence(f)

// Block until all results are ready.
val resultList = Await.result(futureInsertions, 1 minute)

The records seem to be getting inserted properly, but my job blocks on the resultList line, and my screen fills with the following error message. Googling around, this looks like some sort of longstanding bug in NIO. However, it seems to me that the last line above should not block indefinitely.

Are there any workarounds for this, or is it expected behavior?

Thanks!


13/11/27 21:38:08 INFO nio.NioEventLoop: Migrated 0 channel(s) to the new Selector.
13/11/27 21:38:08 WARN nio.NioEventLoop: Selector.select() returned prematurely 512 times in a row; rebuilding selector.
13/11/27 21:38:08 WARN nio.NioEventLoop: Selector.select() returned prematurely 512 times in a row; rebuilding selector.
13/11/27 21:38:08 WARN nio.NioEventLoop: Selector.select() returned prematurely 512 times in a row; rebuilding selector.
13/11/27 21:38:08 WARN nio.NioEventLoop: Selector.select() returned prematurely 512 times in a row; rebuilding selector.
13/11/27 21:38:08 WARN nio.NioEventLoop: Selector.select() returned prematurely 512 times in a row; rebuilding selector.

Fails to connect to Amazon Redshift

Amazon Redshift is a hosted data warehouse with driver compatibility based on PostgreSQL 8.0.2. I get the following error trying to connect:

ErrorMessage(fields=Map(Line -> 5993, File -> /home/awsrsqa/padb/src/pg/src/backend/utils/misc/guc.c, SQLSTATE -> 42704, Routine -> set_config_option, Message -> unrecognized configuration parameter "application_name", Severity -> FATAL))

Removing the application_name property in PostgreSQLConnectionHandler resolves the issue. Perhaps the application name could be configurable.

class PostgreSQLConnectionHandler {
 …

  private val properties = List(
    "user" -> configuration.username,
    "database" -> configuration.database,
    "application_name" -> "Netty-PostgreSQL-driver-0.1.2-SNAPSHOT",
    "client_encoding" -> configuration.charset.name(),
    "DateStyle" -> "ISO",
    "extra_float_digits" -> "2")

 …
}

Concurrency problem for multiple queries

The "query succeed" event is being fired before the "read for query" event. This leaves to a concurrency problem with multiple queries run: the "read for query" event can be received after that a second query is already sent. In the this scenario, the query promise is cleared and the second query result will never fire a "query succeed" event.

Implement timeout support on connect and query calls

There should be a way for the connection itself to cause a timeout and give up on a query, or to give up connecting if it fails to finish the handshake (it should issue a CancelRequest message to the backend when this happens while a query was running).

MySQL bit type is unsupported

When reading a table with at least one column of type bit, an exception is thrown:

scala.MatchError: 16 (of class java.lang.Integer)
at com.github.mauricio.async.db.mysql.codec.DecoderRegistry.binaryDecoderFor(DecoderRegistry.scala:37)
at com.github.mauricio.async.db.mysql.decoder.ColumnDefinitionDecoder.decode(ColumnDefinitionDecoder.scala:63)
at com.github.mauricio.async.db.mysql.codec.MySQLFrameDecoder.decodeQueryResult(MySQLFrameDecoder.scala:205)
at com.github.mauricio.async.db.mysql.codec.MySQLFrameDecoder.decode(MySQLFrameDecoder.scala:139)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:232)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:131)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:337)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:323)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:100)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:478)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:447)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:341)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
at java.lang.Thread.run(Thread.java:722)

Multiple executions of a prepared statement that doesn't return rows fail

If a prepared statement does not return rows (an update, for example), the second sendPreparedStatement execution fails with the error:

ErrorMessage(fields=Map(Line -> 480, File -> prepare.c, SQLSTATE -> 42P05, Routine -> StorePreparedStatement, Message -> prepared statement "UPDATE "anentity" SET "i" = $1 WHERE ID = $2" already exists, Severity -> ERROR))

The driver isn't detecting that the prepared statement is already parsed when it does not return rows.

Database url must contain port, username and password

A database url without a port (e.g. jdbc:postgresql://localhost/foo) cannot be parsed:

scala.MatchError: jdbc:postgresql://localhost/foo (of class java.lang.String)

It would be nice if it would be possible to specify such a plain url.

ResultSet.columnNames order does not match ResultSet order

Hi,

I found a bug in the order of the column names in a result.
Here is a small example to reproduce the bug:

import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
import com.github.mauricio.async.db.postgresql.util.URLParser
import com.github.mauricio.async.db.util.ExecutorServiceUtils.CachedExecutionContext
import com.github.mauricio.async.db.{RowData, QueryResult, Connection}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object BugExample {

  def main(args: Array[String]) {

    val configuration = URLParser.parse("jdbc:postgresql://localhost:5432/testdb?user=postgres&password=test")
    val connection: Connection = new PostgreSQLConnection(configuration)

    Await.result(connection.connect, 5 seconds)
    val sql = "SELECT id, username, age, email, firstname, lastname FROM person"
    val future: Future[QueryResult] = connection.sendQuery(sql)

    val mapResult: Future[Any] = future.map(queryResult => {
      queryResult.rows match {
        case Some(resultSet) => {
          println("SQL-Query: " + sql)
          println("resultSet columnNames : " + resultSet.columnNames.mkString("[", ",", "]"))
          println("resultSet columnValues: " + resultSet.head.mkString("[", ",", "]"))
          val row : RowData = resultSet.head
          row(0)
        }
        case None => -1
      }
    })

    val result = Await.result( mapResult, 5 seconds )

    println(result)

    connection.disconnect
  }
}

You need a PostgreSQL installation with a database "testdb" and the following table:

CREATE TABLE public.person (
                id bigserial,
                username VARCHAR NOT NULL UNIQUE,
                firstname VARCHAR NOT NULL,
                lastname VARCHAR NOT NULL,
                email VARCHAR NOT NULL UNIQUE,
                age BIGINT NOT NULL,
                PRIMARY KEY (id)
);

INSERT INTO person(
            username, firstname, lastname, email, age)
    VALUES ('test', 'John', 'Doe', '[email protected]', 26);

When you run the example you get the following output:

SQL-Query: SELECT id, username, age, email, firstname, lastname FROM person
resultSet columnNames : [email,username,firstname,age,lastname,id]
resultSet columnValues: [1,test,26,[email protected],John,Doe]

As you can see, the order of the column names does not match the order of the result set values.

SingleThreadedAsyncObjectPool alternatives

I'm performing some scalability tests using postgresql-async. Currently, the SingleThreadedAsyncObjectPool is the bottleneck. Do you have plans to review this implementation?

I think a good alternative is to create multiple SingleThreadedAsyncObjectPools that are used according to thread affinity (thread.getId % partitionCount). Something like a PartitionedSingleThreadedAsyncObjectPool.

Provide way to escape ? (placeholder) in queries, or to use postgres $N placeholders instead

This is really picky, but there are a number of postgres geometric operators that contain question marks (and other user-defined operators may as well). There's currently no way to access these operators (without creating wrapper functions), as all '?' in queries are replaced with placeholders.

(I realize mysql doesn't have this problem and there's a desire to provide a unified interface. My preference would have been to use the postgres-style $N placeholders and substitute them with ? for mysql (reordering/duplicating/filling arguments as necessary). There's also something to be said for the generic named-argument approach. Oh, well.)

Add ability to pass my own ExecutionContext for Connection

When creating a new Connection, it should be possible to pass in my own ExecutionContext. Vert.x, for example, needs the execution to happen on the same thread, so a hardcoded CachedExecutionContext in PostgreSQLConnection won't work.

It should work if PostgreSQLConnection had the ability to replace the CachedExecutionContext like it was done in the ConnectionPool:
https://github.com/mauricio/postgresql-async/blob/master/db-async-common/src/main/scala/com/github/mauricio/async/db/pool/ConnectionPool.scala#L42

Add mutex/synchronization for running queries

While normal use cases are fine assuming no other problems, if somehow multiple threads get hold of the same Connection and try to run queries on it (which would clearly be a bug), it's possible for them to corrupt each other's queries. It would be nice if problems like this were caught. In particular, the window between validateQuery and setQueryPromise in each Connection.send* function seems vulnerable and, unless I'm missing something, the AtomicReference isn't providing much benefit.

A quick glance suggests that setQueryPromise could check whether queryPromise is already defined (e.g., using a simple synchronized {} mutex or AtomicReference.compareAndSet) and throw the ConnectionStillRunningQueryException at that point.

Similarly, since the connection is effectively locked (with this change) during the query, there's no need for preparedStatementCounter to be atomic.
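For illustration only, a self-contained sketch of the compareAndSet guard this issue proposes (the class and method names are hypothetical, not the driver's actual API):

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.Promise

class QueryGuard[T] {
  private val inFlight = new AtomicReference[Option[Promise[T]]](None)

  // Atomically claim the "running query" slot and fail fast if it's taken;
  // this is where the driver would throw ConnectionStillRunningQueryException.
  def set(promise: Promise[T]): Unit =
    if (!inFlight.compareAndSet(None, Some(promise)))
      throw new IllegalStateException("connection is still running a query")

  // Release the slot once the query completes, successfully or not.
  def clear(): Unit = inFlight.set(None)
}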

Apache Spark application fails when mysql-async is add in build.sbt

My application developed using Apache Spark fails when the mysql-async library is added to build.sbt.

I've added the following line to build.sbt: "com.github.mauricio" %% "mysql-async" % "0.2.11"
When this line is commented out, the application runs just fine.

Could you please help?

I'm getting the following error message:

23:43:54.429 [spark-akka.actor.default-dispatcher-3] INFO o.a.s.s.local.LocalTaskSetManager - Loss was due to java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: scala.None$
at java.net.URLClassLoader$1.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Unknown Source)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:36)
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:129)
at java.io.ObjectInputStream.readExternalData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
at org.apache.spark.scheduler.local.LocalScheduler.runTask(LocalScheduler.scala:191)
at org.apache.spark.scheduler.local.LocalActor$$anonfun$launchTask$1$$anon$1.run(LocalScheduler.scala:68)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
23:43:54.438 [DAGScheduler] DEBUG o.a.spark.scheduler.DAGScheduler - Got event of type org.apache.spark.scheduler.TaskSetFailed
23:43:54.443 [test-akka.actor.default-dispatcher-3] INFO o.a.spark.scheduler.DAGScheduler - Failed to run count at DataSession.scala:26
23:43:54.447 [spark-akka.actor.default-dispatcher-3] INFO o.a.s.scheduler.local.LocalScheduler - Remove TaskSet 0.0 from pool
[ERROR] [01/19/2014 23:43:54.455] [test-akka.actor.default-dispatcher-6] [akka://test/user/testServer/1/771192171] Job failed: Task 0.0:0 failed more than 4 times; aborting job java.lang.ClassNotFoundException: scala.None$
org.apache.spark.SparkException: Job failed: Task 0.0:0 failed more than 4 times; aborting job java.lang.ClassNotFoundException: scala.None$
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:759)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:759)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:380)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:442)
at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:150)

Unexpected behaviour of sendPreparedStatement

First of all, I'm not sure if this is a problem at all, but it seems weird to me.

When sendPreparedStatement takes fewer arguments than expected, you get an error message:

pool.sendPreparedStatement("INSERT INTO test VALUES (?,?)", "data1")

returns:

ErrorMessage(1210,#HY000,Incorrect arguments to mysqld_stmt_execute)

But when the parameter count is more than expected, weird things start to occur.

pool.sendPreparedStatement("INSERT INTO test VALUES (?,?)",
"data1", "data2", "data3")

inserts:

+-------------+-------+
| test        | test2 |
+-------------+-------+
| data1data1d | ta1   |
+-------------+-------+

or

pool.sendPreparedStatement("INSERT INTO test VALUES (?,?)", 123, 123, 123)

inserts:

+---------+---------+
| test    | test2   |
+---------+---------+
| 8060931 | 8060928 |
+---------+---------+

I tried to execute the statements in mysql directly, just to be sure about where the problem occurs.

mysql> prepare stm from 'insert into test values(?,?)';
mysql> set @A = 123;
mysql> set @b = 123;
mysql> set @c = 123;

mysql> execute stm using @A; > ERROR 1210 (HY000): Incorrect arguments to EXECUTE

mysql> execute stm using @A, @b; > Query OK, 1 row affected (0.00 sec)

mysql> execute stm using @A, @b, @c; > ERROR 1210 (HY000): Incorrect arguments to EXECUTE

That means the problem is in the driver.

In the scaladoc of sendPreparedStatement, I found something interesting:

You must provide as many parameters as you have provided placeholders, so, if your query is "INSERT INTO users (login,email) VALUES (?,?)" you have to provide an array with at least two values, as in: Array("john-doe", "[email protected]")

This at least made me think that this behavior is expected. But I'm still not sure.

Mysql driver fails for null TIMESTAMP

Test to reproduce:

    "support null TIMESTAMP on insert" in {
        withConnection {
            connection =>
                executeQuery(connection, "CREATE TEMPORARY TABLE TIMESTAMPBUG( id INT NOT NULL, VALUE TIMESTAMP, primary key (id))")
                executePreparedStatement(connection, "INSERT INTO TIMESTAMPBUG (VALUE, ID) VALUES (?, ?)", null, 10)
                val row = executePreparedStatement(connection, "SELECT VALUE, id FROM TIMESTAMPBUG").rows.get(0)
                row("id") === 10
                row("value") === null
        }
    }

Error:

[error] ! support empty TIMESTAMP on insert
[error] MySQLException: Error 1210 - #HY000 - Incorrect arguments to mysqld_stmt_execute (MySQLConnection.scala:114)
[error] com.github.mauricio.async.db.mysql.MySQLConnection.onError(MySQLConnection.scala:114)
[error] com.github.mauricio.async.db.mysql.codec.MySQLConnectionHandler.messageReceived(MySQLConnectionHandler.scala:116)
[error] org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88)
[error] org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
[error] org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
[error] org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
[error] org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
[error] org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
[error] org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
[error] org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
[error] org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
[error] org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
[error] org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
[error] org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
[error] org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
[error] org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
[error] org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
[error] org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
[error] org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
[error] org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
[error] org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)

Mysql driver fails for big strings (TEXT)

Test to reproduce the error:

    "support prepared statement with a big string (TEXT)" in {

        val bigString = {
            var s = ""
            for (i <- 0 until 400)
                s += "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
            s
        }

        withConnection {
            connection =>
                executeQuery(connection, "CREATE TEMPORARY TABLE BIGSTRING(STRING TEXT)")
                executePreparedStatement(connection, "INSERT INTO BIGSTRING VALUES (?)", bigString)
                val row = executePreparedStatement(connection, "SELECT STRING FROM BIGSTRING").rows.get(0)
                row("STRING") === bigString
                ok
        }
    }

Support prepared statement with more than 64 characters

The issue occurs because the driver uses the query itself as the prepared statement name, and at the database level the name is truncated to 64 characters, so statements with the same 64-character prefix are considered the same statement.

[0.2.6] Error with timezone parsing

20:49:39.924 [pool-1-thread-1] ERROR c.g.m.a.d.p.PostgreSQLConnection - Error on connection
java.lang.IllegalArgumentException: Invalid format: "2013-05-12 00:01:38+04" is malformed at "+04"
at org.joda.time.format.DateTimeFormatter.parseDateTime(DateTimeFormatter.java:871) ~[joda-time-2.2.jar:2.2]
at com.github.mauricio.async.db.column.TimestampEncoderDecoder.decode(TimestampEncoderDecoder.scala:42) ~[db-async-common_2.10-0.2.6.jar:0.2.6]
at com.github.mauricio.async.db.column.TimestampEncoderDecoder.decode(TimestampEncoderDecoder.scala:29) ~[db-async-common_2.10-0.2.6.jar:0.2.6]
at com.github.mauricio.async.db.column.ColumnDecoder$class.decode(ColumnDecoder.scala:27) ~[db-async-common_2.10-0.2.6.jar:0.2.6]
at com.github.mauricio.async.db.column.TimestampEncoderDecoder.decode(TimestampEncoderDecoder.scala:29) ~[db-async-common_2.10-0.2.6.jar:0.2.6]
at com.github.mauricio.async.db.postgresql.column.PostgreSQLColumnDecoderRegistry.decode(PostgreSQLColumnDecoderRegistry.scala:48) ~[postgresql-async_2.10-0.2.6.jar:0.2.6]
at com.github.mauricio.async.db.postgresql.PostgreSQLConnection.onDataRow(PostgreSQLConnection.scala:220) ~[postgresql-async_2.10-0.2.6.jar:0.2.6]
at com.github.mauricio.async.db.postgresql.codec.PostgreSQLConnectionHandler.channelRead0(PostgreSQLConnectionHandler.scala:149) ~[postgresql-async_2.10-0.2.6.jar:0.2.6]
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:98) ~[netty-all-4.0.7.Final.jar:na]
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:334) [netty-all-4.0.7.Final.jar:na]
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:320) [netty-all-4.0.7.Final.jar:na]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:173) [netty-all-4.0.7.Final.jar:na]
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:334) [netty-all-4.0.7.Final.jar:na]
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:320) [netty-all-4.0.7.Final.jar:na]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785) [netty-all-4.0.7.Final.jar:na]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:100) [netty-all-4.0.7.Final.jar:na]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:497) [netty-all-4.0.7.Final.jar:na]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:465) [netty-all-4.0.7.Final.jar:na]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:359) [netty-all-4.0.7.Final.jar:na]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101) [netty-all-4.0.7.Final.jar:na]
at java.lang.Thread.run(Thread.java:722) [na:1.7.0_21]

Support (auto convert) Option in prepared stmt parameters

Passing an option (Some/None) fails at runtime, so I always have to use getOrElse(null). Am I doing anything wrong?

It would be great if this would be done automatically, unless there's a reason not to do so.

Is there already some possibility to hook into parameter binding?
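Until the driver does it, one workaround is to flatten the options yourself before sending; a sketch (the helper name is made up):

// Unwrap Some(x) to x and None to null so the driver's encoders only
// ever see plain values; everything else passes through untouched.
def flattenParams(params: Seq[Any]): Seq[Any] = params.map {
  case Some(value) => value
  case None        => null
  case other       => other
}

connection.sendPreparedStatement(
  "INSERT INTO test VALUES (?, ?)",
  flattenParams(Seq(Some("data1"), None))
)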

Mispelling

NettyUtils#DefaultEventLoopGroup is actually DetaultEventLoopGroup.

mysql-async throws IllegalArgumentException when limit size is larger than the size of existing records

Thank you for your incredible work.

I found a mysql-async issue: mysql-async throws IllegalArgumentException when the limit size is larger than the number of existing records. For instance, running select * from members limit 100 against a members table which has only 10 records fails.

[info]   java.lang.IllegalArgumentException: length is negative: -1
[info]   at org.jboss.netty.buffer.SlicedChannelBuffer.checkIndex(SlicedChannelBuffer.java:225)
[info]   at org.jboss.netty.buffer.SlicedChannelBuffer.slice(SlicedChannelBuffer.java:125)
[info]   at org.jboss.netty.buffer.AbstractChannelBuffer.readSlice(AbstractChannelBuffer.java:323)
[info]   at com.github.mauricio.async.db.mysql.decoder.ResultSetRowDecoder.decode(ResultSetRowDecoder.scala:42)
[info]   at com.github.mauricio.async.db.mysql.codec.MySQLFrameDecoder.decodeQueryResult(MySQLFrameDecoder.scala:198)
[info]   at com.github.mauricio.async.db.mysql.codec.MySQLFrameDecoder.decode(MySQLFrameDecoder.scala:135)
[info]   at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
[info]   at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
[info]   at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
[info]   at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
[info]   ...

Though I haven't investigated this issue in detail, I will send a pull request if I figure out the problem.

MySQL: QueryResult.rowsAffected not set on SELECT statements

In a PostgreSQL SELECT statement, QueryResult.rowsAffected gets set to the number of rows selected. In MySQL it is set to 0. I'd like to have the number of results right away instead of counting them myself, if that's possible.

This might be related to #42 as my client code is still the same, but the "rows" is always set to 0.

Exception with enum types in postgres driver

Similar to the bug for json types. Not sure how to map enum columns with your driver; I just assumed it would take a String type...

I added an enum column to the database from your blog demo and got this stack trace:

play.api.Application$$anon$1: Execution exception[[GenericDatabaseException: ErrorMessage(fields=Map(Position -> 61, Line -> 510, Hint -> You will need to rewrite or cast the expression., File -> parse_target.c, SQLSTATE -> 42804, Routine -> transformAssignedExpr, Message -> column "feeling" is of type mood but expression is of type character varying, Severity -> ERROR))]]
at play.api.Application$class.handleError(Application.scala:289) ~[play_2.10.jar:2.1.1]
at play.api.DefaultApplication.handleError(Application.scala:383) [play_2.10.jar:2.1.1]
at play.core.server.netty.PlayDefaultUpstreamHandler$$anon$2$$anonfun$handle$1.apply(PlayDefaultUpstreamHandler.scala:144) [play_2.10.jar:2.1.1]
at play.core.server.netty.PlayDefaultUpstreamHandler$$anon$2$$anonfun$handle$1.apply(PlayDefaultUpstreamHandler.scala:140) [play_2.10.jar:2.1.1]
at play.api.libs.concurrent.PlayPromise$$anonfun$extend1$1.apply(Promise.scala:113) [play_2.10.jar:2.1.1]
at play.api.libs.concurrent.PlayPromise$$anonfun$extend1$1.apply(Promise.scala:113) [play_2.10.jar:2.1.1]
Caused by: com.github.mauricio.async.db.postgresql.exceptions.GenericDatabaseException: ErrorMessage(fields=Map(Position -> 61, Line -> 510, Hint -> You will need to rewrite or cast the expression., File -> parse_target.c, SQLSTATE -> 42804, Routine -> transformAssignedExpr, Message -> column "feeling" is of type mood but expression is of type character varying, Severity -> ERROR))
at com.github.mauricio.async.db.postgresql.PostgreSQLConnection.onError(PostgreSQLConnection.scala:165) ~[postgresql-async_2.10-0.2.10.jar:0.2.10]
at com.github.mauricio.async.db.postgresql.codec.PostgreSQLConnectionHandler.channelRead0(PostgreSQLConnectionHandler.scala:151) ~[postgresql-async_2.10-0.2.10.jar:0.2.10]
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:103) ~[netty-all-4.0.12.Final.jar:na]
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338) ~[netty-all-4.0.12.Final.jar:na]
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324) ~[netty-all-4.0.12.Final.jar:na]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:153) ~[netty-all-4.0.12.Final.jar:na]


Code changes:

CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy');

CREATE TABLE messages (
  id bigserial NOT NULL,
  content character varying(255) NOT NULL,
  moment date NOT NULL,
  feeling mood,
  CONSTRAINT bigserial_column_pkey PRIMARY KEY (id)
);


case class Message ( id : Option[Long], content : String, moment : LocalDate = LocalDate.now(), feeling: String)


object MessageRepository {
  val Insert = "INSERT INTO messages (content,moment,feeling) VALUES (?,?,?) RETURNING id"
  val Update = "UPDATE messages SET content = ?, moment = ? WHERE id = ?"
  val Select = "SELECT id, content, moment, feeling FROM messages ORDER BY id asc"
  val SelectOne = "SELECT id, content, moment, feeling FROM messages WHERE id = ?"
}

class MessageRepository(pool: Connection) {

  import MessageRepository._

  def save(m: Message): Future[Message] = {
    m.id match {
      case Some(id) => pool.sendPreparedStatement(Update, Array(m.content, m.moment, m.feeling, id)).map {
        queryResult => m
      }
      case None => pool.sendPreparedStatement(Insert, Array(m.content, m.moment, m.feeling)).map {
        queryResult => new Message(Some(queryResult.rows.get(0)("id").asInstanceOf[Long]), m.content, m.moment, m.feeling)
      }
    }
  }
  //....
}

Update dependencies?

These are available.

  • Netty 4.0.9.Final
  • joda-time 2.3
  • joda-convert 1.4

Note:
Netty 4.0.9.Final breaks compatibility with 4.0.8.Final.
isDirectBufferPooled was added to ByteBufAllocator.

BufferNotFullyConsumedException: Buffer was not fully consumed by decoder

I'm attempting to try out the library and I'm running into the exception below. I'm obviously doing something terribly wrong. Any ideas?

Caused by: com.github.mauricio.async.db.exceptions.BufferNotFullyConsumedException: Buffer was not fully consumed by decoder, 23 bytes to read
    at com.github.mauricio.async.db.mysql.codec.MySQLFrameDecoder.decode(MySQLFrameDecoder.scala:164)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:680)

My scenario is basically something like:

val connection = Await.result(new MySQLConnection(configuration).connect, timeout)
connection.sendQuery(...)
...
