djc / bb8 Goto Github PK
View Code? Open in Web Editor NEWFull-featured async (tokio-based) postgres connection pool (like r2d2)
License: MIT License
Full-featured async (tokio-based) postgres connection pool (like r2d2)
License: MIT License
Hi,
When setting up this crate, I unfortunately encountered the following error:
error[E0599]: no method named `then` found for type `tokio_postgres::impls::Prepare` in the current scope
--> src/commands/loadout.rs:40:14
|
40 | .then(|st| {
| ^^^^
|
= help: items from traits can only be used if the trait is in scope
= note: the following trait is implemented but not in scope, perhaps add a `use` for it:
`use futures::future::Future;`
And yes, I have tried importing the following the suggested fix.
My Code: https://gist.github.com/Texlo-Dev/98f4213a20f5e75007f68e08b1715d29
Any help would be much appreciated.
@djc Hi, I was recently troubleshooting a memory leak, which may be related to connection pool. I saw that spawn_start_connections (method of PoolInner) locked on self.inner.internals, this is used to protect the entire spawn_replenishing_approvals method, right? I find that spawn_replenishing_approvals spawn a new task by tokio::spawn internally and without join or await on it, locker won't protect the code in that this task, is this correct or expected?
Hello!
Sorry for my ignorance - I'am newby in Rust.
I want to use hyper with bb8 and tokio-postgres. In every request I want to acquire new connection from pool. Please, can you provide me some example for this scenario?
Currently I do it like this:
fn main() {
let addr = "127.0.0.1:3000".parse().unwrap();
let pg_mgr =
PostgresConnectionManager::new("postgresql://auth:auth@localhost:5433/auth", NoTls);
rt::run(future::lazy(move || {
Pool::builder()
.build(pg_mgr)
.map_err(|e| eprintln!("Database error: {}", e))
.and_then(move |pool| {
let service = || service_fn(|req| router(req, pool.clone()));
let server = Server::bind(&addr)
.serve(service)
.map_err(|e| eprintln!("Server error: {}", e));
println!("Listening on http://{}", addr);
server
})
}))
}
fn router(
_req: Request<Body>,
_pool: Pool<PostgresConnectionManager<NoTls>>,
) -> Result<Response<Body>, hyper::Error> {
// do some staff with pool
}
But it won't compile:
error[E0597]: `pool` does not live long enough
--> src/main.rs:22:63
|
22 | let service = || service_fn(|req| router(req, pool.clone()));
| -- -----------------------------^^^^----------
| | | |
| | | borrowed value does not live long enough
| | returning this value requires that `pool` is borrowed for `'static`
| value captured here
...
30 | })
| - `pool` dropped here while still borrowed
Waht am I doing wrong? How to make my case work correctly?
Hi,
when I run
let pool = bb8::Pool::builder().build(manager)
without wrapping into future::lazy
the program crashes. The reason is, that Pool::new_inner
schedules a task to reap connections.
I suggest to do this the first time the future is polled.
If you agree I could try to implement this. Edit: I will try something else first.
I use tokio 0.2, I don't use the default executor, so I want a way that can specify the executor when building the pool.
such as
let rt = tokio::Runtime::new().unwrap();
let manager = SomeManager();
let pool = bb8::Builder().new()
.with_executor(rt.executor())
.build(manager)
.compat().await?;
let result = pool.run(|conn| ()).compat().await?;
When using a pool connection for multiple queries, the connection is eventually closed while still in use.
The following test reliably succeeds on the *_direct methods (without using bb8), but frequently (>50%) fails one or more of the pool-using tests.
use lazy_static::lazy_static;
use bb8_postgres::bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use bb8_postgres::tokio_postgres::{NoTls, Client};
const CONN_STR: &'static str = "dbname=demodb host=localhost user=postgres";
const ITERATIONS: u32 = 1000;
lazy_static! {
static ref CONNECTION_POOL: Pool<PostgresConnectionManager<NoTls>> = {
let manager = PostgresConnectionManager::new_from_stringlike(CONN_STR, NoTls).unwrap();
Pool::builder().build_unchecked(manager)
};
}
#[cfg(test)]
mod test {
use super::*;
#[tokio::test]
async fn much_insert_traffic_direct() {
much_traffic_direct("INSERT INTO foo(a,b) VALUES (1, 2) RETURNING id").await
}
#[tokio::test]
async fn much_select_traffic_direct() {
much_traffic_direct("SELECT MAX(id) FROM foo").await
}
#[tokio::test]
async fn much_update_traffic_direct() {
much_traffic_direct("UPDATE foo SET a = 81 WHERE id = 1919 RETURNING b").await;
}
#[tokio::test]
async fn much_insert_traffic() {
much_traffic("INSERT INTO foo(a,b) VALUES (1, 2) RETURNING id").await
}
#[tokio::test]
async fn much_select_traffic() {
much_traffic("SELECT MAX(id) FROM foo").await
}
#[tokio::test]
async fn much_update_traffic() {
much_traffic("UPDATE foo SET a = 81 WHERE id = 1919 RETURNING b").await;
}
async fn much_traffic_direct(stmt: &str) {
let (client, connection) = bb8_postgres::tokio_postgres::connect(CONN_STR, NoTls).await.expect("connect");
tokio::spawn(async {
connection.await
});
for i in 0..ITERATIONS {
let res = client.query_opt(stmt, &[]).await.expect(&format!("Perform repeat {} of {} ok", i, stmt));
}
}
async fn much_traffic(stmt: &str) {
let c = CONNECTION_POOL.get().await.expect("Get a connection");
let client = &*c;
for i in 0..ITERATIONS {
let res = client.query_opt(stmt, &[]).await.expect(&format!("Perform repeat {} of {} ok", i, stmt));
}
}
}
The reported error is always similar, some variation of the following
Perform repeat 8782 of UPDATE foo SET a = 81 WHERE id = 1919 RETURNING b ok: Error { kind: Closed, cause: None } thread 'test::much_update_traffic' panicked at 'Perform repeat 8782 of UPDATE foo SET a = 81 WHERE id = 1919 RETURNING b ok: Error { kind: Closed, cause: None }', src\main.rs:44:23
Just a quick question - is there a way to enable some optional features of tokio-postgres
? And if so, how would I do it?
We're getting the following error:
the package `event-store` depends on `bb8-postgres`, with features: `with-uuid, with-chronos, with-serde_json` but `bb8-postgres` does not have these features.
Enabling those features for the tokio_postgres
crate and using a bb8::Pool
doesn't work, so we were wondering if this could be due to the bb8's internal use of tokio_postgres
not including those features? Do you have any advice on what we should do?
I added timeout to code that connect to redis and sends a few commands using tokio::time::timeout. That works just by stopping polling future produced by pool().get(). Unfortunately bb8 leaks connections in certain cases, and after some time pool is empty and not usable.
One of problems is here in the get function below (bb8/src/lib.rs).
pub async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
let conn = self.get_conn::<M::Error>().await?;
Ok(PooledConnection {
pool: self,
checkout: Instant::now(),
conn: Some(conn),
})
}
Polling may be stopped between await and construction of PooledConnection. Thus connection is leaked.
This problem may be solved I think by replacing this implementation with:
pub async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
let conn = self.get_conn::<M::Error>();
conn.map(move |conn_result| {
conn_result.map(|conn| {
PooledConnection {
pool: self,
checkout: Instant::now(),
conn: Some(conn),
}
})
}).await
}
However, I think also get_conn internally may have this provlem whem validation is enabled. Maybe one solution would be to create PooledConnection as soon as it is removed from popped from conns
:
if let Some(conn) = internals.conns.pop_front() {
Have a bit of an issue. I decided to dip my toes in by trying to make actix-web run async/await with a pool. Went with bb8 because you're on new futures already 👍 🎊
After a little work, I ended up with this error over and over again:
thread '' panicked at 'called
Result::unwrap()
on anErr
value: SpawnError { is_shutdown: true }'
getting thrown from inside bb8's Pool. After looking into it some more, it's because the reaper is spawned off immediately on building the pool, but the tokio runtime isn't ready until you call System::run
on the actix system. However, I need to build the pool to give to the actix_web::HttpServer
before I start the system.
Not entirely a bb8 issue, I'm sure it works fine with hyper but I was wondering if you had any ideas.
Validated by forking and just commenting out where the reaper is scheduled. Works fine and I get a db connection.
Hey @khuey, how would you feel about just transferring the repo to me? Would make it a bit easier to manage. You could still be a collaborator if you like -- not sure what your level of interest is these days. Let me know.
Hi,
I would suggest to use the Config
struct from tokio_postgres to specify connection details instead of passing a string "postgres://...".
Currently, the string is passed to connect
from tokio_postgres, which is a helper function. There, the string is parsed to a Config
struct.
So the api would change from ::new("postgres://....")
to ::new("postgres://...".into())
.
This saves parsing a string if the connection details are not in the correct format. At the moment I have to assemble a string in the expected format which then gets parsed immediately.
I think this is also how r2d2 works.
I have a web api application that uses bb8-postgres, tokio_postgres and hyper. When I started to do performance tests, I noticed that if there're too many requests, it shows this error "Timed out in bb8" and it doesn't go away, only restart helps to get rid of it.
I do create a single instance of bb8::Pool
and clone it for each request. Then, I do db_pool.run(|conn| async {Ok((<some_func_that_does_queries_using_conn>.await, conn))}).await
.
I'm not sure if it's a bug or I'm doing something wrong. Could you help me, please?
Would be nice if bb8-postgres supported calling DISCARD ALL
after the connection is returned to the pool, so it's guaranteed to be "pristine" on the next checkout.
Unless I'm missing something, there isn't even a "post check-in" hook in bb8 right now, only test_on_check_out
. I suppose that could be used, but the best is to do it early, release the resources (e.g. UNLISTEN
) and reduce the latency on checkout (assuming test_on_checkout
is false).
I wanted to go full async with the application I'm currently building and tried both bb8 and l337 with tokio-postgres just to find that both performed worse than r2d2 with postgres:
https://bitbucket.org/bikeshedder/actix_web_async_postgres/src/master/
Am I doing anything wrong or is this maybe an issue with tokio-postgres?
Hi @khuey,
Since this repo hasn't been very active lately, and I need some pooling implementation that works with async/await and (ideally) has a released version on crates.io, I'd like to know what your plans are going forward. (It looks like many people might be in a similar situation since async/await has landed.) I like bb8 and as you've seen I'm happy to contribute to it, but if you don't have time for code reviews and releasing, I want to make sure I spend my time/energy wisely.
As previously mentioned, I'd be happy to help with maintenance. However, if you don't really have time to work on this crate outside some limited purpose (like, conceivably, your day job), please clarify this intention so that I can plan accordingly.
Thanks for the good code and enjoy any holiday time!
In current project my team needs a middleware service between redis and rest of the world. So I wrote one that use bb8-redis under the hood, but unfortunately it stucks under a lot of connections.
I create repo with instructions how to reproduce, here it is.
Hi,
this issue is conceptually linked to sfackler/r2d2-postgres#19
The problem is related to the fact that the PostgresConnectionManager<T>
struct has a generic parameter <T>
.
If <T>
is known at compile-time, then there are no problems; on the contrary, if it is only known at runtime then it has to be redeclared and propagated in every place where the connection and/or the Pool is used.
As already shown in sfackler/r2d2-postgres#19 , it is possible to slightly alter the PostgresConnectionManager
struct implementation to remove the generic T
and making the pool more ergonomic to use.
If you think it is not feasible, would you at least mind to provide both the alternatives of the PostgresConnectionManager
, one with and one without the generic param?
[package]
name = "db_pool"
version = "0.1.0"
edition = "2018"
[dependencies]
bb8 = "0.3.1"
bb8-postgres = "0.3.1"
diesel = { version = "1.4.3", features = ["postgres", "extras"] }
failure = "0.1"
futures = "0.1"
futures-state-stream = "0.2"
postgres = "0.17.0"
tokio-postgres = "0.5.1"
use bb8;
use bb8_postgres;
use futures::future;
use futures::prelude::*;
use tokio_postgres;
use tokio_postgres::NoTls;
#[derive(Clone, Debug)]
pub struct Pool {
inner: bb8::Pool<bb8_postgres::PostgresConnectionManager<NoTls>>
}
impl From<bb8::Pool<bb8_postgres::PostgresConnectionManager<NoTls>>> for Pool {
fn from(v: bb8::Pool<bb8_postgres::PostgresConnectionManager<NoTls>>) -> Self {
Self { inner: v }
}
}
I am getting following error:
error: the trait bound `tokio_postgres::NoTls: bb8_postgres::tokio_postgres::tls::MakeTlsConnect<bb8_postgres::tokio_postgres::Socket>` is not satisfied
label: the trait `bb8_postgres::tokio_postgres::tls::MakeTlsConnect<bb8_postgres::tokio_postgres::Socket>` is not implemented for `tokio_postgres::NoTls`
Hello and thanks for starting this much-needed crate.
I'll be using bb8
as part of a personal project, but I'd like to be able to pull this from crates.io instead of directly from git. That will also help with versioning in case there's an API change.
Are there any plans for publishing the bb8
crate soon?
#[async_trait]
impl bb8::ManageConnection for RedisConnectionManager {
type Connection = Option<Connection>;
type Error = RedisError;
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
self.client.get_async_connection().await.map(Some)
}
async fn is_valid(&self, mut conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
// The connection should only be None after a failure.
redis::cmd("PING")
.query_async(conn.as_mut().unwrap())
.await
.map(|()| conn)
}
fn has_broken(&self, conn: &mut Self::Connection) -> bool {
conn.is_none()
}
}
the is_valid
method also unwraps this option, is that safe? under what conditions is it safe?
Currently I using actix_web + r2d2 and r2d2_postgres. The way I use it: Actix`s SyncArbiter creates 3 separated threads, each thread has it's own r2d2 pool of connections. But r2d2 isn't asynchronous. So going this way I will get bottle neck, if I understands correctly. How can I use bb8 crate with actix-web framework? Would be cool to see some examples. Thank you!
Since tokio 0.3 is out with some major changes, it would be a good idea to update.
Boxing is no longer necessary. On the other hand the returned future being Send
will give much more freedom to DB result consumers.
I'm testing bb8_redis along with other pooling systems like deadpool and mobc.
I've noticed that with every operation, a socket TIME_WAIT is created beside the ESTABLISHED one, like if the socket is discarded and recreated every time.
With bb8, using a min_idle > 0, this is even worse, because after every single operation, the number of TIME_WAIT sockets created is the same of the number of the idle connections.
Obviously redis-rs guy are telling me this is a pooling crate issue, and I fear that, with high request rates, I'll end up saturating available sockets.
I'm seeing the TIME_WAIT sockets with netstat -npa | grep 6379 where 6379 is redis port in use.
Trying to demonstrate the problem, I ended up creating a working pooling crate without the problem: https://github.com/nappa85/redis_dumbpool
Looks like tokio 1.0 is out. Not sure yet about 0.3 compatibility, but might be time to add support.
Would you be open to a PR that changes the Pool
reference on PooledConnection
to Weak<SharedPool>
? You can take a look here: master...kardeiz:local.
This would be useful for using the PooledConnection
in some async contexts (for example, I'm working on a sync adapter for bb8 where I'm sending the PooledConnection
from another thread).
struct Error
is private
--> postgres/src/lib.rs:13:34
|
13 | use tokio_postgres::{Connection, Error, TlsMode};
| ^^^^^
error[E0599]: no function or associated item named connect
found for type tokio_postgres::Connection
in the current scope
--> postgres/src/lib.rs:50:9
|
50 | Connection::connect(self.params.clone(), (self.tls_mode)(), &handle)
| ^^^^^^^^^^^^^^^^^^^ function or associated item not found in tokio_postgres::Connection
error[E0599]: no method named batch_execute
found for type tokio_postgres::Connection
in the current scope
--> postgres/src/lib.rs:57:14
|
57 | conn.batch_execute("")
| ^^^^^^^^^^^^^
error[E0599]: no method named is_desynchronized
found for type &mut tokio_postgres::Connection
in the current scope
--> postgres/src/lib.rs:61:14
|
61 | conn.is_desynchronized()
| ^^^^^^^^^^^^^^^^^
error: aborting due to 4 previous errors
It's conceivable that, even in a case where a connection is no longer valid, that other parts of the connection manager may want to access the connection value. For instance, the r2d2 CustomizeConnection trait will pass that to on_release(), which can perform more custom operations on it. Is there any reason, aside from backwards compatibility, that is_valid()
does not take Self::Connection by mutable reference, then have the Result be Result<(), Self::Error>
? I believe this would better deal with the circumstances, and still permit the connection itself to be dropped.
I try to use last 0.4 version from that repo, but library panic after try use connection. Pool is succesfuly created, but after send query - failed with TimedOut. Also i tried use that in my project with actix, and same - the pool is created successfully, but then it breaks when trying to send a request.
@djc Is this a known bug? Maybe it does not work because it is only an alpha version.
(I attach you because it looks like the project passed to you)
Hi,
I am running a small backend with Hyper, I was investigating this library to have an async threadpool.
I was wondering what's the difference between bb8
and bb8-postgres
.
Also, I join the others in asking a bit more documentation and examples (tokio itself is a bit hard to grasp for newbies). The example for Postgres looks like to rely on unreleased version of the library.
EDIT: also, do you have plans to push forward the development of the tokio-based branch of bb8
(now that async-std is out)?
Thank you!
The mutex around internals
is never held locked across an .await
as far as I can see, so it should not be using an asynchronous mutex. Changing to a synchronous mutex would allow you to fix the following issues:
Spin lock
https://github.com/khuey/bb8/blob/e2978dfb1ac86842901e344efcf198dc75fab488/bb8/src/lib.rs#L628-L632
Use of block_on
in destructor
https://github.com/khuey/bb8/blob/e2978dfb1ac86842901e344efcf198dc75fab488/bb8/src/lib.rs#L798-L809
even with the simple example, tokio.run always takes at least 30s to finish. Looks like it ends with time-out? is there any way to finish it after all futures finish?
time ../target/debug/examples/static_select
Output:
real 0m30.328s
user 0m0.017s
sys 0m0.010s
Example txn
has similar outcome.
time ../target/debug/examples/txn
Output:
real 0m30.021s
user 0m0.017s
sys 0m0.010s
But "result: 1" can be printed out very quickly. I just couldn't figure out why tokio.run needs 30s to finish.
Copypasted of issue that I create in tokio: tokio-rs/tokio#3500
Version
│ ├── tokio v1.1.0
│ │ └── tokio-macros v1.0.0
│ ├── tokio-util v0.6.2
│ │ ├── tokio v1.1.0 (*)
│ │ └── tokio-stream v0.1.2
│ │ └── tokio v1.1.0 (*)
│ ├── tokio v1.1.0 (*)
├── tokio v1.1.0 (*)
├── tokio-stream v0.1.2 (*)
├── tokio-util v0.6.2 (*)
│ ├── tokio v1.1.0 (*)
│ ├── tokio-stream v0.1.2 (*)
│ └── tokio v1.1.0 (*)
│ │ └── tokio v1.1.0 (*)
│ ├── tokio v1.1.0 (*)
│ ├── tokio-util v0.6.2 (*)
├── tokio v1.1.0 (*)
├── tokio-util v0.6.2 (*)
cargo tree | grep redis
:
├── bb8-redis v0.8.0
│ └── redis v0.19.0
cargo tree | grep bb8
:
├── bb8 v0.7.0
├── bb8-redis v0.8.0
│ ├── bb8 v0.7.0 (*)
Platform
Linux videotest01.dev.wb.ru 4.19.0-13-cloud-amd64 #1 SMP Debian 4.19.160-2 (2020-11-28) x86_64 GNU/Linux
Description
After moving to Tokio v1.0.1 our "reddis-proxy" service start to fail with "magic" errors:
Response was of incompatible type: "Response type not hashmap compatible" (response was status("PONG"))
This error happens when we call let response: redis::RedisResult<collections::HashMap<String, String>> = conn.hgetall(key).await;
Looks like async tasks mixed somehow and hgetall
get response from PING
request. Do not know where to dig into, will try with newer version of tokio...
We have async fn:
async fn ask_redis(
redis_pool: Pool<RedisConnectionManager>,
) -> Result<_> {
let mut conn = redis_pool.get().await?;
let key = todo!();
let response: redis::RedisResult<collections::HashMap<String, String>> =
conn.hgetall(key).await;
...
}
That called ~10 times per second. And sometimes (in 10-20% cases) this line says: Response was of incompatible type: "Response type not hashmap compatible" (response was status("PONG"))
.
Redis-pool have logic inside, that sends PING
requests to validate connection (I think it do that before each request in my case). So, looks like response on that requests goes in wrong future task... it is the only way (as I think) why hgetall
can receive PONG
string as a response. We do not have those values in redis.
And all works fine before we moved to tokio 1.x
My environment:
code repository: https://github.com/EMayej/rs-bb8-redis
rust version: rustc 1.40.0-nightly (2477e2493 2019-11-04)
I tried to use bb8 with tokio 0.2, when I run the program, it reports
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: SpawnError { is_shutdown: true }', src/libcore/result.rs:1165:5
Below is the output of env RUST_BACKTRACE=full cargo run
, it looks like stack 16 and 17 trigger the panic:
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: SpawnError { is_shutdown: true }', src/libcore/result.rs:1165:5
stack backtrace:
0: 0x109205e45 - backtrace::backtrace::libunwind::trace::h8f761892b88f7dc8
at /Users/runner/.cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.40/src/backtrace/libunwind.rs:88
1: 0x109205e45 - backtrace::backtrace::trace_unsynchronized::he51ab0165d85becb
at /Users/runner/.cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.40/src/backtrace/mod.rs:66
2: 0x109205e45 - std::sys_common::backtrace::_print_fmt::hadaad98187d7cb07
at src/libstd/sys_common/backtrace.rs:77
3: 0x109205e45 - <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::h3c5a272958cd5a04
at src/libstd/sys_common/backtrace.rs:61
4: 0x109220830 - core::fmt::write::hd88fffaf4bc6db48
at src/libcore/fmt/mod.rs:1028
5: 0x109202f1b - std::io::Write::write_fmt::hce06fb5a3353c591
at src/libstd/io/mod.rs:1412
6: 0x109207b83 - std::sys_common::backtrace::_print::h8eacc794e79f0359
at src/libstd/sys_common/backtrace.rs:65
7: 0x109207b83 - std::sys_common::backtrace::print::h969a7cf78dc7e1fd
at src/libstd/sys_common/backtrace.rs:50
8: 0x109207b83 - std::panicking::default_hook::{{closure}}::h847a58d309a7cea4
at src/libstd/panicking.rs:188
9: 0x10920788a - std::panicking::default_hook::h0715616d08c05b60
at src/libstd/panicking.rs:205
10: 0x1092082cb - std::panicking::rust_panic_with_hook::hc7409c536d0d6111
at src/libstd/panicking.rs:464
11: 0x109207e29 - std::panicking::continue_panic_fmt::h2f41864899758f23
at src/libstd/panicking.rs:373
12: 0x109207d29 - rust_begin_unwind
at src/libstd/panicking.rs:302
13: 0x10921ddbc - core::panicking::panic_fmt::hb54121e5882c537a
at src/libcore/panicking.rs:139
14: 0x10921de99 - core::result::unwrap_failed::h0591d7542260370e
at src/libcore/result.rs:1165
15: 0x108f5b05a - core::result::Result<T,E>::unwrap::h733a06e0fd4fa49f
at /rustc/2477e2493e67527fc282c7239e019f7ebd513a1a/src/libcore/result.rs:933
16: 0x108f4b331 - tokio_executor::global::spawn::hf155d52fa6070f70
at /Users/EMayej/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-executor-0.1.8/src/global.rs:165
17: 0x108f4717c - bb8::Pool<M>::new_inner::hfcbe7c2a5fd92ff2
at /Users/EMayej/.cargo/git/checkouts/bb8-176170de965f79ee/0639389/src/lib.rs:664
18: 0x108f4815f - bb8::Builder<M>::build_inner::hca668ca11028a991
at /Users/EMayej/.cargo/git/checkouts/bb8-176170de965f79ee/0639389/src/lib.rs:303
19: 0x108f4830f - bb8::Builder<M>::build::hdbb6f0f1f6320636
at /Users/EMayej/.cargo/git/checkouts/bb8-176170de965f79ee/0639389/src/lib.rs:313
20: 0x108f569e6 - rs_bb8_redis::main::{{closure}}::h26a0c37b0ce9128c
at src/main.rs:8
21: 0x108f60ccf - <std::future::GenFuture<T> as core::future::future::Future>::poll::{{closure}}::hbab42c1f4be7221b
at /rustc/2477e2493e67527fc282c7239e019f7ebd513a1a/src/libstd/future.rs:43
22: 0x108f60b37 - std::future::set_task_context::he6fbccfce8487124
at /rustc/2477e2493e67527fc282c7239e019f7ebd513a1a/src/libstd/future.rs:79
23: 0x108f60c94 - <std::future::GenFuture<T> as core::future::future::Future>::poll::hcab46014fa3e0aa6
at /rustc/2477e2493e67527fc282c7239e019f7ebd513a1a/src/libstd/future.rs:43
24: 0x108f55b52 - tokio_executor::enter::Enter::block_on::h6a3e2db7777c96d8
at /Users/EMayej/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-executor-0.2.0-alpha.6/src/enter.rs:118
25: 0x108f4ae7b - tokio::runtime::threadpool::Runtime::block_on::{{closure}}::{{closure}}::h6d1a9e179b7dbcd8
at /Users/EMayej/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.0-alpha.6/src/runtime/threadpool/mod.rs:178
26: 0x108f613c1 - tracing_core::dispatcher::with_default::hf64f07a03cb40aee
at /Users/EMayej/.cargo/registry/src/github.com-1ecc6299db9ec823/tracing-core-0.1.7/src/dispatcher.rs:226
27: 0x108f4ad52 - tokio::runtime::threadpool::Runtime::block_on::{{closure}}::hce02d37d9193b599
at /Users/EMayej/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.0-alpha.6/src/runtime/threadpool/mod.rs:177
28: 0x108f5735d - tokio_executor::global::with_default::{{closure}}::h309ebf8d3174d7eb
at /Users/EMayej/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-executor-0.2.0-alpha.6/src/global.rs:173
29: 0x108f5cdff - std::thread::local::LocalKey<T>::try_with::h212b5914e2ac2f47
at /rustc/2477e2493e67527fc282c7239e019f7ebd513a1a/src/libstd/thread/local.rs:262
30: 0x108f5cc28 - std::thread::local::LocalKey<T>::with::he0eef222b3d0f0d1
at /rustc/2477e2493e67527fc282c7239e019f7ebd513a1a/src/libstd/thread/local.rs:239
31: 0x108f571c8 - tokio_executor::global::with_default::h8f93a710e16368ec
at /Users/EMayej/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-executor-0.2.0-alpha.6/src/global.rs:147
32: 0x108f4abeb - tokio::runtime::threadpool::Runtime::block_on::h948ab96b64ca55a4
at /Users/EMayej/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.0-alpha.6/src/runtime/threadpool/mod.rs:174
33: 0x108f4cfd6 - rs_bb8_redis::main::h0119947bfd362297
at src/main.rs:4
34: 0x108f6c882 - std::rt::lang_start::{{closure}}::haafd5f87b1839333
at /rustc/2477e2493e67527fc282c7239e019f7ebd513a1a/src/libstd/rt.rs:61
35: 0x109207ca8 - std::rt::lang_start_internal::{{closure}}::h59ed7b0d82d79ae9
at src/libstd/rt.rs:48
36: 0x109207ca8 - std::panicking::try::do_call::h8bf384ff55799751
at src/libstd/panicking.rs:287
37: 0x10920ab8f - __rust_maybe_catch_panic
at src/libpanic_unwind/lib.rs:78
38: 0x10920865e - std::panicking::try::h31172b278baba0d3
at src/libstd/panicking.rs:265
39: 0x10920865e - std::panic::catch_unwind::h4c69a67a533516a8
at src/libstd/panic.rs:396
40: 0x10920865e - std::rt::lang_start_internal::h8b637dc44c57cd2d
at src/libstd/rt.rs:47
41: 0x108f6c862 - std::rt::lang_start::h786ea050cfbeb1f9
at /rustc/2477e2493e67527fc282c7239e019f7ebd513a1a/src/libstd/rt.rs:61
42: 0x108f4d042 - rs_bb8_redis::main::h0119947bfd362297
Is this because tokio-executor 0.1.8 (stack 16) is been used? How could I correctly use them (tokio 0.2 and bbs) together? Does this relate to #31?
Thank you.
The fact that the Drop
implementation uses a block_on
executor strategy, combined with the fact that it blocks until it gets hold of a mutex, seems to be a problem. I had to fork the project and change to a tokio::spawn
-based approach that would return the connection to the pool asynchronously.
My app is retrieving a connection on every web request, and it was basically never responding after 10 requests (the configured minimum pool size). I debugged it to the point to figure out it was infinitely waiting to get()
a connection from the pool.
I did not dig into it further than that quick "bandage fix". Maybe this is something that needs to be addressed generally?
This is the relevant commit: https://github.com/RAnders00/bb8/commit/e456d4c8812adc445c85852b575f728981cb5e0a
could you kindly please add one more example for following use case?
I have struggled for hours but I can't make it work.
I have implemented above use case in r2d2 but I want to change database query to async.
much appreciated!
Hello and thank you for working on this crate!
I am in the process of migrating https://github.com/agersant/polaris to an async web framework. As part of this process, I am evaluating migrating from diesel
+ r2d2
to diesel
+ bb8
to interact with the SQLite database which power this app.
A feature that exists in r2d2
is the possibility to specify a ConnectionCustomizer
which performs initialization work on every connection that a pool creates. For my use case, I set some SQLite PRAGMA
configurations (journaling_mode, enable foreign keys, etc.) on every connection that comes out of the pool.
Could you consider adding a similar feature to bb8
pools?
Many thanks!
Hi!
Depending on settings I want to use Tls or NoTls
. How to specify this type for function that gets Pool
as argument?
Now I do it like this:
fn example<Tls>(
_: Request<Body>,
pool: &Pool<PostgresConnectionManager<Tls>>,
) -> impl Future<Item = Response<Body>, Error = hyper::Error>
where
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
<Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
//do something with pool
}
But it looks a bit weird and hard to define this type in many functions. Is there a better solution?
First of all thank you for the recent update to bb8 :-)
I can never turn a conn in into a pubsub connection calling into_pubsub
for some reason. Have you had a chance to have a look pubsub with redis ?
let mut conn = pool.get().await.unwrap();
let conn = conn.as_mut().unwrap();
From redis
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut publish_conn = client.get_async_connection().await?;
let mut pubsub_conn = client.get_async_connection().await?.into_pubsub();
Hello,
I'm trying to make a transaction, but I am hitting a wall and can't figure it out.
Is it possible for an example with transaction and 2 inserts for example?
Would love to use bb8 with the new tokio-postgres v0.4.0-rc.2. Would be great if there were a was a small usage example that you could share in the readme? Great work!
The dedicated_connection()
function returns M::Connection
. For bb8-postgres
, this type maps to the tokio_postgres::Client
type. However, for receiving LISTEN
notifications from postgres
, we need the tokio_postgres::Connection
value. Is there some way to achieve this with bb8
?
Thanks!
Maybe I'm wrong here, but the way I see to prepare statements currently, you would call prepare, get back the statement and use it. I don't see, eyeing the code, that tokio-postgres will do any smart persistence of the prepared statements and not reprepare them until needed (like npgsql for c# for example).
I feel like adding this functionality into bb8 wouldn't be too hard. Potentially the user could specify statements in something like bb8_postgres::PostgresConnectionManager::new_with_prepared
, bb8_postgres::PostgresConnectionManager::connect
could prepare the statements and populate them into a custom type Connection
(that's just a wrapper around the tokio_postgres::Client
) in the ManageConnection
impl that allows the user to access them.
Any thoughts?
First of all - sorry for my ignorance, I'm new to Rust.
In the project I am working we want to use bb8 and we're successfully using it, but something like that:
let fut = self.db_pool
.run(move |mut conn| {
conn.prepare("SELECT channel_id, creator, deposit_asset, deposit_amount, valid_until FROM channels")
.then(move |res| match res {
Ok(stmt) => {
conn
.query(&stmt, &[])
.collect()
.into_future()
.then(|res| match res {
Ok(rows) => Ok((rows, conn)),
Err(err) => Err((err, conn)),
})
.into()
}
Err(err) => try_future!(Err((err, conn))),
})
.and_then(|(rows, conn)| {
let channels = rows.iter().map(|row| {
Channel {
id: row.get("channel_id"),
creator: row.get("creator"),
deposit_asset: row.get("deposit_asset"),
deposit_amount: row.get("deposit_amount"),
valid_until: row.get("valid_until"),
}
}).collect();
Ok((channels, conn))
})
})
.map_err(|err| handle_internal_error(&err));
Looks quite, well - unbreakable. + we cannot use await!()
inside the closure.
From my understanding some things are happening in run()
that handle the dropping/idling of the connection. Isn't it possible that this can be change in such a way that Drop
(or tbh I don't know what exactly) is implemented that run()
can return the connection and we can use it with await!()
, but also as a future and to get rid of the tidies Ok((channels, conn))
returns?
Thank you in advance! I will be happy to help and implement this as long as I understand how.
Given the scope of the project, to ensure production-readiness an adequate testing suite should be added.
cargo tarpaulin
might also give out a coverage report of the testing suite.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.